KAFKA-14519; [2/N] New coordinator metrics (#14387)

This patch copy over existing metrics and add new consumer group metrics to the new GroupCoordinatorService.

Now that each coordinator is responsible for a topic partition, this patch introduces a GroupCoordinatorMetrics that records gauges for global metrics such as the number of generic groups in PreparingRebalance state, etc. For GroupCoordinatorShard specific metrics, GroupCoordinatorMetrics will activate new GroupCoordinatorMetricsShards that will be responsible for incrementing/decrementing TimelineLong objects and then aggregate the total amount across all shards.

As the CoordinatorRuntime/CoordinatorShard does not care about group metadata, we have introduced a CoordinatorMetrics.java/CoordinatorMetricsShard.java so that in the future transaction coordinator metrics can also be onboarded in a similar fashion.

Main files to look at:

GroupCoordinatorMetrics.java
GroupCoordinatorMetricsShard.java
CoordinatorMetrics.java
CoordinatorMetricsShard.java
CoordinatorRuntime.java
Metrics to add after #14408 is merged:

offset deletions sensor (OffsetDeletions); Meter(offset-deletion-rate, offset-deletion-count)
Metrics to add after https://issues.apache.org/jira/browse/KAFKA-14987 is merged:

offset expired sensor (OffsetExpired); Meter(offset-expiration-rate, offset-expiration-count)

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Jeff Kim 2023-11-21 00:38:50 -05:00 committed by GitHub
parent c7c82baf87
commit 07fee62afe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 2055 additions and 101 deletions

View File

@ -249,6 +249,7 @@
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>

View File

@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.util.SystemTimerReaper
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
import org.apache.kafka.image.publisher.MetadataPublisher
@ -588,6 +588,7 @@ class BrokerServer(
.withLoader(loader)
.withWriter(writer)
.withCoordinatorRuntimeMetrics(new GroupCoordinatorRuntimeMetrics(metrics))
.withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
.build()
} else {
GroupCoordinatorAdapter(

View File

@ -64,6 +64,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
@ -105,6 +106,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
private Time time;
private Timer timer;
private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
private GroupCoordinatorMetrics groupCoordinatorMetrics;
public Builder(
int nodeId,
@ -139,6 +141,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
return this;
}
public Builder withGroupCoordinatorMetrics(GroupCoordinatorMetrics groupCoordinatorMetrics) {
this.groupCoordinatorMetrics = groupCoordinatorMetrics;
return this;
}
public GroupCoordinatorService build() {
if (config == null)
throw new IllegalArgumentException("Config must be set.");
@ -152,6 +159,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
throw new IllegalArgumentException("Timer must be set.");
if (coordinatorRuntimeMetrics == null)
throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
if (groupCoordinatorMetrics == null)
throw new IllegalArgumentException("GroupCoordinatorMetrics must be set.");
String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
@ -179,12 +188,14 @@ public class GroupCoordinatorService implements GroupCoordinator {
.withCoordinatorShardBuilderSupplier(supplier)
.withTime(time)
.withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
.withCoordinatorMetrics(groupCoordinatorMetrics)
.build();
return new GroupCoordinatorService(
logContext,
config,
runtime
runtime,
groupCoordinatorMetrics
);
}
}
@ -204,6 +215,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
*/
private final CoordinatorRuntime<GroupCoordinatorShard, Record> runtime;
/**
* The metrics registry.
*/
private final GroupCoordinatorMetrics groupCoordinatorMetrics;
/**
* Boolean indicating whether the coordinator is active or not.
*/
@ -224,11 +240,13 @@ public class GroupCoordinatorService implements GroupCoordinator {
GroupCoordinatorService(
LogContext logContext,
GroupCoordinatorConfig config,
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime,
GroupCoordinatorMetrics groupCoordinatorMetrics
) {
this.log = logContext.logger(CoordinatorLoader.class);
this.config = config;
this.runtime = runtime;
this.groupCoordinatorMetrics = groupCoordinatorMetrics;
}
/**
@ -950,6 +968,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
log.info("Shutting down.");
isActive.set(false);
Utils.closeQuietly(runtime, "coordinator runtime");
Utils.closeQuietly(groupCoordinatorMetrics, "group coordinator metrics");
log.info("Shutdown complete.");
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
@ -57,6 +58,10 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilder;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
@ -90,6 +95,8 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
private SnapshotRegistry snapshotRegistry;
private Time time;
private CoordinatorTimer<Void, Record> timer;
private CoordinatorMetrics coordinatorMetrics;
private TopicPartition topicPartition;
public Builder(
GroupCoordinatorConfig config
@ -121,6 +128,20 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
return this;
}
@Override
public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withCoordinatorMetrics(
CoordinatorMetrics coordinatorMetrics
) {
this.coordinatorMetrics = coordinatorMetrics;
return this;
}
@Override
public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withTopicPartition(TopicPartition topicPartition) {
this.topicPartition = topicPartition;
return this;
}
@Override
public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withSnapshotRegistry(
SnapshotRegistry snapshotRegistry
@ -140,6 +161,13 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
throw new IllegalArgumentException("Time must be set.");
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
if (coordinatorMetrics == null || !(coordinatorMetrics instanceof GroupCoordinatorMetrics))
throw new IllegalArgumentException("CoordinatorMetrics must be set and be of type GroupCoordinatorMetrics.");
if (topicPartition == null)
throw new IllegalArgumentException("TopicPartition must be set.");
GroupCoordinatorMetricsShard metricsShard = ((GroupCoordinatorMetrics) coordinatorMetrics)
.newMetricsShard(snapshotRegistry, topicPartition);
GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder()
.withLogContext(logContext)
@ -153,6 +181,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
.withGenericGroupNewMemberJoinTimeoutMs(config.genericGroupNewMemberJoinTimeoutMs)
.withGenericGroupMinSessionTimeoutMs(config.genericGroupMinSessionTimeoutMs)
.withGenericGroupMaxSessionTimeoutMs(config.genericGroupMaxSessionTimeoutMs)
.withGroupCoordinatorMetricsShard(metricsShard)
.build();
OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder()
@ -161,6 +190,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
.withTime(time)
.withGroupMetadataManager(groupMetadataManager)
.withGroupCoordinatorConfig(config)
.withGroupCoordinatorMetricsShard(metricsShard)
.build();
return new GroupCoordinatorShard(
@ -168,7 +198,9 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
groupMetadataManager,
offsetMetadataManager,
timer,
config
config,
coordinatorMetrics,
metricsShard
);
}
}
@ -205,25 +237,41 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
*/
private final GroupCoordinatorConfig config;
/**
* The coordinator metrics.
*/
private final CoordinatorMetrics coordinatorMetrics;
/**
* The coordinator metrics shard.
*/
private final CoordinatorMetricsShard metricsShard;
/**
* Constructor.
*
* @param logContext The log context.
* @param groupMetadataManager The group metadata manager.
* @param offsetMetadataManager The offset metadata manager.
* @param coordinatorMetrics The coordinator metrics.
* @param metricsShard The coordinator metrics shard.
*/
GroupCoordinatorShard(
LogContext logContext,
GroupMetadataManager groupMetadataManager,
OffsetMetadataManager offsetMetadataManager,
CoordinatorTimer<Void, Record> timer,
GroupCoordinatorConfig config
GroupCoordinatorConfig config,
CoordinatorMetrics coordinatorMetrics,
CoordinatorMetricsShard metricsShard
) {
this.log = logContext.logger(GroupCoordinatorShard.class);
this.groupMetadataManager = groupMetadataManager;
this.offsetMetadataManager = offsetMetadataManager;
this.timer = timer;
this.config = config;
this.coordinatorMetrics = coordinatorMetrics;
this.metricsShard = metricsShard;
}
/**
@ -503,6 +551,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
MetadataDelta emptyDelta = new MetadataDelta(newImage);
groupMetadataManager.onNewMetadataImage(newImage, emptyDelta);
offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta);
coordinatorMetrics.activateMetricsShard(metricsShard);
groupMetadataManager.onLoaded();
scheduleGroupMetadataExpiration();
@ -511,6 +560,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
@Override
public void onUnloaded() {
timer.cancel(GROUP_EXPIRATION_KEY);
coordinatorMetrics.deactivateMetricsShard(metricsShard);
}
/**

View File

@ -74,6 +74,7 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.image.MetadataDelta;
@ -120,6 +121,9 @@ import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_REBALANCES_SENSOR_NAME;
/**
* The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
@ -148,6 +152,7 @@ public class GroupMetadataManager {
private int genericGroupNewMemberJoinTimeoutMs = 5 * 60 * 1000;
private int genericGroupMinSessionTimeoutMs;
private int genericGroupMaxSessionTimeoutMs;
private GroupCoordinatorMetricsShard metrics;
Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
@ -224,6 +229,11 @@ public class GroupMetadataManager {
return this;
}
Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
this.metrics = metrics;
return this;
}
GroupMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
@ -234,12 +244,15 @@ public class GroupMetadataManager {
throw new IllegalArgumentException("Timer must be set.");
if (consumerGroupAssignors == null || consumerGroupAssignors.isEmpty())
throw new IllegalArgumentException("Assignors must be set before building.");
if (metrics == null)
throw new IllegalArgumentException("GroupCoordinatorMetricsShard must be set.");
return new GroupMetadataManager(
snapshotRegistry,
logContext,
time,
timer,
metrics,
consumerGroupAssignors,
metadataImage,
consumerGroupMaxSize,
@ -280,6 +293,11 @@ public class GroupMetadataManager {
*/
private final CoordinatorTimer<Void, Record> timer;
/**
* The coordinator metrics.
*/
private final GroupCoordinatorMetricsShard metrics;
/**
* The supported partition assignors keyed by their name.
*/
@ -364,6 +382,7 @@ public class GroupMetadataManager {
LogContext logContext,
Time time,
CoordinatorTimer<Void, Record> timer,
GroupCoordinatorMetricsShard metrics,
List<PartitionAssignor> assignors,
MetadataImage metadataImage,
int consumerGroupMaxSize,
@ -381,6 +400,7 @@ public class GroupMetadataManager {
this.snapshotRegistry = snapshotRegistry;
this.time = time;
this.timer = timer;
this.metrics = metrics;
this.metadataImage = metadataImage;
this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
this.defaultAssignor = assignors.get(0);
@ -522,7 +542,7 @@ public class GroupMetadataManager {
}
if (group == null) {
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
return consumerGroup;
} else {
@ -560,7 +580,7 @@ public class GroupMetadataManager {
}
if (group == null) {
GenericGroup genericGroup = new GenericGroup(logContext, groupId, GenericGroupState.EMPTY, time);
GenericGroup genericGroup = new GenericGroup(logContext, groupId, GenericGroupState.EMPTY, time, metrics);
groups.put(groupId, genericGroup);
return genericGroup;
} else {
@ -893,6 +913,7 @@ public class GroupMetadataManager {
groupEpoch += 1;
records.add(newGroupEpochRecord(groupId, groupEpoch));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}
group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
@ -1332,6 +1353,7 @@ public class GroupMetadataManager {
+ " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
}
removeGroup(groupId);
metrics.onConsumerGroupStateTransition(consumerGroup.state(), null);
}
}
@ -1540,6 +1562,11 @@ public class GroupMetadataManager {
if (value == null) {
// Tombstone. Group should be removed.
Group group = groups.get(groupId);
if (group != null && group.type() == GENERIC) {
GenericGroup genericGroup = (GenericGroup) group;
metrics.onGenericGroupStateTransition(genericGroup.currentState(), null);
}
removeGroup(groupId);
} else {
List<GenericGroupMember> loadedMembers = new ArrayList<>();
@ -1574,6 +1601,7 @@ public class GroupMetadataManager {
groupId,
loadedMembers.isEmpty() ? EMPTY : STABLE,
time,
metrics,
value.generation(),
protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType),
Optional.ofNullable(value.protocol()),
@ -2357,6 +2385,7 @@ public class GroupMetadataManager {
}
group.transitionTo(PREPARING_REBALANCE);
metrics.record(GENERIC_GROUP_REBALANCES_SENSOR_NAME);
log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).",
group.groupId(), group.currentState(), group.generationId(), reason);
@ -2823,6 +2852,7 @@ public class GroupMetadataManager {
// Update group's assignment and propagate to all members.
setAndPropagateAssignment(group, assignment);
group.transitionTo(STABLE);
metrics.record(GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME);
}
}
});

View File

@ -37,6 +37,8 @@ import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@ -55,6 +57,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_OFFSETS;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_DELETIONS_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
/**
* The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
@ -74,6 +79,7 @@ public class OffsetMetadataManager {
private GroupMetadataManager groupMetadataManager = null;
private MetadataImage metadataImage = null;
private GroupCoordinatorConfig config = null;
private GroupCoordinatorMetricsShard metrics = null;
Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
@ -105,6 +111,11 @@ public class OffsetMetadataManager {
return this;
}
Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
this.metrics = metrics;
return this;
}
public OffsetMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
@ -115,13 +126,18 @@ public class OffsetMetadataManager {
throw new IllegalArgumentException("GroupMetadataManager cannot be null");
}
if (metrics == null) {
throw new IllegalArgumentException("GroupCoordinatorMetricsShard cannot be null");
}
return new OffsetMetadataManager(
snapshotRegistry,
logContext,
time,
metadataImage,
groupMetadataManager,
config
config,
metrics
);
}
}
@ -151,6 +167,11 @@ public class OffsetMetadataManager {
*/
private final GroupMetadataManager groupMetadataManager;
/**
* The coordinator metrics.
*/
private final GroupCoordinatorMetricsShard metrics;
/**
* The group coordinator config.
*/
@ -167,7 +188,8 @@ public class OffsetMetadataManager {
Time time,
MetadataImage metadataImage,
GroupMetadataManager groupMetadataManager,
GroupCoordinatorConfig config
GroupCoordinatorConfig config,
GroupCoordinatorMetricsShard metrics
) {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(OffsetMetadataManager.class);
@ -175,6 +197,7 @@ public class OffsetMetadataManager {
this.metadataImage = metadataImage;
this.groupMetadataManager = groupMetadataManager;
this.config = config;
this.metrics = metrics;
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
}
@ -352,6 +375,10 @@ public class OffsetMetadataManager {
});
});
if (!records.isEmpty()) {
metrics.record(GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME, records.size());
}
return new CoordinatorResult<>(records, response);
}
@ -408,6 +435,7 @@ public class OffsetMetadataManager {
.setPartitions(responsePartitionCollection)
);
});
metrics.record(OFFSET_DELETIONS_SENSOR_NAME, records.size());
return new CoordinatorResult<>(
records,
@ -595,6 +623,7 @@ public class OffsetMetadataManager {
log.info("[GroupId {}] Expiring offsets of partitions (allOffsetsExpired={}): {}",
groupId, allOffsetsExpired, String.join(", ", expiredPartitions));
}
metrics.record(OFFSET_EXPIRED_SENSOR_NAME, expiredPartitions.size());
return allOffsetsExpired.get();
}
@ -710,7 +739,9 @@ public class OffsetMetadataManager {
.computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 0));
TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets
.computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 0));
partitionOffsets.put(partition, offsetAndMetadata);
if (partitionOffsets.put(partition, offsetAndMetadata) == null) {
metrics.incrementLocalGauge(NUM_OFFSETS);
}
}
/**
@ -734,6 +765,7 @@ public class OffsetMetadataManager {
return;
partitionOffsets.remove(partition);
metrics.decrementLocalGauge(NUM_OFFSETS);
if (partitionOffsets.isEmpty())
topicOffsets.remove(topic);

View File

@ -28,6 +28,7 @@ import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
@ -45,6 +46,11 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.ASSIGNING;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.RECONCILING;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.STABLE;
/**
* A Consumer Group. All the metadata in this class are backed by
* records in the __consumer_offsets partitions.
@ -143,6 +149,11 @@ public class ConsumerGroup implements Group {
*/
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
/**
* The coordinator metrics.
*/
private final GroupCoordinatorMetricsShard metrics;
/**
* The metadata refresh deadline. It consists of a timestamp in milliseconds together with
* the group epoch at the time of setting it. The metadata refresh time is considered as a
@ -157,11 +168,12 @@ public class ConsumerGroup implements Group {
public ConsumerGroup(
SnapshotRegistry snapshotRegistry,
String groupId
String groupId,
GroupCoordinatorMetricsShard metrics
) {
this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
this.groupId = Objects.requireNonNull(groupId);
this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY);
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
this.groupEpoch = new TimelineInteger(snapshotRegistry);
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
@ -170,6 +182,9 @@ public class ConsumerGroup implements Group {
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
this.metrics = Objects.requireNonNull(metrics);
metrics.onConsumerGroupStateTransition(null, this.state.get());
}
/**
@ -672,20 +687,23 @@ public class ConsumerGroup implements Group {
* Updates the current state of the group.
*/
private void maybeUpdateGroupState() {
ConsumerGroupState previousState = state.get();
ConsumerGroupState newState = STABLE;
if (members.isEmpty()) {
state.set(ConsumerGroupState.EMPTY);
newState = EMPTY;
} else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
state.set(ConsumerGroupState.ASSIGNING);
newState = ASSIGNING;
} else {
for (ConsumerGroupMember member : members.values()) {
if (member.targetMemberEpoch() != targetAssignmentEpoch.get() || member.state() != ConsumerGroupMember.MemberState.STABLE) {
state.set(ConsumerGroupState.RECONCILING);
return;
newState = RECONCILING;
break;
}
}
state.set(ConsumerGroupState.STABLE);
}
state.set(newState);
metrics.onConsumerGroupStateTransition(previousState, newState);
}
/**

View File

@ -37,6 +37,7 @@ import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
@ -185,17 +186,24 @@ public class GenericGroup implements Group {
*/
private boolean newMemberAdded = false;
/**
* Coordinator metrics.
*/
private final GroupCoordinatorMetricsShard metrics;
public GenericGroup(
LogContext logContext,
String groupId,
GenericGroupState initialState,
Time time
Time time,
GroupCoordinatorMetricsShard metrics
) {
this(
logContext,
groupId,
initialState,
time,
metrics,
0,
Optional.empty(),
Optional.empty(),
@ -209,6 +217,7 @@ public class GenericGroup implements Group {
String groupId,
GenericGroupState initialState,
Time time,
GroupCoordinatorMetricsShard metrics,
int generationId,
Optional<String> protocolType,
Optional<String> protocolName,
@ -221,11 +230,13 @@ public class GenericGroup implements Group {
this.state = Objects.requireNonNull(initialState);
this.previousState = DEAD;
this.time = Objects.requireNonNull(time);
this.metrics = Objects.requireNonNull(metrics);
this.generationId = generationId;
this.protocolType = protocolType;
this.protocolName = protocolName;
this.leaderId = leaderId;
this.currentStateTimestamp = currentStateTimestamp;
metrics.onGenericGroupStateTransition(null, initialState);
}
/**
@ -973,6 +984,7 @@ public class GenericGroup implements Group {
previousState = state;
state = groupState;
currentStateTimestamp = Optional.of(time.milliseconds());
metrics.onGenericGroupStateTransition(previousState, state);
}
/**

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.timeline.SnapshotRegistry;
public abstract class CoordinatorMetrics {
public abstract CoordinatorMetricsShard newMetricsShard(SnapshotRegistry snapshotRegistry, TopicPartition tp);
public abstract void activateMetricsShard(CoordinatorMetricsShard shard);
public abstract void deactivateMetricsShard(CoordinatorMetricsShard shard);
public abstract MetricsRegistry registry();
public static MetricName getMetricName(String group, String type, String name) {
return KafkaYammerMetrics.getMetricName(group, type, name);
}
public abstract void onUpdateLastCommittedOffset(TopicPartition tp, long offset);
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
import com.yammer.metrics.core.MetricName;
import org.apache.kafka.common.TopicPartition;
/**
* A CoordinatorMetricsShard is mapped to a single CoordinatorShard. For gauges, each metrics shard increments/decrements
* based on the operations performed. Then, {@link CoordinatorMetrics} will perform aggregations across all shards.
*
* For sensors, each shard individually records the observed values.
*/
public interface CoordinatorMetricsShard {
/**
* Increment a global gauge.
*
* @param metricName the metric name.
*/
void incrementGlobalGauge(MetricName metricName);
/**
* Increment a local gauge.
*
* @param metricName the metric name.
*/
void incrementLocalGauge(MetricName metricName);
/**
* Decrement a global gauge.
*
* @param metricName the metric name.
*/
void decrementGlobalGauge(MetricName metricName);
/**
* Decrement a local gauge.
*
* @param metricName the metric name.
*/
void decrementLocalGauge(MetricName metricName);
/**
* Obtain the current value of a global gauge.
*
* @param metricName the metric name.
*/
long globalGaugeValue(MetricName metricName);
/**
* Obtain the current value of a local gauge.
*
* @param metricName the metric name.
*/
long localGaugeValue(MetricName metricName);
/**
* Increment the value of a sensor.
*
* @param sensorName the sensor name.
*/
void record(String sensorName);
/**
* Record a sensor with a value.
*
* @param sensorName the sensor name.
* @param val the value to record.
*/
void record(String sensorName, double val);
/**
* @return The topic partition.
*/
TopicPartition topicPartition();
/**
* Commits all gauges backed by the snapshot registry.
*
* @param offset The last committed offset.
*/
void commitUpTo(long offset);
}

View File

@ -0,0 +1,380 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.timeline.SnapshotRegistry;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
/**
* These are the metrics which are managed by the {@link org.apache.kafka.coordinator.group.GroupMetadataManager} class.
* They generally pertain to aspects of group management, such as the number of groups in different states.
*/
public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoCloseable {
public static final String METRICS_GROUP = "group-coordinator-metrics";
public final static MetricName NUM_OFFSETS = getMetricName(
"GroupMetadataManager", "NumOffsets");
public final static MetricName NUM_GENERIC_GROUPS = getMetricName(
"GroupMetadataManager", "NumGroups");
public final static MetricName NUM_GENERIC_GROUPS_PREPARING_REBALANCE = getMetricName(
"GroupMetadataManager", "NumGroupsPreparingRebalance");
public final static MetricName NUM_GENERIC_GROUPS_COMPLETING_REBALANCE = getMetricName(
"GroupMetadataManager", "NumGroupsCompletingRebalance");
public final static MetricName NUM_GENERIC_GROUPS_STABLE = getMetricName(
"GroupMetadataManager", "NumGroupsStable");
public final static MetricName NUM_GENERIC_GROUPS_DEAD = getMetricName(
"GroupMetadataManager", "NumGroupsDead");
public final static MetricName NUM_GENERIC_GROUPS_EMPTY = getMetricName(
"GroupMetadataManager", "NumGroupsEmpty");
public final static MetricName NUM_CONSUMER_GROUPS = getMetricName(
"GroupMetadataManager", "NumConsumerGroups");
public final static MetricName NUM_CONSUMER_GROUPS_EMPTY = getMetricName(
"GroupMetadataManager", "NumConsumerGroupsEmpty");
public final static MetricName NUM_CONSUMER_GROUPS_ASSIGNING = getMetricName(
"GroupMetadataManager", "NumConsumerGroupsAssigning");
public final static MetricName NUM_CONSUMER_GROUPS_RECONCILING = getMetricName(
"GroupMetadataManager", "NumConsumerGroupsReconciling");
public final static MetricName NUM_CONSUMER_GROUPS_STABLE = getMetricName(
"GroupMetadataManager", "NumConsumerGroupsStable");
public final static MetricName NUM_CONSUMER_GROUPS_DEAD = getMetricName(
"GroupMetadataManager", "NumConsumerGroupsDead");
public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits";
public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired";
public static final String OFFSET_DELETIONS_SENSOR_NAME = "OffsetDeletions";
public static final String GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances";
public static final String GENERIC_GROUP_REBALANCES_SENSOR_NAME = "GenericGroupRebalances";
public static final String CONSUMER_GROUP_REBALANCES_SENSOR_NAME = "ConsumerGroupRebalances";
private final MetricsRegistry registry;
private final Metrics metrics;
private final Map<TopicPartition, CoordinatorMetricsShard> shards = new HashMap<>();
private static final AtomicLong NUM_GENERIC_GROUPS_PREPARING_REBALANCE_COUNTER = new AtomicLong(0);
private static final AtomicLong NUM_GENERIC_GROUPS_COMPLETING_REBALANCE_COUNTER = new AtomicLong(0);
private static final AtomicLong NUM_GENERIC_GROUPS_STABLE_COUNTER = new AtomicLong(0);
private static final AtomicLong NUM_GENERIC_GROUPS_DEAD_COUNTER = new AtomicLong(0);
private static final AtomicLong NUM_GENERIC_GROUPS_EMPTY_COUNTER = new AtomicLong(0);
/**
* Global sensors. These are shared across all metrics shards.
*/
public final Map<String, Sensor> globalSensors;
/**
* Global gauge counters. These are shared across all metrics shards.
*/
public static final Map<String, AtomicLong> GLOBAL_GAUGES = Collections.unmodifiableMap(Utils.mkMap(
Utils.mkEntry(NUM_GENERIC_GROUPS_PREPARING_REBALANCE.getName(), NUM_GENERIC_GROUPS_PREPARING_REBALANCE_COUNTER),
Utils.mkEntry(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE.getName(), NUM_GENERIC_GROUPS_COMPLETING_REBALANCE_COUNTER),
Utils.mkEntry(NUM_GENERIC_GROUPS_STABLE.getName(), NUM_GENERIC_GROUPS_STABLE_COUNTER),
Utils.mkEntry(NUM_GENERIC_GROUPS_DEAD.getName(), NUM_GENERIC_GROUPS_DEAD_COUNTER),
Utils.mkEntry(NUM_GENERIC_GROUPS_EMPTY.getName(), NUM_GENERIC_GROUPS_EMPTY_COUNTER)
));
public GroupCoordinatorMetrics() {
this(KafkaYammerMetrics.defaultRegistry(), new Metrics());
}
public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) {
this.registry = Objects.requireNonNull(registry);
this.metrics = Objects.requireNonNull(metrics);
registerGauges();
Sensor offsetCommitsSensor = metrics.sensor(OFFSET_COMMITS_SENSOR_NAME);
offsetCommitsSensor.add(new Meter(
metrics.metricName("offset-commit-rate",
METRICS_GROUP,
"The rate of committed offsets"),
metrics.metricName("offset-commit-count",
METRICS_GROUP,
"The total number of committed offsets")));
Sensor offsetExpiredSensor = metrics.sensor(OFFSET_EXPIRED_SENSOR_NAME);
offsetExpiredSensor.add(new Meter(
metrics.metricName("offset-expiration-rate",
METRICS_GROUP,
"The rate of expired offsets"),
metrics.metricName("offset-expiration-count",
METRICS_GROUP,
"The total number of expired offsets")));
Sensor offsetDeletionsSensor = metrics.sensor(OFFSET_DELETIONS_SENSOR_NAME);
offsetDeletionsSensor.add(new Meter(
metrics.metricName("offset-deletion-rate",
METRICS_GROUP,
"The rate of administrative deleted offsets"),
metrics.metricName("offset-deletion-count",
METRICS_GROUP,
"The total number of administrative deleted offsets")));
Sensor genericGroupCompletedRebalancesSensor = metrics.sensor(GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME);
genericGroupCompletedRebalancesSensor.add(new Meter(
metrics.metricName("group-completed-rebalance-rate",
METRICS_GROUP,
"The rate of generic group completed rebalances"),
metrics.metricName("group-completed-rebalance-count",
METRICS_GROUP,
"The total number of generic group completed rebalances")));
Sensor genericGroupPreparingRebalancesSensor = metrics.sensor(GENERIC_GROUP_REBALANCES_SENSOR_NAME);
genericGroupPreparingRebalancesSensor.add(new Meter(
metrics.metricName("group-rebalance-rate",
METRICS_GROUP,
"The rate of generic group preparing rebalances"),
metrics.metricName("group-rebalance-count",
METRICS_GROUP,
"The total number of generic group preparing rebalances")));
Sensor consumerGroupRebalanceSensor = metrics.sensor(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
consumerGroupRebalanceSensor.add(new Meter(
metrics.metricName("consumer-group-rebalance-rate",
METRICS_GROUP,
"The rate of consumer group rebalances"),
metrics.metricName("consumer-group-rebalance-count",
METRICS_GROUP,
"The total number of consumer group rebalances")));
globalSensors = Collections.unmodifiableMap(Utils.mkMap(
Utils.mkEntry(OFFSET_COMMITS_SENSOR_NAME, offsetCommitsSensor),
Utils.mkEntry(OFFSET_EXPIRED_SENSOR_NAME, offsetExpiredSensor),
Utils.mkEntry(OFFSET_DELETIONS_SENSOR_NAME, offsetDeletionsSensor),
Utils.mkEntry(GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, genericGroupCompletedRebalancesSensor),
Utils.mkEntry(GENERIC_GROUP_REBALANCES_SENSOR_NAME, genericGroupPreparingRebalancesSensor),
Utils.mkEntry(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, consumerGroupRebalanceSensor)
));
}
public Long numOffsets() {
return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_OFFSETS)).sum();
}
public Long numGenericGroups() {
return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_GENERIC_GROUPS)).sum();
}
public Long numGenericGroupsPreparingRebalanceCount() {
return NUM_GENERIC_GROUPS_PREPARING_REBALANCE_COUNTER.get();
}
public Long numGenericGroupsCompletingRebalanceCount() {
return NUM_GENERIC_GROUPS_COMPLETING_REBALANCE_COUNTER.get();
}
public Long numGenericGroupsStableCount() {
return NUM_GENERIC_GROUPS_STABLE_COUNTER.get();
}
public Long numGenericGroupsDeadCount() {
return NUM_GENERIC_GROUPS_DEAD_COUNTER.get();
}
public Long numGenericGroupsEmptyCount() {
return NUM_GENERIC_GROUPS_EMPTY_COUNTER.get();
}
public long numConsumerGroups() {
return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS)).sum();
}
public long numConsumerGroupsEmpty() {
return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY)).sum();
}
public long numConsumerGroupsAssigning() {
return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING)).sum();
}
public long numConsumerGroupsReconciling() {
return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS_RECONCILING)).sum();
}
public long numConsumerGroupsStable() {
return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE)).sum();
}
public long numConsumerGroupsDead() {
return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS_DEAD)).sum();
}
@Override
public void close() {
Arrays.asList(
NUM_OFFSETS,
NUM_GENERIC_GROUPS,
NUM_GENERIC_GROUPS_PREPARING_REBALANCE,
NUM_GENERIC_GROUPS_COMPLETING_REBALANCE,
NUM_GENERIC_GROUPS_STABLE,
NUM_GENERIC_GROUPS_DEAD,
NUM_GENERIC_GROUPS_EMPTY,
NUM_CONSUMER_GROUPS,
NUM_CONSUMER_GROUPS_EMPTY,
NUM_CONSUMER_GROUPS_ASSIGNING,
NUM_CONSUMER_GROUPS_RECONCILING,
NUM_CONSUMER_GROUPS_STABLE,
NUM_CONSUMER_GROUPS_DEAD
).forEach(registry::removeMetric);
Arrays.asList(
OFFSET_COMMITS_SENSOR_NAME,
OFFSET_EXPIRED_SENSOR_NAME,
OFFSET_DELETIONS_SENSOR_NAME,
GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME,
GENERIC_GROUP_REBALANCES_SENSOR_NAME,
CONSUMER_GROUP_REBALANCES_SENSOR_NAME
).forEach(metrics::removeSensor);
}
@Override
public GroupCoordinatorMetricsShard newMetricsShard(SnapshotRegistry snapshotRegistry, TopicPartition tp) {
return new GroupCoordinatorMetricsShard(snapshotRegistry, globalSensors, GLOBAL_GAUGES, tp);
}
@Override
public void activateMetricsShard(CoordinatorMetricsShard shard) {
shards.put(shard.topicPartition(), shard);
}
@Override
public void deactivateMetricsShard(CoordinatorMetricsShard shard) {
shards.remove(shard.topicPartition());
}
@Override
public MetricsRegistry registry() {
return this.registry;
}
@Override
public void onUpdateLastCommittedOffset(TopicPartition tp, long offset) {
CoordinatorMetricsShard shard = shards.get(tp);
if (shard != null) {
shard.commitUpTo(offset);
}
}
public static MetricName getMetricName(String type, String name) {
return getMetricName("kafka.coordinator.group", type, name);
}
private void registerGauges() {
registry.newGauge(NUM_OFFSETS, new Gauge<Long>() {
@Override
public Long value() {
return numOffsets();
}
});
registry.newGauge(NUM_GENERIC_GROUPS, new Gauge<Long>() {
@Override
public Long value() {
return numGenericGroups();
}
});
registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new Gauge<Long>() {
@Override
public Long value() {
return numGenericGroupsPreparingRebalanceCount();
}
});
registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new Gauge<Long>() {
@Override
public Long value() {
return numGenericGroupsCompletingRebalanceCount();
}
});
registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new Gauge<Long>() {
@Override
public Long value() {
return numGenericGroupsStableCount();
}
});
registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new Gauge<Long>() {
@Override
public Long value() {
return numGenericGroupsDeadCount();
}
});
registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new Gauge<Long>() {
@Override
public Long value() {
return numGenericGroupsEmptyCount();
}
});
registry.newGauge(NUM_CONSUMER_GROUPS, new Gauge<Long>() {
@Override
public Long value() {
return numConsumerGroups();
}
});
registry.newGauge(NUM_CONSUMER_GROUPS_EMPTY, new Gauge<Long>() {
@Override
public Long value() {
return numConsumerGroupsEmpty();
}
});
registry.newGauge(NUM_CONSUMER_GROUPS_ASSIGNING, new Gauge<Long>() {
@Override
public Long value() {
return numConsumerGroupsAssigning();
}
});
registry.newGauge(NUM_CONSUMER_GROUPS_RECONCILING, new Gauge<Long>() {
@Override
public Long value() {
return numConsumerGroupsReconciling();
}
});
registry.newGauge(NUM_CONSUMER_GROUPS_STABLE, new Gauge<Long>() {
@Override
public Long value() {
return numConsumerGroupsStable();
}
});
registry.newGauge(NUM_CONSUMER_GROUPS_DEAD, new Gauge<Long>() {
@Override
public Long value() {
return numConsumerGroupsDead();
}
});
}
}

View File

@ -0,0 +1,325 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
import com.yammer.metrics.core.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_ASSIGNING;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_DEAD;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_EMPTY;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_RECONCILING;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_STABLE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_DEAD;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_EMPTY;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_PREPARING_REBALANCE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_STABLE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_OFFSETS;
/**
* This class is mapped to a single {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}. It will
* record all metrics that the shard handles with respect to {@link org.apache.kafka.coordinator.group.OffsetMetadataManager}
* and {@link org.apache.kafka.coordinator.group.GroupMetadataManager} operations.
*
* Local gauges will be recorded in this class which will be gathered by {@link GroupCoordinatorMetrics} to
* report.
*/
public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
/**
* This class represents a gauge counter for this shard. The TimelineLong object represents a gauge backed by
* the snapshot registry. Once we commit to a certain offset in the snapshot registry, we write the given
* TimelineLong's value to the AtomicLong. This AtomicLong represents the actual gauge counter that is queried
* when reporting the value to {@link GroupCoordinatorMetrics}.
*/
private static class TimelineGaugeCounter {
final TimelineLong timelineLong;
final AtomicLong atomicLong;
public TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) {
this.timelineLong = timelineLong;
this.atomicLong = atomicLong;
}
}
/**
* Local timeline gauge counters keyed by the metric name.
*/
private final Map<String, TimelineGaugeCounter> localGauges;
/**
* All sensors keyed by the sensor name. A Sensor object is shared across all metrics shards.
*/
private final Map<String, Sensor> globalSensors;
/**
* Global gauge counters keyed by the metric name. The same counter is shared across all metrics shards.
*/
private final Map<String, AtomicLong> globalGauges;
/**
* The topic partition.
*/
private final TopicPartition topicPartition;
public GroupCoordinatorMetricsShard(
SnapshotRegistry snapshotRegistry,
Map<String, Sensor> globalSensors,
Map<String, AtomicLong> globalGauges,
TopicPartition topicPartition
) {
Objects.requireNonNull(snapshotRegistry);
TimelineLong numOffsetsTimeline = new TimelineLong(snapshotRegistry);
TimelineLong numGenericGroupsTimeline = new TimelineLong(snapshotRegistry);
TimelineLong numConsumerGroupsTimeline = new TimelineLong(snapshotRegistry);
TimelineLong numConsumerGroupsEmptyTimeline = new TimelineLong(snapshotRegistry);
TimelineLong numConsumerGroupsAssigningTimeline = new TimelineLong(snapshotRegistry);
TimelineLong numConsumerGroupsReconcilingTimeline = new TimelineLong(snapshotRegistry);
TimelineLong numConsumerGroupsStableTimeline = new TimelineLong(snapshotRegistry);
TimelineLong numConsumerGroupsDeadTimeline = new TimelineLong(snapshotRegistry);
this.localGauges = Collections.unmodifiableMap(Utils.mkMap(
Utils.mkEntry(NUM_OFFSETS.getName(),
new TimelineGaugeCounter(numOffsetsTimeline, new AtomicLong(0))),
Utils.mkEntry(NUM_GENERIC_GROUPS.getName(),
new TimelineGaugeCounter(numGenericGroupsTimeline, new AtomicLong(0))),
Utils.mkEntry(NUM_CONSUMER_GROUPS.getName(),
new TimelineGaugeCounter(numConsumerGroupsTimeline, new AtomicLong(0))),
Utils.mkEntry(NUM_CONSUMER_GROUPS_EMPTY.getName(),
new TimelineGaugeCounter(numConsumerGroupsEmptyTimeline, new AtomicLong(0))),
Utils.mkEntry(NUM_CONSUMER_GROUPS_ASSIGNING.getName(),
new TimelineGaugeCounter(numConsumerGroupsAssigningTimeline, new AtomicLong(0))),
Utils.mkEntry(NUM_CONSUMER_GROUPS_RECONCILING.getName(),
new TimelineGaugeCounter(numConsumerGroupsReconcilingTimeline, new AtomicLong(0))),
Utils.mkEntry(NUM_CONSUMER_GROUPS_STABLE.getName(),
new TimelineGaugeCounter(numConsumerGroupsStableTimeline, new AtomicLong(0))),
Utils.mkEntry(NUM_CONSUMER_GROUPS_DEAD.getName(),
new TimelineGaugeCounter(numConsumerGroupsDeadTimeline, new AtomicLong(0)))
));
this.globalSensors = Objects.requireNonNull(globalSensors);
this.globalGauges = Objects.requireNonNull(globalGauges);
this.topicPartition = Objects.requireNonNull(topicPartition);
}
@Override
public void incrementGlobalGauge(MetricName metricName) {
AtomicLong gaugeCounter = globalGauges.get(metricName.getName());
if (gaugeCounter != null) {
gaugeCounter.incrementAndGet();
}
}
@Override
public void incrementLocalGauge(MetricName metricName) {
TimelineGaugeCounter gaugeCounter = localGauges.get(metricName.getName());
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.increment();
}
}
}
@Override
public void decrementGlobalGauge(MetricName metricName) {
AtomicLong gaugeCounter = globalGauges.get(metricName.getName());
if (gaugeCounter != null) {
gaugeCounter.decrementAndGet();
}
}
@Override
public void decrementLocalGauge(MetricName metricName) {
TimelineGaugeCounter gaugeCounter = localGauges.get(metricName.getName());
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.decrement();
}
}
}
@Override
public long globalGaugeValue(MetricName metricName) {
AtomicLong gaugeCounter = globalGauges.get(metricName.getName());
if (gaugeCounter != null) {
return gaugeCounter.get();
}
return 0;
}
@Override
public long localGaugeValue(MetricName metricName) {
TimelineGaugeCounter gaugeCounter = localGauges.get(metricName.getName());
if (gaugeCounter != null) {
return gaugeCounter.atomicLong.get();
}
return 0;
}
@Override
public void record(String sensorName) {
Sensor sensor = globalSensors.get(sensorName);
if (sensor != null) {
sensor.record();
}
}
@Override
public void record(String sensorName, double val) {
Sensor sensor = globalSensors.get(sensorName);
if (sensor != null) {
sensor.record(val);
}
}
@Override
public TopicPartition topicPartition() {
return this.topicPartition;
}
@Override
public void commitUpTo(long offset) {
this.localGauges.forEach((__, gaugeCounter) -> {
long value;
synchronized (gaugeCounter.timelineLong) {
value = gaugeCounter.timelineLong.get(offset);
}
gaugeCounter.atomicLong.set(value);
});
}
/**
* Called when a generic group's state has changed. Increment/decrement
* the counter accordingly.
*
* @param oldState The previous state. null value means that it's a new group.
* @param newState The next state. null value means that the group has been removed.
*/
public void onGenericGroupStateTransition(GenericGroupState oldState, GenericGroupState newState) {
if (newState != null) {
switch (newState) {
case PREPARING_REBALANCE:
incrementGlobalGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE);
break;
case COMPLETING_REBALANCE:
incrementGlobalGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE);
break;
case STABLE:
incrementGlobalGauge(NUM_GENERIC_GROUPS_STABLE);
break;
case DEAD:
incrementGlobalGauge(NUM_GENERIC_GROUPS_DEAD);
break;
case EMPTY:
incrementGlobalGauge(NUM_GENERIC_GROUPS_EMPTY);
}
} else {
decrementLocalGauge(NUM_GENERIC_GROUPS);
}
if (oldState != null) {
switch (oldState) {
case PREPARING_REBALANCE:
decrementGlobalGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE);
break;
case COMPLETING_REBALANCE:
decrementGlobalGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE);
break;
case STABLE:
decrementGlobalGauge(NUM_GENERIC_GROUPS_STABLE);
break;
case DEAD:
decrementGlobalGauge(NUM_GENERIC_GROUPS_DEAD);
break;
case EMPTY:
decrementGlobalGauge(NUM_GENERIC_GROUPS_EMPTY);
}
} else {
incrementLocalGauge(NUM_GENERIC_GROUPS);
}
}
/**
* Called when a consumer group's state has changed. Increment/decrement
* the counter accordingly.
*
* @param oldState The previous state. null value means that it's a new group.
* @param newState The next state. null value means that the group has been removed.
*/
public void onConsumerGroupStateTransition(
ConsumerGroup.ConsumerGroupState oldState,
ConsumerGroup.ConsumerGroupState newState
) {
if (newState != null) {
switch (newState) {
case EMPTY:
incrementLocalGauge(NUM_CONSUMER_GROUPS_EMPTY);
break;
case ASSIGNING:
incrementLocalGauge(NUM_CONSUMER_GROUPS_ASSIGNING);
break;
case RECONCILING:
incrementLocalGauge(NUM_CONSUMER_GROUPS_RECONCILING);
break;
case STABLE:
incrementLocalGauge(NUM_CONSUMER_GROUPS_STABLE);
break;
case DEAD:
incrementLocalGauge(NUM_CONSUMER_GROUPS_DEAD);
}
} else {
decrementLocalGauge(NUM_CONSUMER_GROUPS);
}
if (oldState != null) {
switch (oldState) {
case EMPTY:
decrementLocalGauge(NUM_CONSUMER_GROUPS_EMPTY);
break;
case ASSIGNING:
decrementLocalGauge(NUM_CONSUMER_GROUPS_ASSIGNING);
break;
case RECONCILING:
decrementLocalGauge(NUM_CONSUMER_GROUPS_RECONCILING);
break;
case STABLE:
decrementLocalGauge(NUM_CONSUMER_GROUPS_STABLE);
break;
case DEAD:
decrementLocalGauge(NUM_CONSUMER_GROUPS_DEAD);
}
} else {
incrementLocalGauge(NUM_CONSUMER_GROUPS);
}
}
}

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.image.MetadataDelta;
@ -91,6 +92,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
private Time time = Time.SYSTEM;
private Timer timer;
private CoordinatorRuntimeMetrics runtimeMetrics;
private CoordinatorMetrics coordinatorMetrics;
public Builder<S, U> withLogPrefix(String logPrefix) {
this.logPrefix = logPrefix;
@ -137,6 +139,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
return this;
}
public Builder<S, U> withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
this.coordinatorMetrics = coordinatorMetrics;
return this;
}
public CoordinatorRuntime<S, U> build() {
if (logPrefix == null)
logPrefix = "";
@ -156,6 +163,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
throw new IllegalArgumentException("Timer must be set.");
if (runtimeMetrics == null)
throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
if (coordinatorMetrics == null)
throw new IllegalArgumentException("CoordinatorMetrics must be set.");
return new CoordinatorRuntime<>(
logPrefix,
@ -166,7 +175,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
coordinatorShardBuilderSupplier,
time,
timer,
runtimeMetrics
runtimeMetrics,
coordinatorMetrics
);
}
}
@ -497,6 +507,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
lastCommittedOffset = offset;
deferredEventQueue.completeUpTo(offset);
snapshotRegistry.deleteSnapshotsUpTo(offset);
coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
}
/**
@ -510,8 +521,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
if (!newState.canTransitionFrom(state)) {
throw new IllegalStateException("Cannot transition from " + state + " to " + newState);
}
CoordinatorState oldState = state;
log.debug("Transition from {} to {}.", state, newState);
switch (newState) {
case LOADING:
@ -525,6 +536,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withTimer(timer)
.withCoordinatorMetrics(coordinatorMetrics)
.withTopicPartition(tp)
.build();
break;
@ -1046,6 +1059,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
private final CoordinatorRuntimeMetrics runtimeMetrics;
/**
* The coordinator metrics.
*/
private final CoordinatorMetrics coordinatorMetrics;
/**
* Atomic boolean indicating whether the runtime is running.
*/
@ -1077,7 +1095,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier,
Time time,
Timer timer,
CoordinatorRuntimeMetrics runtimeMetrics
CoordinatorRuntimeMetrics runtimeMetrics,
CoordinatorMetrics coordinatorMetrics
) {
this.logPrefix = logPrefix;
this.logContext = logContext;
@ -1091,6 +1110,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
this.loader = loader;
this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
this.runtimeMetrics = runtimeMetrics;
this.coordinatorMetrics = coordinatorMetrics;
}
/**

View File

@ -16,8 +16,10 @@
*/
package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.timeline.SnapshotRegistry;
@ -74,6 +76,28 @@ public interface CoordinatorShardBuilder<S extends CoordinatorShard<U>, U> {
CoordinatorTimer<Void, U> timer
);
/**
* Sets the coordinator metrics.
*
* @param coordinatorMetrics The coordinator metrics.
*
* @return The builder.
*/
CoordinatorShardBuilder<S, U> withCoordinatorMetrics(
CoordinatorMetrics coordinatorMetrics
);
/**
* Sets the topic partition.
*
* @param topicPartition The topic partition.
*
* @return The builder.
*/
CoordinatorShardBuilder<S, U> withTopicPartition(
TopicPartition topicPartition
);
/**
* @return The built coordinator.
*/

View File

@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.util.FutureUtils;
@ -128,7 +129,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
service.startup(() -> 1);
@ -143,7 +145,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
@ -167,7 +170,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
@ -215,7 +219,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
@ -248,7 +253,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
assertThrows(CoordinatorNotAvailableException.class,
@ -265,7 +271,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
Properties expectedProperties = new Properties();
@ -282,7 +289,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
assertThrows(CoordinatorNotAvailableException.class,
@ -303,7 +311,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
assertThrows(CoordinatorNotAvailableException.class,
@ -324,7 +333,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
JoinGroupRequestData request = new JoinGroupRequestData()
@ -355,7 +365,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
JoinGroupRequestData request = new JoinGroupRequestData()
@ -388,7 +399,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
service.startup(() -> 1);
@ -433,7 +445,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
JoinGroupRequestData request = new JoinGroupRequestData()
@ -458,7 +471,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
SyncGroupRequestData request = new SyncGroupRequestData()
@ -489,7 +503,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
SyncGroupRequestData request = new SyncGroupRequestData()
@ -523,7 +538,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
service.startup(() -> 1);
@ -551,7 +567,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
SyncGroupRequestData request = new SyncGroupRequestData()
@ -576,7 +593,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
HeartbeatRequestData request = new HeartbeatRequestData()
@ -607,7 +625,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
HeartbeatRequestData request = new HeartbeatRequestData()
@ -638,7 +657,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
HeartbeatRequestData request = new HeartbeatRequestData()
@ -672,7 +692,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
HeartbeatRequestData request = new HeartbeatRequestData()
@ -696,7 +717,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
int partitionCount = 3;
service.startup(() -> partitionCount);
@ -746,7 +768,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
int partitionCount = 3;
service.startup(() -> partitionCount);
@ -797,7 +820,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
int partitionCount = 3;
service.startup(() -> partitionCount);
@ -838,7 +862,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
int partitionCount = 0;
service.startup(() -> partitionCount);
@ -862,7 +887,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
ListGroupsRequestData request = new ListGroupsRequestData();
@ -885,7 +911,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
mock(GroupCoordinatorMetrics.class)
);
int partitionCount = 2;
service.startup(() -> partitionCount);
@ -926,7 +953,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
mock(GroupCoordinatorMetrics.class)
);
int partitionCount = 1;
service.startup(() -> partitionCount);
@ -958,7 +986,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
mock(GroupCoordinatorMetrics.class)
);
int partitionCount = 1;
service.startup(() -> partitionCount);
@ -989,7 +1018,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> future = service.describeGroups(
@ -1015,7 +1045,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
service.startup(() -> 1);
@ -1068,7 +1099,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
OffsetFetchRequestData.OffsetFetchRequestGroup request =
@ -1101,7 +1133,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
service.startup(() -> 1);
@ -1151,7 +1184,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
OffsetFetchRequestData.OffsetFetchRequestGroup request =
@ -1178,7 +1212,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
LeaveGroupRequestData request = new LeaveGroupRequestData()
@ -1209,7 +1244,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
LeaveGroupRequestData request = new LeaveGroupRequestData()
@ -1261,7 +1297,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
LeaveGroupRequestData request = new LeaveGroupRequestData()
@ -1285,7 +1322,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
mock(GroupCoordinatorMetrics.class)
);
service.startup(() -> 1);
@ -1334,7 +1372,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
mock(GroupCoordinatorMetrics.class)
);
service.startup(() -> 1);
@ -1378,7 +1417,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
mock(GroupCoordinatorMetrics.class)
);
service.startup(() -> 1);
@ -1419,7 +1459,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
new GroupCoordinatorMetrics()
);
OffsetDeleteRequestData request = new OffsetDeleteRequestData()
@ -1444,7 +1485,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
mock(GroupCoordinatorMetrics.class)
);
service.startup(() -> 3);
@ -1519,7 +1561,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
mock(GroupCoordinatorMetrics.class)
);
service.startup(() -> 1);
@ -1552,7 +1595,8 @@ public class GroupCoordinatorServiceTest {
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
runtime,
mock(GroupCoordinatorMetrics.class)
);
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = service.deleteGroups(

View File

@ -43,6 +43,8 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -80,12 +82,16 @@ public class GroupCoordinatorShardTest {
public void testConsumerGroupHeartbeat() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
RequestContext context = requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
@ -107,12 +113,16 @@ public class GroupCoordinatorShardTest {
public void testCommitOffset() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
RequestContext context = requestContext(ApiKeys.OFFSET_COMMIT);
@ -139,7 +149,9 @@ public class GroupCoordinatorShardTest {
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
mock(CoordinatorMetrics.class),
mock(CoordinatorMetricsShard.class)
);
RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
@ -193,7 +205,9 @@ public class GroupCoordinatorShardTest {
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
mock(CoordinatorMetrics.class),
mock(CoordinatorMetricsShard.class)
);
RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
@ -253,12 +267,16 @@ public class GroupCoordinatorShardTest {
public void testReplayOffsetCommit() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
OffsetCommitKey key = new OffsetCommitKey();
@ -281,12 +299,16 @@ public class GroupCoordinatorShardTest {
public void testReplayOffsetCommitWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
OffsetCommitKey key = new OffsetCommitKey();
@ -308,12 +330,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
@ -331,12 +357,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
@ -353,12 +383,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupPartitionMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey();
@ -376,12 +410,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupPartitionMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey();
@ -398,12 +436,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupMemberMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey();
@ -421,12 +463,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupMemberMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey();
@ -443,12 +489,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupTargetAssignmentMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey();
@ -466,12 +516,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey();
@ -488,12 +542,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupTargetAssignmentMember() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey();
@ -511,12 +569,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey();
@ -533,12 +595,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupCurrentMemberAssignment() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
@ -556,12 +622,16 @@ public class GroupCoordinatorShardTest {
public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
@ -578,12 +648,16 @@ public class GroupCoordinatorShardTest {
public void testReplayKeyCannotBeNull() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
assertThrows(NullPointerException.class, () -> coordinator.replay(new Record(null, null)));
@ -593,12 +667,16 @@ public class GroupCoordinatorShardTest {
public void testReplayWithUnsupportedVersion() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
@ -615,12 +693,16 @@ public class GroupCoordinatorShardTest {
MetadataImage image = MetadataImage.EMPTY;
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
coordinator.onLoaded(image);
@ -637,12 +719,16 @@ public class GroupCoordinatorShardTest {
public void testReplayGroupMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
GroupMetadataKey key = new GroupMetadataKey();
@ -660,12 +746,16 @@ public class GroupCoordinatorShardTest {
public void testReplayGroupMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
new MockCoordinatorTimer<>(new MockTime()),
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
GroupMetadataKey key = new GroupMetadataKey();
@ -689,7 +779,9 @@ public class GroupCoordinatorShardTest {
groupMetadataManager,
offsetMetadataManager,
timer,
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 1000L, 24 * 60)
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 1000L, 24 * 60),
mock(CoordinatorMetrics.class),
mock(CoordinatorMetricsShard.class)
);
MetadataImage image = MetadataImage.EMPTY;
@ -717,7 +809,9 @@ public class GroupCoordinatorShardTest {
groupMetadataManager,
offsetMetadataManager,
timer,
mock(GroupCoordinatorConfig.class)
mock(GroupCoordinatorConfig.class),
mock(CoordinatorMetrics.class),
mock(CoordinatorMetricsShard.class)
);
Record offsetCommitTombstone = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "topic", 0);

View File

@ -90,6 +90,7 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@ -141,6 +142,9 @@ import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_REBALANCES_SENSOR_NAME;
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -152,6 +156,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class GroupMetadataManagerTest {
@ -317,6 +322,7 @@ public class GroupMetadataManagerTest {
final private int genericGroupNewMemberJoinTimeoutMs = 5 * 60 * 1000;
private int genericGroupMinSessionTimeoutMs = 10;
private int genericGroupMaxSessionTimeoutMs = 10 * 60 * 1000;
private GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class);
public Builder withMetadataImage(MetadataImage metadataImage) {
this.metadataImage = metadataImage;
@ -371,6 +377,7 @@ public class GroupMetadataManagerTest {
time,
timer,
snapshotRegistry,
metrics,
new GroupMetadataManager.Builder()
.withSnapshotRegistry(snapshotRegistry)
.withLogContext(logContext)
@ -387,6 +394,7 @@ public class GroupMetadataManagerTest {
.withGenericGroupMaxSessionTimeoutMs(genericGroupMaxSessionTimeoutMs)
.withGenericGroupInitialRebalanceDelayMs(genericGroupInitialRebalanceDelayMs)
.withGenericGroupNewMemberJoinTimeoutMs(genericGroupNewMemberJoinTimeoutMs)
.withGroupCoordinatorMetricsShard(metrics)
.build(),
genericGroupInitialRebalanceDelayMs,
genericGroupNewMemberJoinTimeoutMs
@ -405,6 +413,7 @@ public class GroupMetadataManagerTest {
final MockTime time;
final MockCoordinatorTimer<Void, Record> timer;
final SnapshotRegistry snapshotRegistry;
final GroupCoordinatorMetricsShard metrics;
final GroupMetadataManager groupMetadataManager;
final int genericGroupInitialRebalanceDelayMs;
final int genericGroupNewMemberJoinTimeoutMs;
@ -416,6 +425,7 @@ public class GroupMetadataManagerTest {
MockTime time,
MockCoordinatorTimer<Void, Record> timer,
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
GroupMetadataManager groupMetadataManager,
int genericGroupInitialRebalanceDelayMs,
int genericGroupNewMemberJoinTimeoutMs
@ -423,6 +433,7 @@ public class GroupMetadataManagerTest {
this.time = time;
this.timer = timer;
this.snapshotRegistry = snapshotRegistry;
this.metrics = metrics;
this.groupMetadataManager = groupMetadataManager;
this.genericGroupInitialRebalanceDelayMs = genericGroupInitialRebalanceDelayMs;
this.genericGroupNewMemberJoinTimeoutMs = genericGroupNewMemberJoinTimeoutMs;
@ -3829,6 +3840,12 @@ public class GroupMetadataManagerTest {
"group-id",
STABLE,
context.time,
new GroupCoordinatorMetricsShard(
context.snapshotRegistry,
Collections.emptyMap(),
Collections.emptyMap(),
new TopicPartition("__consumer_offsets", 0)
),
1,
Optional.of("consumer"),
Optional.of("range"),
@ -9557,6 +9574,71 @@ public class GroupMetadataManagerTest {
assertEquals(Collections.emptyList(), records);
}
@Test
public void testGenericGroupCompletedRebalanceSensor() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
verify(context.metrics).record(GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME);
}
@Test
public void testGenericGroupRebalanceSensor() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.createGenericGroup("group-id");
context.joinGenericGroupAsDynamicMemberAndCompleteJoin(
new JoinGroupRequestBuilder()
.withGroupId("group-id")
.withMemberId(UNKNOWN_MEMBER_ID)
.withDefaultProtocolTypeAndProtocols()
.build()
);
verify(context.metrics).record(GENERIC_GROUP_REBALANCES_SENSOR_NAME);
}
@Test
public void testConsumerGroupRebalanceSensor() {
String groupId = "fooup";
// Use a static member id as it makes the test easier.
String memberId = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addRacks()
.build())
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)))
));
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setServerAssignor("range")
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setTopicPartitions(Collections.emptyList()));
verify(context.metrics).record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}
private static void assertNoOrEmptyResult(List<ExpiredTimeout<Void, Record>> timeouts) {
assertTrue(timeouts.size() <= 1);
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, timeout.result));

View File

@ -51,6 +51,7 @@ import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -70,6 +71,9 @@ import java.util.OptionalInt;
import java.util.OptionalLong;
import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_DELETIONS_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -77,6 +81,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class OffsetMetadataManagerTest {
@ -89,6 +95,7 @@ public class OffsetMetadataManagerTest {
private GroupMetadataManager groupMetadataManager = null;
private MetadataImage metadataImage = null;
private GroupCoordinatorConfig config = null;
private GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class);
Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000);
@ -119,6 +126,7 @@ public class OffsetMetadataManagerTest {
.withLogContext(logContext)
.withMetadataImage(metadataImage)
.withConsumerGroupAssignors(Collections.singletonList(new RangeAssignor()))
.withGroupCoordinatorMetricsShard(metrics)
.build();
}
@ -129,12 +137,14 @@ public class OffsetMetadataManagerTest {
.withMetadataImage(metadataImage)
.withGroupMetadataManager(groupMetadataManager)
.withGroupCoordinatorConfig(config)
.withGroupCoordinatorMetricsShard(metrics)
.build();
return new OffsetMetadataManagerTestContext(
time,
timer,
snapshotRegistry,
metrics,
groupMetadataManager,
offsetMetadataManager
);
@ -144,6 +154,7 @@ public class OffsetMetadataManagerTest {
final MockTime time;
final MockCoordinatorTimer<Void, Record> timer;
final SnapshotRegistry snapshotRegistry;
final GroupCoordinatorMetricsShard metrics;
final GroupMetadataManager groupMetadataManager;
final OffsetMetadataManager offsetMetadataManager;
@ -154,12 +165,14 @@ public class OffsetMetadataManagerTest {
MockTime time,
MockCoordinatorTimer<Void, Record> timer,
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
GroupMetadataManager groupMetadataManager,
OffsetMetadataManager offsetMetadataManager
) {
this.time = time;
this.timer = timer;
this.snapshotRegistry = snapshotRegistry;
this.metrics = metrics;
this.groupMetadataManager = groupMetadataManager;
this.offsetMetadataManager = offsetMetadataManager;
}
@ -1986,6 +1999,128 @@ public class OffsetMetadataManagerTest {
assertNull(context.offsetMetadataManager.offset("foo", "bar", 0));
}
@Test
public void testOffsetCommitsSensor() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
"foo",
true
);
// Add member.
group.add(mkGenericMember("member", Optional.of("new-instance-id")));
// Transition to next generation.
group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
group.initNextGeneration();
assertEquals(1, group.generationId());
group.transitionTo(GenericGroupState.STABLE);
CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset(
new OffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
.setGenerationIdOrMemberEpoch(1)
.setRetentionTimeMs(1234L)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("bar")
.setPartitions(Arrays.asList(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100L),
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(1)
.setCommittedOffset(150L)
))
))
);
verify(context.metrics).record(OFFSET_COMMITS_SENSOR_NAME, 2);
}
@Test
public void testOffsetsExpiredSensor() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
Group group = mock(Group.class);
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
.withGroupMetadataManager(groupMetadataManager)
.withOffsetsRetentionMs(1000)
.build();
long commitTimestamp = context.time.milliseconds();
context.commitOffset("group-id", "firstTopic", 0, 100L, 0, commitTimestamp);
context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp);
context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500);
context.time.sleep(1000);
// firstTopic-0: group is still subscribed to firstTopic. Do not expire.
// secondTopic-0: should expire as offset retention has passed.
// secondTopic-1: has not passed offset retention. Do not expire.
List<Record> expectedRecords = Collections.singletonList(
RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0)
);
when(groupMetadataManager.group("group-id")).thenReturn(group);
when(group.offsetExpirationCondition()).thenReturn(Optional.of(
new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
when(group.isSubscribedToTopic("firstTopic")).thenReturn(true);
when(group.isSubscribedToTopic("secondTopic")).thenReturn(false);
List<Record> records = new ArrayList<>();
assertFalse(context.cleanupExpiredOffsets("group-id", records));
assertEquals(expectedRecords, records);
// Expire secondTopic-1.
context.time.sleep(500);
records = new ArrayList<>();
assertFalse(context.cleanupExpiredOffsets("group-id", records));
verify(context.metrics, times(2)).record(OFFSET_EXPIRED_SENSOR_NAME, 1);
// Add 2 more commits, then expire all.
when(group.isSubscribedToTopic("firstTopic")).thenReturn(false);
context.commitOffset("group-id", "firstTopic", 1, 100L, 0, commitTimestamp + 500);
context.commitOffset("group-id", "secondTopic", 0, 101L, 0, commitTimestamp + 500);
records = new ArrayList<>();
assertTrue(context.cleanupExpiredOffsets("group-id", records));
verify(context.metrics).record(OFFSET_EXPIRED_SENSOR_NAME, 3);
}
@Test
public void testOffsetDeletionsSensor() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true);
context.commitOffset("foo", "bar", 0, 100L, 0);
context.commitOffset("foo", "bar", 1, 150L, 0);
group.setSubscribedTopics(Optional.of(Collections.emptySet()));
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
.setName("bar")
.setPartitions(Arrays.asList(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0),
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(1)
))
).iterator());
context.deleteOffsets(
new OffsetDeleteRequestData()
.setGroupId("foo")
.setTopics(requestTopicCollection)
);
verify(context.metrics).record(OFFSET_DELETIONS_SENSOR_NAME, 2);
}
private void verifyReplay(
OffsetMetadataManagerTestContext context,
String groupId,

View File

@ -45,6 +45,7 @@ import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
@ -91,6 +92,7 @@ import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
public class RecordHelpersTest {
@ -527,7 +529,8 @@ public class RecordHelpersTest {
new LogContext(),
"group-id",
GenericGroupState.PREPARING_REBALANCE,
time
time,
mock(GroupCoordinatorMetricsShard.class)
);
Map<String, byte[]> assignment = new HashMap<>();
@ -597,7 +600,8 @@ public class RecordHelpersTest {
new LogContext(),
"group-id",
GenericGroupState.PREPARING_REBALANCE,
time
time,
mock(GroupCoordinatorMetricsShard.class)
);
expectedMembers.forEach(member -> {
@ -648,7 +652,8 @@ public class RecordHelpersTest {
new LogContext(),
"group-id",
GenericGroupState.PREPARING_REBALANCE,
time
time,
mock(GroupCoordinatorMetricsShard.class)
);
expectedMembers.forEach(member -> {
@ -707,7 +712,8 @@ public class RecordHelpersTest {
new LogContext(),
"group-id",
GenericGroupState.PREPARING_REBALANCE,
time
time,
mock(GroupCoordinatorMetricsShard.class)
);
group.initNextGeneration();

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
@ -26,6 +27,7 @@ import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
@ -46,12 +48,17 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
public class ConsumerGroupTest {
private ConsumerGroup createConsumerGroup(String groupId) {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
return new ConsumerGroup(snapshotRegistry, groupId);
return new ConsumerGroup(
snapshotRegistry,
groupId,
mock(GroupCoordinatorMetricsShard.class)
);
}
@Test
@ -642,7 +649,13 @@ public class ConsumerGroupTest {
@Test
public void testAsListedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
snapshotRegistry,
Collections.emptyMap(),
Collections.emptyMap(),
new TopicPartition("__consumer_offsets", 0)
);
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
snapshotRegistry.getOrCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
@ -656,7 +669,11 @@ public class ConsumerGroupTest {
@Test
public void testValidateOffsetFetch() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
ConsumerGroup group = new ConsumerGroup(
snapshotRegistry,
"group-foo",
mock(GroupCoordinatorMetricsShard.class)
);
// Simulate a call from the admin client without member id and member epoch.
group.validateOffsetFetch(null, -1, Long.MAX_VALUE);
@ -715,7 +732,7 @@ public class ConsumerGroupTest {
long commitTimestamp = 20000L;
long offsetsRetentionMs = 10000L;
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id");
ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class));
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
assertTrue(offsetExpirationCondition.isPresent());

View File

@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.generic;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
@ -36,6 +37,8 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -59,6 +62,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
public class GenericGroupTest {
private final String protocolType = "consumer";
@ -68,12 +72,19 @@ public class GenericGroupTest {
private final String clientHost = "clientHost";
private final int rebalanceTimeoutMs = 60000;
private final int sessionTimeoutMs = 10000;
private final LogContext logContext = new LogContext();
private final GroupCoordinatorMetricsShard metrics = new GroupCoordinatorMetricsShard(
new SnapshotRegistry(logContext),
Collections.emptyMap(),
Collections.emptyMap(),
new TopicPartition("__consumer_offsets", 0)
);
private GenericGroup group = null;
@BeforeEach
public void initialize() {
group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM);
group = new GenericGroup(logContext, "groupId", EMPTY, Time.SYSTEM, metrics);
}
@Test
@ -1099,7 +1110,7 @@ public class GenericGroupTest {
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
MockTime time = new MockTime();
long currentStateTimestamp = time.milliseconds();
GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, time);
GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, time, mock(GroupCoordinatorMetricsShard.class));
// 1. Test no protocol type. Simple consumer case, Base timestamp based off of last commit timestamp.
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
@ -1174,7 +1185,7 @@ public class GenericGroupTest {
@Test
public void testIsSubscribedToTopic() {
GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM);
GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM, mock(GroupCoordinatorMetricsShard.class));
// 1. group has no protocol type => not subscribed
assertFalse(group.isSubscribedToTopic("topic"));

View File

@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
import com.yammer.metrics.core.MetricsRegistry;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_ASSIGNING;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_DEAD;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_EMPTY;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_RECONCILING;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_STABLE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_DEAD;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_EMPTY;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_PREPARING_REBALANCE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_STABLE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_OFFSETS;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class GroupCoordinatorMetricsShardTest {
@Test
public void testLocalGauges() {
MetricsRegistry registry = new MetricsRegistry();
Metrics metrics = new Metrics();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
TopicPartition tp = new TopicPartition("__consumer_offsets", 0);
GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics);
GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp);
shard.incrementLocalGauge(NUM_OFFSETS);
shard.incrementLocalGauge(NUM_GENERIC_GROUPS);
shard.incrementLocalGauge(NUM_CONSUMER_GROUPS);
shard.incrementLocalGauge(NUM_CONSUMER_GROUPS_EMPTY);
shard.incrementLocalGauge(NUM_CONSUMER_GROUPS_ASSIGNING);
shard.incrementLocalGauge(NUM_CONSUMER_GROUPS_RECONCILING);
shard.incrementLocalGauge(NUM_CONSUMER_GROUPS_STABLE);
shard.incrementLocalGauge(NUM_CONSUMER_GROUPS_DEAD);
snapshotRegistry.getOrCreateSnapshot(1000);
// The value should not be updated until the offset has been committed.
assertEquals(0, shard.localGaugeValue(NUM_OFFSETS));
assertEquals(0, shard.localGaugeValue(NUM_GENERIC_GROUPS));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_RECONCILING));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_DEAD));
shard.commitUpTo(1000);
assertEquals(1, shard.localGaugeValue(NUM_OFFSETS));
assertEquals(1, shard.localGaugeValue(NUM_GENERIC_GROUPS));
assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS));
assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY));
assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING));
assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_RECONCILING));
assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE));
assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_DEAD));
shard.decrementLocalGauge(NUM_OFFSETS);
shard.decrementLocalGauge(NUM_GENERIC_GROUPS);
shard.decrementLocalGauge(NUM_CONSUMER_GROUPS);
shard.decrementLocalGauge(NUM_CONSUMER_GROUPS_EMPTY);
shard.decrementLocalGauge(NUM_CONSUMER_GROUPS_ASSIGNING);
shard.decrementLocalGauge(NUM_CONSUMER_GROUPS_RECONCILING);
shard.decrementLocalGauge(NUM_CONSUMER_GROUPS_STABLE);
shard.decrementLocalGauge(NUM_CONSUMER_GROUPS_DEAD);
snapshotRegistry.getOrCreateSnapshot(2000);
shard.commitUpTo(2000);
assertEquals(0, shard.localGaugeValue(NUM_OFFSETS));
assertEquals(0, shard.localGaugeValue(NUM_GENERIC_GROUPS));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_RECONCILING));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE));
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_DEAD));
}
@Test
public void testGenericGroupStateTransitionMetrics() {
MetricsRegistry registry = new MetricsRegistry();
Metrics metrics = new Metrics();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
TopicPartition tp = new TopicPartition("__consumer_offsets", 0);
GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics);
GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp);
coordinatorMetrics.activateMetricsShard(shard);
LogContext logContext = new LogContext();
GenericGroup group0 = new GenericGroup(logContext, "groupId0", EMPTY, Time.SYSTEM, shard);
GenericGroup group1 = new GenericGroup(logContext, "groupId1", EMPTY, Time.SYSTEM, shard);
GenericGroup group2 = new GenericGroup(logContext, "groupId2", EMPTY, Time.SYSTEM, shard);
GenericGroup group3 = new GenericGroup(logContext, "groupId3", EMPTY, Time.SYSTEM, shard);
snapshotRegistry.getOrCreateSnapshot(1000);
shard.commitUpTo(1000);
assertEquals(4, shard.localGaugeValue(NUM_GENERIC_GROUPS));
group0.transitionTo(PREPARING_REBALANCE);
group0.transitionTo(COMPLETING_REBALANCE);
group1.transitionTo(PREPARING_REBALANCE);
group2.transitionTo(DEAD);
snapshotRegistry.getOrCreateSnapshot(2000);
shard.commitUpTo(2000);
assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_EMPTY));
assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_PREPARING_REBALANCE));
assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE));
assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_DEAD));
assertEquals(0, shard.globalGaugeValue(NUM_GENERIC_GROUPS_STABLE));
group0.transitionTo(STABLE);
group1.transitionTo(COMPLETING_REBALANCE);
group3.transitionTo(DEAD);
snapshotRegistry.getOrCreateSnapshot(3000);
shard.commitUpTo(3000);
assertEquals(0, shard.globalGaugeValue(NUM_GENERIC_GROUPS_EMPTY));
assertEquals(0, shard.globalGaugeValue(NUM_GENERIC_GROUPS_PREPARING_REBALANCE));
assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE));
assertEquals(2, shard.globalGaugeValue(NUM_GENERIC_GROUPS_DEAD));
assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_STABLE));
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroups"), 4);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroupsEmpty"), 0);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroupsPreparingRebalance"), 0);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroupsCompletingRebalance"), 1);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroupsDead"), 2);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroupsStable"), 1);
}
@Test
public void testConsumerGroupStateTransitionMetrics() {
MetricsRegistry registry = new MetricsRegistry();
Metrics metrics = new Metrics();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
TopicPartition tp = new TopicPartition("__consumer_offsets", 0);
GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics);
GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp);
coordinatorMetrics.activateMetricsShard(shard);
ConsumerGroup group0 = new ConsumerGroup(
snapshotRegistry,
"group-0",
shard
);
ConsumerGroup group1 = new ConsumerGroup(
snapshotRegistry,
"group-1",
shard
);
ConsumerGroup group2 = new ConsumerGroup(
snapshotRegistry,
"group-2",
shard
);
ConsumerGroup group3 = new ConsumerGroup(
snapshotRegistry,
"group-3",
shard
);
snapshotRegistry.getOrCreateSnapshot(1000);
shard.commitUpTo(1000);
assertEquals(4, shard.localGaugeValue(NUM_CONSUMER_GROUPS));
assertEquals(4, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY));
ConsumerGroupMember member0 = group0.getOrMaybeCreateMember("member-id", true);
ConsumerGroupMember member1 = group1.getOrMaybeCreateMember("member-id", true);
ConsumerGroupMember member2 = group2.getOrMaybeCreateMember("member-id", true);
ConsumerGroupMember member3 = group3.getOrMaybeCreateMember("member-id", true);
group0.updateMember(member0);
group1.updateMember(member1);
group2.updateMember(member2);
group3.updateMember(member3);
snapshotRegistry.getOrCreateSnapshot(2000);
shard.commitUpTo(2000);
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY));
assertEquals(4, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE));
group2.setGroupEpoch(1);
group3.setGroupEpoch(1);
snapshotRegistry.getOrCreateSnapshot(3000);
shard.commitUpTo(3000);
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY));
assertEquals(2, shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING));
assertEquals(2, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE));
group2.setTargetAssignmentEpoch(1);
// Set member2 to ASSIGNING state.
new ConsumerGroupMember.Builder(member2)
.setPartitionsPendingAssignment(Collections.singletonMap(Uuid.ZERO_UUID, Collections.singleton(0)))
.build();
snapshotRegistry.getOrCreateSnapshot(4000);
shard.commitUpTo(4000);
assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY));
assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING));
assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_RECONCILING));
assertEquals(2, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE));
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroups"), 4);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroupsEmpty"), 0);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroupsAssigning"), 1);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroupsReconciling"), 1);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroupsStable"), 2);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroupsDead"), 0);
}
}

View File

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
import com.yammer.metrics.core.MetricsRegistry;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.stream.IntStream;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_OFFSETS;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertMetricsForTypeEqual;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupCoordinatorMetricsTest {
@Test
public void testMetricNames() {
MetricsRegistry registry = new MetricsRegistry();
Metrics metrics = new Metrics();
HashSet<org.apache.kafka.common.MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
metrics.metricName("offset-commit-rate", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("offset-commit-count", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("offset-expiration-rate", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("offset-expiration-count", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("offset-deletion-rate", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("offset-deletion-count", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("group-completed-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("group-completed-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("consumer-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("consumer-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP)
));
try {
try (GroupCoordinatorMetrics ignored = new GroupCoordinatorMetrics(registry, metrics)) {
HashSet<String> expectedRegistry = new HashSet<>(Arrays.asList(
"kafka.coordinator.group:type=GroupMetadataManager,name=NumOffsets",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumGroups",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumGroupsPreparingRebalance",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumGroupsCompletingRebalance",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumGroupsStable",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumGroupsDead",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumGroupsEmpty",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroups",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroupsEmpty",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroupsAssigning",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroupsReconciling",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroupsStable",
"kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroupsDead"
));
assertMetricsForTypeEqual(registry, "kafka.coordinator.group", expectedRegistry);
expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName)));
}
assertMetricsForTypeEqual(registry, "kafka.coordinator.group", Collections.emptySet());
expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName)));
} finally {
registry.shutdown();
}
}
@Test
public void sumLocalGauges() {
MetricsRegistry registry = new MetricsRegistry();
Metrics metrics = new Metrics();
GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics);
SnapshotRegistry snapshotRegistry0 = new SnapshotRegistry(new LogContext());
SnapshotRegistry snapshotRegistry1 = new SnapshotRegistry(new LogContext());
TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
GroupCoordinatorMetricsShard shard0 = coordinatorMetrics.newMetricsShard(snapshotRegistry0, tp0);
GroupCoordinatorMetricsShard shard1 = coordinatorMetrics.newMetricsShard(snapshotRegistry1, tp1);
coordinatorMetrics.activateMetricsShard(shard0);
coordinatorMetrics.activateMetricsShard(shard1);
IntStream.range(0, 5).forEach(__ -> shard0.incrementLocalGauge(NUM_CONSUMER_GROUPS));
IntStream.range(0, 5).forEach(__ -> shard1.incrementLocalGauge(NUM_CONSUMER_GROUPS));
IntStream.range(0, 3).forEach(__ -> shard1.decrementLocalGauge(NUM_CONSUMER_GROUPS));
IntStream.range(0, 6).forEach(__ -> shard0.incrementLocalGauge(NUM_OFFSETS));
IntStream.range(0, 2).forEach(__ -> shard1.incrementLocalGauge(NUM_OFFSETS));
IntStream.range(0, 1).forEach(__ -> shard1.decrementLocalGauge(NUM_OFFSETS));
snapshotRegistry0.getOrCreateSnapshot(1000);
snapshotRegistry1.getOrCreateSnapshot(1500);
shard0.commitUpTo(1000);
shard1.commitUpTo(1500);
assertEquals(5, shard0.localGaugeValue(NUM_CONSUMER_GROUPS));
assertEquals(2, shard1.localGaugeValue(NUM_CONSUMER_GROUPS));
assertEquals(6, shard0.localGaugeValue(NUM_OFFSETS));
assertEquals(1, shard1.localGaugeValue(NUM_OFFSETS));
assertEquals(7, coordinatorMetrics.numConsumerGroups());
assertEquals(7, coordinatorMetrics.numOffsets());
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroups"), 7);
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumOffsets"), 7);
}
@Test
public void testGlobalSensors() {
MetricsRegistry registry = new MetricsRegistry();
Time time = new MockTime();
Metrics metrics = new Metrics(time);
GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics);
GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(
new SnapshotRegistry(new LogContext()), new TopicPartition("__consumer_offsets", 0)
);
shard.record(GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, 10);
assertMetricValue(metrics, metrics.metricName("group-completed-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), 1.0 / 3.0);
assertMetricValue(metrics, metrics.metricName("group-completed-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), 10);
shard.record(OFFSET_COMMITS_SENSOR_NAME, 20);
assertMetricValue(metrics, metrics.metricName("offset-commit-rate", GroupCoordinatorMetrics.METRICS_GROUP), 2.0 / 3.0);
assertMetricValue(metrics, metrics.metricName("offset-commit-count", GroupCoordinatorMetrics.METRICS_GROUP), 20);
shard.record(OFFSET_EXPIRED_SENSOR_NAME, 30);
assertMetricValue(metrics, metrics.metricName("offset-expiration-rate", GroupCoordinatorMetrics.METRICS_GROUP), 1.0);
assertMetricValue(metrics, metrics.metricName("offset-expiration-count", GroupCoordinatorMetrics.METRICS_GROUP), 30);
shard.record(GENERIC_GROUP_REBALANCES_SENSOR_NAME, 40);
assertMetricValue(metrics, metrics.metricName("group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), 4.0 / 3.0);
assertMetricValue(metrics, metrics.metricName("group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), 40);
shard.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, 50);
assertMetricValue(metrics, metrics.metricName("consumer-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), 5.0 / 3.0);
assertMetricValue(metrics, metrics.metricName("consumer-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), 50);
}
private void assertMetricValue(Metrics metrics, MetricName metricName, double val) {
assertEquals(val, metrics.metric(metricName).metricValue());
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class MetricsTestUtils {
static void assertMetricsForTypeEqual(
MetricsRegistry registry,
String expectedPrefix,
Set<String> expected
) {
Set<String> actual = new TreeSet<>();
registry.allMetrics().forEach((name, __) -> {
StringBuilder bld = new StringBuilder();
bld.append(name.getGroup());
bld.append(":type=").append(name.getType());
bld.append(",name=").append(name.getName());
if (bld.toString().startsWith(expectedPrefix)) {
actual.add(bld.toString());
}
});
assertEquals(new TreeSet<>(expected), actual);
}
static void assertGaugeValue(MetricsRegistry registry, MetricName metricName, long count) {
Gauge gauge = (Gauge) registry.allMetrics().get(metricName);
assertEquals(count, (long) gauge.value());
}
static MetricName metricName(String type, String name) {
String mBeanName = String.format("kafka.coordinator.group:type=%s,name=%s", type, name);
return new MetricName("kafka.coordinator.group", type, name, null, mBeanName);
}
}

View File

@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@ -245,6 +247,12 @@ public class CoordinatorRuntimeTest {
return this;
}
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
return this;
}
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> withTopicPartition(
TopicPartition topicPartition
) {
@ -288,12 +296,14 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
@ -353,12 +363,14 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
@ -405,12 +417,14 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
@ -455,12 +469,14 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
@ -522,12 +538,14 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
@ -571,6 +589,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -610,6 +629,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -618,6 +638,7 @@ public class CoordinatorRuntimeTest {
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
@ -649,6 +670,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Schedule the loading.
@ -758,6 +780,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Scheduling a write fails with a NotCoordinatorException because the coordinator
@ -779,6 +802,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -804,6 +828,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -850,6 +875,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -898,6 +924,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -953,6 +980,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Schedule a read. It fails because the coordinator does not exist.
@ -975,6 +1003,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -1017,6 +1046,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -1083,6 +1113,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
MockCoordinatorShard coordinator0 = mock(MockCoordinatorShard.class);
@ -1093,6 +1124,7 @@ public class CoordinatorRuntimeTest {
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.build())
@ -1139,6 +1171,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -1191,6 +1224,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -1263,6 +1297,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -1332,6 +1367,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -1389,6 +1425,7 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
// Loads the coordinator.
@ -1436,12 +1473,14 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(runtimeMetrics)
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
@ -1506,12 +1545,14 @@ public class CoordinatorRuntimeTest {
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(runtimeMetrics)
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);