From d91a94e7bff7a2ffe94b562ef1108307afce1bc7 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 20 Sep 2019 17:50:12 -0700 Subject: [PATCH] KAFKA-8609: Add consumer rebalance metrics (#7347) Adding the following metrics in: 1. AbstractCoordinator (for both consumer and connect) * rebalance-latency-avg * rebalance-latency-max * rebalance-total * rebalance-rate-per-hour * failed-rebalance-total * failed-rebalance-rate-per-hour * last-rebalance-seconds-ago 2. ConsumerCoordinator * partition-revoked-latency-avg * partition-revoked-latency-max * partition-assigned-latency-avg * partition-assigned-latency-max * partition-lost-latency-avg * partition-lost-latency-max Reviewers: Bruno Cadonna , A. Sophie Blee-Goldman , Matthias J. Sax --- .../internals/AbstractCoordinator.java | 138 +++++++++++++----- .../internals/ConsumerCoordinator.java | 45 +++++- .../clients/consumer/internals/Heartbeat.java | 2 +- .../kafka/common/metrics/JmxReporter.java | 1 + .../internals/AbstractCoordinatorTest.java | 93 +++++++++++- .../internals/ConsumerCoordinatorTest.java | 54 +++++++ 6 files changed, 288 insertions(+), 45 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 4c71a89f3bd..fbd15d8e905 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -39,12 +39,13 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.metrics.Measurable; -import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeCount; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -122,13 +123,15 @@ public abstract class AbstractCoordinator implements Closeable { protected final ConsumerNetworkClient client; protected final Time time; - private HeartbeatThread heartbeatThread = null; + private Node coordinator = null; private boolean rejoinNeeded = true; private boolean needsJoinPrepare = true; private MemberState state = MemberState.UNJOINED; + private HeartbeatThread heartbeatThread = null; private RequestFuture joinFuture = null; - private Node coordinator = null; private Generation generation = Generation.NO_GENERATION; + private long lastRebalanceStartMs = -1L; + private long lastRebalanceEndMs = -1L; private RequestFuture findCoordinatorFuture = null; @@ -440,6 +443,10 @@ public abstract class AbstractCoordinator implements Closeable { disableHeartbeatThread(); state = MemberState.REBALANCING; + // a rebalance can be triggered consecutively if the previous one failed, + // in this case we would not update the start time. + if (lastRebalanceStartMs == -1L) + lastRebalanceStartMs = time.milliseconds(); joinFuture = sendJoinGroupRequest(); joinFuture.addListener(new RequestFutureListener() { @Override @@ -450,6 +457,10 @@ public abstract class AbstractCoordinator implements Closeable { log.info("Successfully joined group with generation {}", generation.generationId); state = MemberState.STABLE; rejoinNeeded = false; + // record rebalance latency + lastRebalanceEndMs = time.milliseconds(); + sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs); + lastRebalanceStartMs = -1L; if (heartbeatThread != null) heartbeatThread.enable(); @@ -462,6 +473,7 @@ public abstract class AbstractCoordinator implements Closeable { // after having been woken up, the exception is ignored and we will rejoin synchronized (AbstractCoordinator.this) { state = MemberState.UNJOINED; + sensors.failedRebalanceSensor.record(); } } }); @@ -511,7 +523,7 @@ public abstract class AbstractCoordinator implements Closeable { Errors error = joinResponse.error(); if (error == Errors.NONE) { log.debug("Received successful JoinGroup response: {}", joinResponse); - sensors.joinLatency.record(response.requestLatencyMs()); + sensors.joinSensor.record(response.requestLatencyMs()); synchronized (AbstractCoordinator.this) { if (state != MemberState.REBALANCING) { @@ -641,7 +653,7 @@ public abstract class AbstractCoordinator implements Closeable { RequestFuture future) { Errors error = syncResponse.error(); if (error == Errors.NONE) { - sensors.syncLatency.record(response.requestLatencyMs()); + sensors.syncSensor.record(response.requestLatencyMs()); future.complete(ByteBuffer.wrap(syncResponse.data.assignment())); } else { requestRejoin(); @@ -931,7 +943,7 @@ public abstract class AbstractCoordinator implements Closeable { private class HeartbeatResponseHandler extends CoordinatorResponseHandler { @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture future) { - sensors.heartbeatLatency.record(response.requestLatencyMs()); + sensors.heartbeatSensor.record(response.requestLatencyMs()); Errors error = heartbeatResponse.error(); if (error == Errors.NONE) { log.debug("Received successful Heartbeat response"); @@ -1005,44 +1017,98 @@ public abstract class AbstractCoordinator implements Closeable { private class GroupCoordinatorMetrics { public final String metricGrpName; - public final Sensor heartbeatLatency; - public final Sensor joinLatency; - public final Sensor syncLatency; + public final Sensor heartbeatSensor; + public final Sensor joinSensor; + public final Sensor syncSensor; + public final Sensor successfulRebalanceSensor; + public final Sensor failedRebalanceSensor; public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; - this.heartbeatLatency = metrics.sensor("heartbeat-latency"); - this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max", - this.metricGrpName, - "The max time taken to receive a response to a heartbeat request"), new Max()); - this.heartbeatLatency.add(createMeter(metrics, metricGrpName, "heartbeat", "heartbeats")); + this.heartbeatSensor = metrics.sensor("heartbeat-latency"); + this.heartbeatSensor.add(metrics.metricName("heartbeat-response-time-max", + this.metricGrpName, + "The max time taken to receive a response to a heartbeat request"), new Max()); + this.heartbeatSensor.add(createMeter(metrics, metricGrpName, "heartbeat", "heartbeats")); - this.joinLatency = metrics.sensor("join-latency"); - this.joinLatency.add(metrics.metricName("join-time-avg", - this.metricGrpName, - "The average time taken for a group rejoin"), new Avg()); - this.joinLatency.add(metrics.metricName("join-time-max", - this.metricGrpName, - "The max time taken for a group rejoin"), new Max()); - this.joinLatency.add(createMeter(metrics, metricGrpName, "join", "group joins")); + this.joinSensor = metrics.sensor("join-latency"); + this.joinSensor.add(metrics.metricName("join-time-avg", + this.metricGrpName, + "The average time taken for a group rejoin"), new Avg()); + this.joinSensor.add(metrics.metricName("join-time-max", + this.metricGrpName, + "The max time taken for a group rejoin"), new Max()); + this.joinSensor.add(createMeter(metrics, metricGrpName, "join", "group joins")); + this.syncSensor = metrics.sensor("sync-latency"); + this.syncSensor.add(metrics.metricName("sync-time-avg", + this.metricGrpName, + "The average time taken for a group sync"), new Avg()); + this.syncSensor.add(metrics.metricName("sync-time-max", + this.metricGrpName, + "The max time taken for a group sync"), new Max()); + this.syncSensor.add(createMeter(metrics, metricGrpName, "sync", "group syncs")); - this.syncLatency = metrics.sensor("sync-latency"); - this.syncLatency.add(metrics.metricName("sync-time-avg", + this.successfulRebalanceSensor = metrics.sensor("rebalance-latency"); + this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-avg", + this.metricGrpName, + "The average time taken for a group to complete a successful rebalance, which may be composed of " + + "several failed re-trials until it succeeded"), new Avg()); + this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-max", + this.metricGrpName, + "The max time taken for a group to complete a successful rebalance, which may be composed of " + + "several failed re-trials until it succeeded"), new Max()); + this.successfulRebalanceSensor.add( + metrics.metricName("rebalance-total", this.metricGrpName, - "The average time taken for a group sync"), new Avg()); - this.syncLatency.add(metrics.metricName("sync-time-max", + "The total number of successful rebalance events, each event is composed of " + + "several failed re-trials until it succeeded"), + new CumulativeCount() + ); + this.successfulRebalanceSensor.add( + metrics.metricName( + "rebalance-rate-per-hour", this.metricGrpName, - "The max time taken for a group sync"), new Max()); - this.syncLatency.add(createMeter(metrics, metricGrpName, "sync", "group syncs")); + "The number of successful rebalance events per hour, each event is composed of " + + "several failed re-trials until it succeeded"), + new Rate(TimeUnit.HOURS, new WindowedCount()) + ); - Measurable lastHeartbeat = - new Measurable() { - public double measure(MetricConfig config, long now) { - return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS); - } - }; + this.failedRebalanceSensor = metrics.sensor("failed-rebalance"); + this.failedRebalanceSensor.add( + metrics.metricName("failed-rebalance-total", + this.metricGrpName, + "The total number of failed rebalance events"), + new CumulativeCount() + ); + this.failedRebalanceSensor.add( + metrics.metricName( + "failed-rebalance-rate-per-hour", + this.metricGrpName, + "The number of failed rebalance events per hour"), + new Rate(TimeUnit.HOURS, new WindowedCount()) + ); + + Measurable lastRebalance = (config, now) -> { + if (lastRebalanceEndMs == -1L) + // if no rebalance is ever triggered, we just return -1. + return -1d; + else + return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs, TimeUnit.MILLISECONDS); + }; + metrics.addMetric(metrics.metricName("last-rebalance-seconds-ago", + this.metricGrpName, + "The number of seconds since the last successful rebalance event"), + lastRebalance); + + Measurable lastHeartbeat = (config, now) -> { + if (heartbeat.lastHeartbeatSend() == 0L) + // if no heartbeat is ever triggered, just return -1. + return -1d; + else + return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS); + }; metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago", this.metricGrpName, "The number of seconds since the last coordinator heartbeat was sent"), @@ -1251,4 +1317,8 @@ public abstract class AbstractCoordinator implements Closeable { public Heartbeat heartbeat() { return heartbeat; } + + public void setLastRebalanceTime(final long timestamp) { + lastRebalanceEndMs = timestamp; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index a3aaface782..f361735a6e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -268,7 +268,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator { ConsumerRebalanceListener listener = subscriptions.rebalanceListener(); try { + final long startMs = time.milliseconds(); listener.onPartitionsAssigned(assignedPartitions); + sensors.assignCallbackSensor.record(time.milliseconds() - startMs); } catch (WakeupException | InterruptException e) { throw e; } catch (Exception e) { @@ -285,7 +287,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator { ConsumerRebalanceListener listener = subscriptions.rebalanceListener(); try { + final long startMs = time.milliseconds(); listener.onPartitionsRevoked(revokedPartitions); + sensors.revokeCallbackSensor.record(time.milliseconds() - startMs); } catch (WakeupException | InterruptException e) { throw e; } catch (Exception e) { @@ -302,7 +306,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator { ConsumerRebalanceListener listener = subscriptions.rebalanceListener(); try { + final long startMs = time.milliseconds(); listener.onPartitionsLost(lostPartitions); + sensors.loseCallbackSensor.record(time.milliseconds() - startMs); } catch (WakeupException | InterruptException e) { throw e; } catch (Exception e) { @@ -1078,7 +1084,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @Override public void handle(OffsetCommitResponse commitResponse, RequestFuture future) { - sensors.commitLatency.record(response.requestLatencyMs()); + sensors.commitSensor.record(response.requestLatencyMs()); Set unauthorizedTopics = new HashSet<>(); for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { @@ -1241,19 +1247,46 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private class ConsumerCoordinatorMetrics { private final String metricGrpName; - private final Sensor commitLatency; + private final Sensor commitSensor; + private final Sensor revokeCallbackSensor; + private final Sensor assignCallbackSensor; + private final Sensor loseCallbackSensor; private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; - this.commitLatency = metrics.sensor("commit-latency"); - this.commitLatency.add(metrics.metricName("commit-latency-avg", + this.commitSensor = metrics.sensor("commit-latency"); + this.commitSensor.add(metrics.metricName("commit-latency-avg", this.metricGrpName, "The average time taken for a commit request"), new Avg()); - this.commitLatency.add(metrics.metricName("commit-latency-max", + this.commitSensor.add(metrics.metricName("commit-latency-max", this.metricGrpName, "The max time taken for a commit request"), new Max()); - this.commitLatency.add(createMeter(metrics, metricGrpName, "commit", "commit calls")); + this.commitSensor.add(createMeter(metrics, metricGrpName, "commit", "commit calls")); + + this.revokeCallbackSensor = metrics.sensor("partition-revoked-latency"); + this.revokeCallbackSensor.add(metrics.metricName("partition-revoked-latency-avg", + this.metricGrpName, + "The average time taken for a partition-revoked rebalance listener callback"), new Avg()); + this.revokeCallbackSensor.add(metrics.metricName("partition-revoked-latency-max", + this.metricGrpName, + "The max time taken for a partition-revoked rebalance listener callback"), new Max()); + + this.assignCallbackSensor = metrics.sensor("partition-assigned-latency"); + this.assignCallbackSensor.add(metrics.metricName("partition-assigned-latency-avg", + this.metricGrpName, + "The average time taken for a partition-assigned rebalance listener callback"), new Avg()); + this.assignCallbackSensor.add(metrics.metricName("partition-assigned-latency-max", + this.metricGrpName, + "The max time taken for a partition-assigned rebalance listener callback"), new Max()); + + this.loseCallbackSensor = metrics.sensor("partition-lost-latency"); + this.loseCallbackSensor.add(metrics.metricName("partition-lost-latency-avg", + this.metricGrpName, + "The average time taken for a partition-lost rebalance listener callback"), new Avg()); + this.loseCallbackSensor.add(metrics.metricName("partition-lost-latency-max", + this.metricGrpName, + "The max time taken for a partition-lost rebalance listener callback"), new Max()); Measurable numParts = (config, now) -> subscriptions.numAssignedPartitions(); metrics.addMetric(metrics.metricName("assigned-partitions", diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java index 3bf8c92c2ed..4d19ef4a014 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java @@ -31,7 +31,7 @@ public final class Heartbeat { private final Timer sessionTimer; private final Timer pollTimer; - private volatile long lastHeartbeatSend; + private volatile long lastHeartbeatSend = 0L; public Heartbeat(GroupRebalanceConfig config, Time time) { diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 8d7bdbe85a0..3f852cf1a53 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -78,6 +78,7 @@ public class JmxReporter implements MetricsReporter { public boolean containsMbean(String mbeanName) { return mbeans.containsKey(mbeanName); } + @Override public void metricChange(KafkaMetric metric) { synchronized (LOCK) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index e0264b3f359..1936fcc3c31 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -88,12 +89,13 @@ public class AbstractCoordinatorTest { private static final String GROUP_ID = "dummy-group"; private static final String METRIC_GROUP_PREFIX = "consumer"; - private MockClient mockClient; - private MockTime mockTime; private Node node; + private Metrics metrics; + private MockTime mockTime; private Node coordinatorNode; - private ConsumerNetworkClient consumerClient; + private MockClient mockClient; private DummyCoordinator coordinator; + private ConsumerNetworkClient consumerClient; private final String memberId = "memberId"; private final String leaderId = "leaderId"; @@ -124,7 +126,7 @@ public class AbstractCoordinatorTest { retryBackoffMs, REQUEST_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS); - Metrics metrics = new Metrics(); + metrics = new Metrics(mockTime); mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap())); this.node = metadata.fetch().nodes().get(0); @@ -143,6 +145,89 @@ public class AbstractCoordinatorTest { mockTime); } + @Test + public void testMetrics() { + setupCoordinator(); + + assertNotNull(getMetric("heartbeat-response-time-max")); + assertNotNull(getMetric("heartbeat-rate")); + assertNotNull(getMetric("heartbeat-total")); + assertNotNull(getMetric("last-heartbeat-seconds-ago")); + assertNotNull(getMetric("join-time-avg")); + assertNotNull(getMetric("join-time-max")); + assertNotNull(getMetric("join-rate")); + assertNotNull(getMetric("join-total")); + assertNotNull(getMetric("sync-time-avg")); + assertNotNull(getMetric("sync-time-max")); + assertNotNull(getMetric("sync-rate")); + assertNotNull(getMetric("sync-total")); + assertNotNull(getMetric("rebalance-latency-avg")); + assertNotNull(getMetric("rebalance-latency-max")); + assertNotNull(getMetric("rebalance-rate-per-hour")); + assertNotNull(getMetric("rebalance-total")); + assertNotNull(getMetric("last-rebalance-seconds-ago")); + assertNotNull(getMetric("failed-rebalance-rate-per-hour")); + assertNotNull(getMetric("failed-rebalance-total")); + + metrics.sensor("heartbeat-latency").record(1.0d); + metrics.sensor("heartbeat-latency").record(6.0d); + metrics.sensor("heartbeat-latency").record(2.0d); + + assertEquals(6.0d, getMetric("heartbeat-response-time-max").metricValue()); + assertEquals(0.1d, getMetric("heartbeat-rate").metricValue()); + assertEquals(3.0d, getMetric("heartbeat-total").metricValue()); + + assertEquals(-1.0d, getMetric("last-heartbeat-seconds-ago").metricValue()); + coordinator.heartbeat().sentHeartbeat(mockTime.milliseconds()); + assertEquals(0.0d, getMetric("last-heartbeat-seconds-ago").metricValue()); + mockTime.sleep(10 * 1000L); + assertEquals(10.0d, getMetric("last-heartbeat-seconds-ago").metricValue()); + + metrics.sensor("join-latency").record(1.0d); + metrics.sensor("join-latency").record(6.0d); + metrics.sensor("join-latency").record(2.0d); + + assertEquals(3.0d, getMetric("join-time-avg").metricValue()); + assertEquals(6.0d, getMetric("join-time-max").metricValue()); + assertEquals(0.1d, getMetric("join-rate").metricValue()); + assertEquals(3.0d, getMetric("join-total").metricValue()); + + metrics.sensor("sync-latency").record(1.0d); + metrics.sensor("sync-latency").record(6.0d); + metrics.sensor("sync-latency").record(2.0d); + + assertEquals(3.0d, getMetric("sync-time-avg").metricValue()); + assertEquals(6.0d, getMetric("sync-time-max").metricValue()); + assertEquals(0.1d, getMetric("sync-rate").metricValue()); + assertEquals(3.0d, getMetric("sync-total").metricValue()); + + metrics.sensor("rebalance-latency").record(1.0d); + metrics.sensor("rebalance-latency").record(6.0d); + metrics.sensor("rebalance-latency").record(2.0d); + + assertEquals(3.0d, getMetric("rebalance-latency-avg").metricValue()); + assertEquals(6.0d, getMetric("rebalance-latency-max").metricValue()); + assertEquals(360.0d, getMetric("rebalance-rate-per-hour").metricValue()); + assertEquals(3.0d, getMetric("rebalance-total").metricValue()); + + metrics.sensor("failed-rebalance").record(1.0d); + metrics.sensor("failed-rebalance").record(6.0d); + metrics.sensor("failed-rebalance").record(2.0d); + + assertEquals(360.0d, getMetric("failed-rebalance-rate-per-hour").metricValue()); + assertEquals(3.0d, getMetric("failed-rebalance-total").metricValue()); + + assertEquals(-1.0d, getMetric("last-rebalance-seconds-ago").metricValue()); + coordinator.setLastRebalanceTime(mockTime.milliseconds()); + assertEquals(0.0d, getMetric("last-rebalance-seconds-ago").metricValue()); + mockTime.sleep(10 * 1000L); + assertEquals(10.0d, getMetric("last-rebalance-seconds-ago").metricValue()); + } + + private KafkaMetric getMetric(final String name) { + return metrics.metrics().get(metrics.metricName(name, "consumer-coordinator-metrics")); + } + @Test public void testCoordinatorDiscoveryBackoff() { setupCoordinator(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 91d3529ea10..068f7bd8809 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -49,7 +49,9 @@ import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.AbstractRequest; @@ -200,6 +202,58 @@ public class ConsumerCoordinatorTest { this.coordinator.close(time.timer(0)); } + @Test + public void testMetrics() { + assertNotNull(getMetric("commit-latency-avg")); + assertNotNull(getMetric("commit-latency-max")); + assertNotNull(getMetric("commit-rate")); + assertNotNull(getMetric("commit-total")); + assertNotNull(getMetric("partition-revoked-latency-avg")); + assertNotNull(getMetric("partition-revoked-latency-max")); + assertNotNull(getMetric("partition-assigned-latency-avg")); + assertNotNull(getMetric("partition-assigned-latency-max")); + assertNotNull(getMetric("partition-lost-latency-avg")); + assertNotNull(getMetric("partition-lost-latency-max")); + assertNotNull(getMetric("assigned-partitions")); + + metrics.sensor("commit-latency").record(1.0d); + metrics.sensor("commit-latency").record(6.0d); + metrics.sensor("commit-latency").record(2.0d); + + assertEquals(3.0d, getMetric("commit-latency-avg").metricValue()); + assertEquals(6.0d, getMetric("commit-latency-max").metricValue()); + assertEquals(0.1d, getMetric("commit-rate").metricValue()); + assertEquals(3.0d, getMetric("commit-total").metricValue()); + + metrics.sensor("partition-revoked-latency").record(1.0d); + metrics.sensor("partition-revoked-latency").record(2.0d); + metrics.sensor("partition-assigned-latency").record(1.0d); + metrics.sensor("partition-assigned-latency").record(2.0d); + metrics.sensor("partition-lost-latency").record(1.0d); + metrics.sensor("partition-lost-latency").record(2.0d); + + assertEquals(1.5d, getMetric("partition-revoked-latency-avg").metricValue()); + assertEquals(2.0d, getMetric("partition-revoked-latency-max").metricValue()); + assertEquals(1.5d, getMetric("partition-assigned-latency-avg").metricValue()); + assertEquals(2.0d, getMetric("partition-assigned-latency-max").metricValue()); + assertEquals(1.5d, getMetric("partition-lost-latency-avg").metricValue()); + assertEquals(2.0d, getMetric("partition-lost-latency-max").metricValue()); + + assertEquals(0.0d, getMetric("assigned-partitions").metricValue()); + subscriptions.assignFromUser(Collections.singleton(t1p)); + assertEquals(1.0d, getMetric("assigned-partitions").metricValue()); + subscriptions.assignFromUser(Utils.mkSet(t1p, t2p)); + assertEquals(2.0d, getMetric("assigned-partitions").metricValue()); + } + + private KafkaMetric getMetric(final String name) { + return metrics.metrics().get(metrics.metricName(name, "consumer" + groupId + "-coordinator-metrics")); + } + + private Sensor getSensor(final String name) { + return metrics.sensor(name); + } + @Test public void testSelectRebalanceProtcol() { List assignors = new ArrayList<>();