KAFKA-8609: Add consumer rebalance metrics (#7347)

Adding the following metrics in:

1. AbstractCoordinator (for both consumer and connect)
* rebalance-latency-avg
* rebalance-latency-max
* rebalance-total
* rebalance-rate-per-hour
* failed-rebalance-total
* failed-rebalance-rate-per-hour
* last-rebalance-seconds-ago

2. ConsumerCoordinator
* partition-revoked-latency-avg
* partition-revoked-latency-max
* partition-assigned-latency-avg
* partition-assigned-latency-max
* partition-lost-latency-avg
* partition-lost-latency-max

Reviewers: Bruno Cadonna <bruno@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Guozhang Wang 2019-09-20 17:50:12 -07:00 committed by GitHub
parent 8001aff304
commit d91a94e7bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 288 additions and 45 deletions

View File

@ -39,12 +39,13 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter; import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
@ -122,13 +123,15 @@ public abstract class AbstractCoordinator implements Closeable {
protected final ConsumerNetworkClient client; protected final ConsumerNetworkClient client;
protected final Time time; protected final Time time;
private HeartbeatThread heartbeatThread = null; private Node coordinator = null;
private boolean rejoinNeeded = true; private boolean rejoinNeeded = true;
private boolean needsJoinPrepare = true; private boolean needsJoinPrepare = true;
private MemberState state = MemberState.UNJOINED; private MemberState state = MemberState.UNJOINED;
private HeartbeatThread heartbeatThread = null;
private RequestFuture<ByteBuffer> joinFuture = null; private RequestFuture<ByteBuffer> joinFuture = null;
private Node coordinator = null;
private Generation generation = Generation.NO_GENERATION; private Generation generation = Generation.NO_GENERATION;
private long lastRebalanceStartMs = -1L;
private long lastRebalanceEndMs = -1L;
private RequestFuture<Void> findCoordinatorFuture = null; private RequestFuture<Void> findCoordinatorFuture = null;
@ -440,6 +443,10 @@ public abstract class AbstractCoordinator implements Closeable {
disableHeartbeatThread(); disableHeartbeatThread();
state = MemberState.REBALANCING; state = MemberState.REBALANCING;
// a rebalance can be triggered consecutively if the previous one failed,
// in this case we would not update the start time.
if (lastRebalanceStartMs == -1L)
lastRebalanceStartMs = time.milliseconds();
joinFuture = sendJoinGroupRequest(); joinFuture = sendJoinGroupRequest();
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
@Override @Override
@ -450,6 +457,10 @@ public abstract class AbstractCoordinator implements Closeable {
log.info("Successfully joined group with generation {}", generation.generationId); log.info("Successfully joined group with generation {}", generation.generationId);
state = MemberState.STABLE; state = MemberState.STABLE;
rejoinNeeded = false; rejoinNeeded = false;
// record rebalance latency
lastRebalanceEndMs = time.milliseconds();
sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
lastRebalanceStartMs = -1L;
if (heartbeatThread != null) if (heartbeatThread != null)
heartbeatThread.enable(); heartbeatThread.enable();
@ -462,6 +473,7 @@ public abstract class AbstractCoordinator implements Closeable {
// after having been woken up, the exception is ignored and we will rejoin // after having been woken up, the exception is ignored and we will rejoin
synchronized (AbstractCoordinator.this) { synchronized (AbstractCoordinator.this) {
state = MemberState.UNJOINED; state = MemberState.UNJOINED;
sensors.failedRebalanceSensor.record();
} }
} }
}); });
@ -511,7 +523,7 @@ public abstract class AbstractCoordinator implements Closeable {
Errors error = joinResponse.error(); Errors error = joinResponse.error();
if (error == Errors.NONE) { if (error == Errors.NONE) {
log.debug("Received successful JoinGroup response: {}", joinResponse); log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinLatency.record(response.requestLatencyMs()); sensors.joinSensor.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) { synchronized (AbstractCoordinator.this) {
if (state != MemberState.REBALANCING) { if (state != MemberState.REBALANCING) {
@ -641,7 +653,7 @@ public abstract class AbstractCoordinator implements Closeable {
RequestFuture<ByteBuffer> future) { RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error(); Errors error = syncResponse.error();
if (error == Errors.NONE) { if (error == Errors.NONE) {
sensors.syncLatency.record(response.requestLatencyMs()); sensors.syncSensor.record(response.requestLatencyMs());
future.complete(ByteBuffer.wrap(syncResponse.data.assignment())); future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
} else { } else {
requestRejoin(); requestRejoin();
@ -931,7 +943,7 @@ public abstract class AbstractCoordinator implements Closeable {
private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> { private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
@Override @Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
sensors.heartbeatLatency.record(response.requestLatencyMs()); sensors.heartbeatSensor.record(response.requestLatencyMs());
Errors error = heartbeatResponse.error(); Errors error = heartbeatResponse.error();
if (error == Errors.NONE) { if (error == Errors.NONE) {
log.debug("Received successful Heartbeat response"); log.debug("Received successful Heartbeat response");
@ -1005,44 +1017,98 @@ public abstract class AbstractCoordinator implements Closeable {
private class GroupCoordinatorMetrics { private class GroupCoordinatorMetrics {
public final String metricGrpName; public final String metricGrpName;
public final Sensor heartbeatLatency; public final Sensor heartbeatSensor;
public final Sensor joinLatency; public final Sensor joinSensor;
public final Sensor syncLatency; public final Sensor syncSensor;
public final Sensor successfulRebalanceSensor;
public final Sensor failedRebalanceSensor;
public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.heartbeatLatency = metrics.sensor("heartbeat-latency"); this.heartbeatSensor = metrics.sensor("heartbeat-latency");
this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max", this.heartbeatSensor.add(metrics.metricName("heartbeat-response-time-max",
this.metricGrpName, this.metricGrpName,
"The max time taken to receive a response to a heartbeat request"), new Max()); "The max time taken to receive a response to a heartbeat request"), new Max());
this.heartbeatLatency.add(createMeter(metrics, metricGrpName, "heartbeat", "heartbeats")); this.heartbeatSensor.add(createMeter(metrics, metricGrpName, "heartbeat", "heartbeats"));
this.joinLatency = metrics.sensor("join-latency"); this.joinSensor = metrics.sensor("join-latency");
this.joinLatency.add(metrics.metricName("join-time-avg", this.joinSensor.add(metrics.metricName("join-time-avg",
this.metricGrpName, this.metricGrpName,
"The average time taken for a group rejoin"), new Avg()); "The average time taken for a group rejoin"), new Avg());
this.joinLatency.add(metrics.metricName("join-time-max", this.joinSensor.add(metrics.metricName("join-time-max",
this.metricGrpName, this.metricGrpName,
"The max time taken for a group rejoin"), new Max()); "The max time taken for a group rejoin"), new Max());
this.joinLatency.add(createMeter(metrics, metricGrpName, "join", "group joins")); this.joinSensor.add(createMeter(metrics, metricGrpName, "join", "group joins"));
this.syncSensor = metrics.sensor("sync-latency");
this.syncSensor.add(metrics.metricName("sync-time-avg",
this.metricGrpName,
"The average time taken for a group sync"), new Avg());
this.syncSensor.add(metrics.metricName("sync-time-max",
this.metricGrpName,
"The max time taken for a group sync"), new Max());
this.syncSensor.add(createMeter(metrics, metricGrpName, "sync", "group syncs"));
this.syncLatency = metrics.sensor("sync-latency"); this.successfulRebalanceSensor = metrics.sensor("rebalance-latency");
this.syncLatency.add(metrics.metricName("sync-time-avg", this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-avg",
this.metricGrpName,
"The average time taken for a group to complete a successful rebalance, which may be composed of " +
"several failed re-trials until it succeeded"), new Avg());
this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-max",
this.metricGrpName,
"The max time taken for a group to complete a successful rebalance, which may be composed of " +
"several failed re-trials until it succeeded"), new Max());
this.successfulRebalanceSensor.add(
metrics.metricName("rebalance-total",
this.metricGrpName, this.metricGrpName,
"The average time taken for a group sync"), new Avg()); "The total number of successful rebalance events, each event is composed of " +
this.syncLatency.add(metrics.metricName("sync-time-max", "several failed re-trials until it succeeded"),
new CumulativeCount()
);
this.successfulRebalanceSensor.add(
metrics.metricName(
"rebalance-rate-per-hour",
this.metricGrpName, this.metricGrpName,
"The max time taken for a group sync"), new Max()); "The number of successful rebalance events per hour, each event is composed of " +
this.syncLatency.add(createMeter(metrics, metricGrpName, "sync", "group syncs")); "several failed re-trials until it succeeded"),
new Rate(TimeUnit.HOURS, new WindowedCount())
);
Measurable lastHeartbeat = this.failedRebalanceSensor = metrics.sensor("failed-rebalance");
new Measurable() { this.failedRebalanceSensor.add(
public double measure(MetricConfig config, long now) { metrics.metricName("failed-rebalance-total",
return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS); this.metricGrpName,
} "The total number of failed rebalance events"),
}; new CumulativeCount()
);
this.failedRebalanceSensor.add(
metrics.metricName(
"failed-rebalance-rate-per-hour",
this.metricGrpName,
"The number of failed rebalance events per hour"),
new Rate(TimeUnit.HOURS, new WindowedCount())
);
Measurable lastRebalance = (config, now) -> {
if (lastRebalanceEndMs == -1L)
// if no rebalance is ever triggered, we just return -1.
return -1d;
else
return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs, TimeUnit.MILLISECONDS);
};
metrics.addMetric(metrics.metricName("last-rebalance-seconds-ago",
this.metricGrpName,
"The number of seconds since the last successful rebalance event"),
lastRebalance);
Measurable lastHeartbeat = (config, now) -> {
if (heartbeat.lastHeartbeatSend() == 0L)
// if no heartbeat is ever triggered, just return -1.
return -1d;
else
return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
};
metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago", metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago",
this.metricGrpName, this.metricGrpName,
"The number of seconds since the last coordinator heartbeat was sent"), "The number of seconds since the last coordinator heartbeat was sent"),
@ -1251,4 +1317,8 @@ public abstract class AbstractCoordinator implements Closeable {
public Heartbeat heartbeat() { public Heartbeat heartbeat() {
return heartbeat; return heartbeat;
} }
public void setLastRebalanceTime(final long timestamp) {
lastRebalanceEndMs = timestamp;
}
} }

View File

@ -268,7 +268,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
ConsumerRebalanceListener listener = subscriptions.rebalanceListener(); ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
try { try {
final long startMs = time.milliseconds();
listener.onPartitionsAssigned(assignedPartitions); listener.onPartitionsAssigned(assignedPartitions);
sensors.assignCallbackSensor.record(time.milliseconds() - startMs);
} catch (WakeupException | InterruptException e) { } catch (WakeupException | InterruptException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
@ -285,7 +287,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
ConsumerRebalanceListener listener = subscriptions.rebalanceListener(); ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
try { try {
final long startMs = time.milliseconds();
listener.onPartitionsRevoked(revokedPartitions); listener.onPartitionsRevoked(revokedPartitions);
sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
} catch (WakeupException | InterruptException e) { } catch (WakeupException | InterruptException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
@ -302,7 +306,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
ConsumerRebalanceListener listener = subscriptions.rebalanceListener(); ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
try { try {
final long startMs = time.milliseconds();
listener.onPartitionsLost(lostPartitions); listener.onPartitionsLost(lostPartitions);
sensors.loseCallbackSensor.record(time.milliseconds() - startMs);
} catch (WakeupException | InterruptException e) { } catch (WakeupException | InterruptException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
@ -1078,7 +1084,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
@Override @Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) { public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
sensors.commitLatency.record(response.requestLatencyMs()); sensors.commitSensor.record(response.requestLatencyMs());
Set<String> unauthorizedTopics = new HashSet<>(); Set<String> unauthorizedTopics = new HashSet<>();
for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
@ -1241,19 +1247,46 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private class ConsumerCoordinatorMetrics { private class ConsumerCoordinatorMetrics {
private final String metricGrpName; private final String metricGrpName;
private final Sensor commitLatency; private final Sensor commitSensor;
private final Sensor revokeCallbackSensor;
private final Sensor assignCallbackSensor;
private final Sensor loseCallbackSensor;
private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.commitLatency = metrics.sensor("commit-latency"); this.commitSensor = metrics.sensor("commit-latency");
this.commitLatency.add(metrics.metricName("commit-latency-avg", this.commitSensor.add(metrics.metricName("commit-latency-avg",
this.metricGrpName, this.metricGrpName,
"The average time taken for a commit request"), new Avg()); "The average time taken for a commit request"), new Avg());
this.commitLatency.add(metrics.metricName("commit-latency-max", this.commitSensor.add(metrics.metricName("commit-latency-max",
this.metricGrpName, this.metricGrpName,
"The max time taken for a commit request"), new Max()); "The max time taken for a commit request"), new Max());
this.commitLatency.add(createMeter(metrics, metricGrpName, "commit", "commit calls")); this.commitSensor.add(createMeter(metrics, metricGrpName, "commit", "commit calls"));
this.revokeCallbackSensor = metrics.sensor("partition-revoked-latency");
this.revokeCallbackSensor.add(metrics.metricName("partition-revoked-latency-avg",
this.metricGrpName,
"The average time taken for a partition-revoked rebalance listener callback"), new Avg());
this.revokeCallbackSensor.add(metrics.metricName("partition-revoked-latency-max",
this.metricGrpName,
"The max time taken for a partition-revoked rebalance listener callback"), new Max());
this.assignCallbackSensor = metrics.sensor("partition-assigned-latency");
this.assignCallbackSensor.add(metrics.metricName("partition-assigned-latency-avg",
this.metricGrpName,
"The average time taken for a partition-assigned rebalance listener callback"), new Avg());
this.assignCallbackSensor.add(metrics.metricName("partition-assigned-latency-max",
this.metricGrpName,
"The max time taken for a partition-assigned rebalance listener callback"), new Max());
this.loseCallbackSensor = metrics.sensor("partition-lost-latency");
this.loseCallbackSensor.add(metrics.metricName("partition-lost-latency-avg",
this.metricGrpName,
"The average time taken for a partition-lost rebalance listener callback"), new Avg());
this.loseCallbackSensor.add(metrics.metricName("partition-lost-latency-max",
this.metricGrpName,
"The max time taken for a partition-lost rebalance listener callback"), new Max());
Measurable numParts = (config, now) -> subscriptions.numAssignedPartitions(); Measurable numParts = (config, now) -> subscriptions.numAssignedPartitions();
metrics.addMetric(metrics.metricName("assigned-partitions", metrics.addMetric(metrics.metricName("assigned-partitions",

View File

@ -31,7 +31,7 @@ public final class Heartbeat {
private final Timer sessionTimer; private final Timer sessionTimer;
private final Timer pollTimer; private final Timer pollTimer;
private volatile long lastHeartbeatSend; private volatile long lastHeartbeatSend = 0L;
public Heartbeat(GroupRebalanceConfig config, public Heartbeat(GroupRebalanceConfig config,
Time time) { Time time) {

View File

@ -78,6 +78,7 @@ public class JmxReporter implements MetricsReporter {
public boolean containsMbean(String mbeanName) { public boolean containsMbean(String mbeanName) {
return mbeans.containsKey(mbeanName); return mbeans.containsKey(mbeanName);
} }
@Override @Override
public void metricChange(KafkaMetric metric) { public void metricChange(KafkaMetric metric) {
synchronized (LOCK) { synchronized (LOCK) {

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
@ -88,12 +89,13 @@ public class AbstractCoordinatorTest {
private static final String GROUP_ID = "dummy-group"; private static final String GROUP_ID = "dummy-group";
private static final String METRIC_GROUP_PREFIX = "consumer"; private static final String METRIC_GROUP_PREFIX = "consumer";
private MockClient mockClient;
private MockTime mockTime;
private Node node; private Node node;
private Metrics metrics;
private MockTime mockTime;
private Node coordinatorNode; private Node coordinatorNode;
private ConsumerNetworkClient consumerClient; private MockClient mockClient;
private DummyCoordinator coordinator; private DummyCoordinator coordinator;
private ConsumerNetworkClient consumerClient;
private final String memberId = "memberId"; private final String memberId = "memberId";
private final String leaderId = "leaderId"; private final String leaderId = "leaderId";
@ -124,7 +126,7 @@ public class AbstractCoordinatorTest {
retryBackoffMs, retryBackoffMs,
REQUEST_TIMEOUT_MS, REQUEST_TIMEOUT_MS,
HEARTBEAT_INTERVAL_MS); HEARTBEAT_INTERVAL_MS);
Metrics metrics = new Metrics(); metrics = new Metrics(mockTime);
mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap())); mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap()));
this.node = metadata.fetch().nodes().get(0); this.node = metadata.fetch().nodes().get(0);
@ -143,6 +145,89 @@ public class AbstractCoordinatorTest {
mockTime); mockTime);
} }
@Test
public void testMetrics() {
setupCoordinator();
assertNotNull(getMetric("heartbeat-response-time-max"));
assertNotNull(getMetric("heartbeat-rate"));
assertNotNull(getMetric("heartbeat-total"));
assertNotNull(getMetric("last-heartbeat-seconds-ago"));
assertNotNull(getMetric("join-time-avg"));
assertNotNull(getMetric("join-time-max"));
assertNotNull(getMetric("join-rate"));
assertNotNull(getMetric("join-total"));
assertNotNull(getMetric("sync-time-avg"));
assertNotNull(getMetric("sync-time-max"));
assertNotNull(getMetric("sync-rate"));
assertNotNull(getMetric("sync-total"));
assertNotNull(getMetric("rebalance-latency-avg"));
assertNotNull(getMetric("rebalance-latency-max"));
assertNotNull(getMetric("rebalance-rate-per-hour"));
assertNotNull(getMetric("rebalance-total"));
assertNotNull(getMetric("last-rebalance-seconds-ago"));
assertNotNull(getMetric("failed-rebalance-rate-per-hour"));
assertNotNull(getMetric("failed-rebalance-total"));
metrics.sensor("heartbeat-latency").record(1.0d);
metrics.sensor("heartbeat-latency").record(6.0d);
metrics.sensor("heartbeat-latency").record(2.0d);
assertEquals(6.0d, getMetric("heartbeat-response-time-max").metricValue());
assertEquals(0.1d, getMetric("heartbeat-rate").metricValue());
assertEquals(3.0d, getMetric("heartbeat-total").metricValue());
assertEquals(-1.0d, getMetric("last-heartbeat-seconds-ago").metricValue());
coordinator.heartbeat().sentHeartbeat(mockTime.milliseconds());
assertEquals(0.0d, getMetric("last-heartbeat-seconds-ago").metricValue());
mockTime.sleep(10 * 1000L);
assertEquals(10.0d, getMetric("last-heartbeat-seconds-ago").metricValue());
metrics.sensor("join-latency").record(1.0d);
metrics.sensor("join-latency").record(6.0d);
metrics.sensor("join-latency").record(2.0d);
assertEquals(3.0d, getMetric("join-time-avg").metricValue());
assertEquals(6.0d, getMetric("join-time-max").metricValue());
assertEquals(0.1d, getMetric("join-rate").metricValue());
assertEquals(3.0d, getMetric("join-total").metricValue());
metrics.sensor("sync-latency").record(1.0d);
metrics.sensor("sync-latency").record(6.0d);
metrics.sensor("sync-latency").record(2.0d);
assertEquals(3.0d, getMetric("sync-time-avg").metricValue());
assertEquals(6.0d, getMetric("sync-time-max").metricValue());
assertEquals(0.1d, getMetric("sync-rate").metricValue());
assertEquals(3.0d, getMetric("sync-total").metricValue());
metrics.sensor("rebalance-latency").record(1.0d);
metrics.sensor("rebalance-latency").record(6.0d);
metrics.sensor("rebalance-latency").record(2.0d);
assertEquals(3.0d, getMetric("rebalance-latency-avg").metricValue());
assertEquals(6.0d, getMetric("rebalance-latency-max").metricValue());
assertEquals(360.0d, getMetric("rebalance-rate-per-hour").metricValue());
assertEquals(3.0d, getMetric("rebalance-total").metricValue());
metrics.sensor("failed-rebalance").record(1.0d);
metrics.sensor("failed-rebalance").record(6.0d);
metrics.sensor("failed-rebalance").record(2.0d);
assertEquals(360.0d, getMetric("failed-rebalance-rate-per-hour").metricValue());
assertEquals(3.0d, getMetric("failed-rebalance-total").metricValue());
assertEquals(-1.0d, getMetric("last-rebalance-seconds-ago").metricValue());
coordinator.setLastRebalanceTime(mockTime.milliseconds());
assertEquals(0.0d, getMetric("last-rebalance-seconds-ago").metricValue());
mockTime.sleep(10 * 1000L);
assertEquals(10.0d, getMetric("last-rebalance-seconds-ago").metricValue());
}
private KafkaMetric getMetric(final String name) {
return metrics.metrics().get(metrics.metricName(name, "consumer-coordinator-metrics"));
}
@Test @Test
public void testCoordinatorDiscoveryBackoff() { public void testCoordinatorDiscoveryBackoff() {
setupCoordinator(); setupCoordinator();

View File

@ -49,7 +49,9 @@ import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractRequest;
@ -200,6 +202,58 @@ public class ConsumerCoordinatorTest {
this.coordinator.close(time.timer(0)); this.coordinator.close(time.timer(0));
} }
@Test
public void testMetrics() {
assertNotNull(getMetric("commit-latency-avg"));
assertNotNull(getMetric("commit-latency-max"));
assertNotNull(getMetric("commit-rate"));
assertNotNull(getMetric("commit-total"));
assertNotNull(getMetric("partition-revoked-latency-avg"));
assertNotNull(getMetric("partition-revoked-latency-max"));
assertNotNull(getMetric("partition-assigned-latency-avg"));
assertNotNull(getMetric("partition-assigned-latency-max"));
assertNotNull(getMetric("partition-lost-latency-avg"));
assertNotNull(getMetric("partition-lost-latency-max"));
assertNotNull(getMetric("assigned-partitions"));
metrics.sensor("commit-latency").record(1.0d);
metrics.sensor("commit-latency").record(6.0d);
metrics.sensor("commit-latency").record(2.0d);
assertEquals(3.0d, getMetric("commit-latency-avg").metricValue());
assertEquals(6.0d, getMetric("commit-latency-max").metricValue());
assertEquals(0.1d, getMetric("commit-rate").metricValue());
assertEquals(3.0d, getMetric("commit-total").metricValue());
metrics.sensor("partition-revoked-latency").record(1.0d);
metrics.sensor("partition-revoked-latency").record(2.0d);
metrics.sensor("partition-assigned-latency").record(1.0d);
metrics.sensor("partition-assigned-latency").record(2.0d);
metrics.sensor("partition-lost-latency").record(1.0d);
metrics.sensor("partition-lost-latency").record(2.0d);
assertEquals(1.5d, getMetric("partition-revoked-latency-avg").metricValue());
assertEquals(2.0d, getMetric("partition-revoked-latency-max").metricValue());
assertEquals(1.5d, getMetric("partition-assigned-latency-avg").metricValue());
assertEquals(2.0d, getMetric("partition-assigned-latency-max").metricValue());
assertEquals(1.5d, getMetric("partition-lost-latency-avg").metricValue());
assertEquals(2.0d, getMetric("partition-lost-latency-max").metricValue());
assertEquals(0.0d, getMetric("assigned-partitions").metricValue());
subscriptions.assignFromUser(Collections.singleton(t1p));
assertEquals(1.0d, getMetric("assigned-partitions").metricValue());
subscriptions.assignFromUser(Utils.mkSet(t1p, t2p));
assertEquals(2.0d, getMetric("assigned-partitions").metricValue());
}
private KafkaMetric getMetric(final String name) {
return metrics.metrics().get(metrics.metricName(name, "consumer" + groupId + "-coordinator-metrics"));
}
private Sensor getSensor(final String name) {
return metrics.sensor(name);
}
@Test @Test
public void testSelectRebalanceProtcol() { public void testSelectRebalanceProtcol() {
List<ConsumerPartitionAssignor> assignors = new ArrayList<>(); List<ConsumerPartitionAssignor> assignors = new ArrayList<>();