mirror of https://github.com/apache/kafka.git
MINOR: Remove classic group preparing rebalance sensor (#15143)
Remove "group-rebalance-rate" and "group-rebalance-count" metrics from the new coordinator as this is not part of KIP-848. Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
71b2cd6173
commit
ac7ddc7d46
|
@ -127,7 +127,6 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPA
|
||||||
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
|
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
|
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
|
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_REBALANCES_SENSOR_NAME;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The GroupMetadataManager manages the metadata of all classic and consumer groups. It holds
|
* The GroupMetadataManager manages the metadata of all classic and consumer groups. It holds
|
||||||
|
@ -2639,7 +2638,6 @@ public class GroupMetadataManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
group.transitionTo(PREPARING_REBALANCE);
|
group.transitionTo(PREPARING_REBALANCE);
|
||||||
metrics.record(CLASSIC_GROUP_REBALANCES_SENSOR_NAME);
|
|
||||||
|
|
||||||
log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).",
|
log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).",
|
||||||
group.groupId(), group.currentState(), group.generationId(), reason);
|
group.groupId(), group.currentState(), group.generationId(), reason);
|
||||||
|
|
|
@ -71,7 +71,6 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
|
||||||
public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired";
|
public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired";
|
||||||
public static final String OFFSET_DELETIONS_SENSOR_NAME = "OffsetDeletions";
|
public static final String OFFSET_DELETIONS_SENSOR_NAME = "OffsetDeletions";
|
||||||
public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances";
|
public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances";
|
||||||
public static final String CLASSIC_GROUP_REBALANCES_SENSOR_NAME = "ClassicGroupRebalances";
|
|
||||||
public static final String CONSUMER_GROUP_REBALANCES_SENSOR_NAME = "ConsumerGroupRebalances";
|
public static final String CONSUMER_GROUP_REBALANCES_SENSOR_NAME = "ConsumerGroupRebalances";
|
||||||
|
|
||||||
private final MetricName classicGroupCountMetricName;
|
private final MetricName classicGroupCountMetricName;
|
||||||
|
@ -186,15 +185,6 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
|
||||||
METRICS_GROUP,
|
METRICS_GROUP,
|
||||||
"The total number of classic group completed rebalances")));
|
"The total number of classic group completed rebalances")));
|
||||||
|
|
||||||
Sensor classicGroupPreparingRebalancesSensor = metrics.sensor(CLASSIC_GROUP_REBALANCES_SENSOR_NAME);
|
|
||||||
classicGroupPreparingRebalancesSensor.add(new Meter(
|
|
||||||
metrics.metricName("group-rebalance-rate",
|
|
||||||
METRICS_GROUP,
|
|
||||||
"The rate of classic group preparing rebalances"),
|
|
||||||
metrics.metricName("group-rebalance-count",
|
|
||||||
METRICS_GROUP,
|
|
||||||
"The total number of classic group preparing rebalances")));
|
|
||||||
|
|
||||||
Sensor consumerGroupRebalanceSensor = metrics.sensor(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
|
Sensor consumerGroupRebalanceSensor = metrics.sensor(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
|
||||||
consumerGroupRebalanceSensor.add(new Meter(
|
consumerGroupRebalanceSensor.add(new Meter(
|
||||||
metrics.metricName("consumer-group-rebalance-rate",
|
metrics.metricName("consumer-group-rebalance-rate",
|
||||||
|
@ -209,7 +199,6 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
|
||||||
Utils.mkEntry(OFFSET_EXPIRED_SENSOR_NAME, offsetExpiredSensor),
|
Utils.mkEntry(OFFSET_EXPIRED_SENSOR_NAME, offsetExpiredSensor),
|
||||||
Utils.mkEntry(OFFSET_DELETIONS_SENSOR_NAME, offsetDeletionsSensor),
|
Utils.mkEntry(OFFSET_DELETIONS_SENSOR_NAME, offsetDeletionsSensor),
|
||||||
Utils.mkEntry(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, classicGroupCompletedRebalancesSensor),
|
Utils.mkEntry(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, classicGroupCompletedRebalancesSensor),
|
||||||
Utils.mkEntry(CLASSIC_GROUP_REBALANCES_SENSOR_NAME, classicGroupPreparingRebalancesSensor),
|
|
||||||
Utils.mkEntry(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, consumerGroupRebalanceSensor)
|
Utils.mkEntry(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, consumerGroupRebalanceSensor)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
@ -261,7 +250,6 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
|
||||||
OFFSET_EXPIRED_SENSOR_NAME,
|
OFFSET_EXPIRED_SENSOR_NAME,
|
||||||
OFFSET_DELETIONS_SENSOR_NAME,
|
OFFSET_DELETIONS_SENSOR_NAME,
|
||||||
CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME,
|
CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME,
|
||||||
CLASSIC_GROUP_REBALANCES_SENSOR_NAME,
|
|
||||||
CONSUMER_GROUP_REBALANCES_SENSOR_NAME
|
CONSUMER_GROUP_REBALANCES_SENSOR_NAME
|
||||||
).forEach(metrics::removeSensor);
|
).forEach(metrics::removeSensor);
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,7 +148,6 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPA
|
||||||
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
|
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
|
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
|
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_REBALANCES_SENSOR_NAME;
|
|
||||||
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
|
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
@ -10674,22 +10673,6 @@ public class GroupMetadataManagerTest {
|
||||||
verify(context.metrics).record(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME);
|
verify(context.metrics).record(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testClassicGroupRebalanceSensor() throws Exception {
|
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
|
||||||
.build();
|
|
||||||
context.createClassicGroup("group-id");
|
|
||||||
|
|
||||||
context.joinClassicGroupAsDynamicMemberAndCompleteJoin(
|
|
||||||
new JoinGroupRequestBuilder()
|
|
||||||
.withGroupId("group-id")
|
|
||||||
.withMemberId(UNKNOWN_MEMBER_ID)
|
|
||||||
.withDefaultProtocolTypeAndProtocols()
|
|
||||||
.build()
|
|
||||||
);
|
|
||||||
verify(context.metrics).record(CLASSIC_GROUP_REBALANCES_SENSOR_NAME);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConsumerGroupRebalanceSensor() {
|
public void testConsumerGroupRebalanceSensor() {
|
||||||
String groupId = "fooup";
|
String groupId = "fooup";
|
||||||
|
|
|
@ -35,7 +35,6 @@ import java.util.stream.IntStream;
|
||||||
|
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
|
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
|
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_REBALANCES_SENSOR_NAME;
|
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.METRICS_GROUP;
|
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.METRICS_GROUP;
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
|
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
|
||||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
|
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
|
||||||
|
@ -62,8 +61,6 @@ public class GroupCoordinatorMetricsTest {
|
||||||
metrics.metricName("offset-deletion-count", GroupCoordinatorMetrics.METRICS_GROUP),
|
metrics.metricName("offset-deletion-count", GroupCoordinatorMetrics.METRICS_GROUP),
|
||||||
metrics.metricName("group-completed-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
|
metrics.metricName("group-completed-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
|
||||||
metrics.metricName("group-completed-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP),
|
metrics.metricName("group-completed-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP),
|
||||||
metrics.metricName("group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
|
|
||||||
metrics.metricName("group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP),
|
|
||||||
metrics.metricName("consumer-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
|
metrics.metricName("consumer-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
|
||||||
metrics.metricName("consumer-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP),
|
metrics.metricName("consumer-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP),
|
||||||
metrics.metricName(
|
metrics.metricName(
|
||||||
|
@ -194,10 +191,6 @@ public class GroupCoordinatorMetricsTest {
|
||||||
assertMetricValue(metrics, metrics.metricName("offset-expiration-rate", GroupCoordinatorMetrics.METRICS_GROUP), 1.0);
|
assertMetricValue(metrics, metrics.metricName("offset-expiration-rate", GroupCoordinatorMetrics.METRICS_GROUP), 1.0);
|
||||||
assertMetricValue(metrics, metrics.metricName("offset-expiration-count", GroupCoordinatorMetrics.METRICS_GROUP), 30);
|
assertMetricValue(metrics, metrics.metricName("offset-expiration-count", GroupCoordinatorMetrics.METRICS_GROUP), 30);
|
||||||
|
|
||||||
shard.record(CLASSIC_GROUP_REBALANCES_SENSOR_NAME, 40);
|
|
||||||
assertMetricValue(metrics, metrics.metricName("group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), 4.0 / 3.0);
|
|
||||||
assertMetricValue(metrics, metrics.metricName("group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), 40);
|
|
||||||
|
|
||||||
shard.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, 50);
|
shard.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, 50);
|
||||||
assertMetricValue(metrics, metrics.metricName("consumer-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), 5.0 / 3.0);
|
assertMetricValue(metrics, metrics.metricName("consumer-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), 5.0 / 3.0);
|
||||||
assertMetricValue(metrics, metrics.metricName("consumer-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), 50);
|
assertMetricValue(metrics, metrics.metricName("consumer-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), 50);
|
||||||
|
|
Loading…
Reference in New Issue