From 2da02d9fcf72ef0d73dbf958f2e24ce179bfd6ff Mon Sep 17 00:00:00 2001 From: Arpit Goyal Date: Mon, 6 Oct 2025 23:57:50 +0530 Subject: [PATCH] KAFKA-19723 Adding consumer rebalance metrics test (#20565) Added Testcases for consumer rebalance metric manager test. Reviewers: Lianet Magrans , TengYao Chi , Hong-Yi Chen --- .../ConsumerRebalanceMetricsManagerTest.java | 298 +++++++++++++++++- 1 file changed, 288 insertions(+), 10 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java index a7d91227767..a5c52fac090 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java @@ -19,45 +19,323 @@ package org.apache.kafka.clients.consumer.internals.metrics; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Optional; import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; class ConsumerRebalanceMetricsManagerTest { - private final Time time = new MockTime(); - private final Metrics metrics = new Metrics(time); + private Time time; + private Metrics metrics; + private SubscriptionState subscriptionState; + private ConsumerRebalanceMetricsManager metricsManager; + private MetricConfig metricConfig; + private long windowSizeMs; + private int numSamples; + + @BeforeEach + public void setUp() { + time = new MockTime(); + // Use MetricConfig with its default values + windowSizeMs = 30000; // 30 seconds - default value + numSamples = 2; // default value + metricConfig = new MetricConfig() + .samples(numSamples) + .timeWindow(windowSizeMs, java.util.concurrent.TimeUnit.MILLISECONDS); + metrics = new Metrics(metricConfig, time); + subscriptionState = new SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST); + metricsManager = new ConsumerRebalanceMetricsManager(metrics, subscriptionState); + } + + @AfterEach + public void tearDown() { + metrics.close(); + } @Test public void testAssignedPartitionCountMetric() { - SubscriptionState subscriptionState = new SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST); - ConsumerRebalanceMetricsManager consumerRebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics, subscriptionState); - - assertNotNull(metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount), "Metric assigned-partitions has not been registered as expected"); + assertNotNull(metrics.metric(metricsManager.assignedPartitionsCount), "Metric assigned-partitions has not been registered as expected"); // Check for manually assigned partitions subscriptionState.assignFromUser(Set.of(new TopicPartition("topic", 0), new TopicPartition("topic", 1))); - assertEquals(2.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue()); + assertEquals(2.0d, metrics.metric(metricsManager.assignedPartitionsCount).metricValue()); subscriptionState.assignFromUser(Set.of()); - assertEquals(0.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue()); + assertEquals(0.0d, metrics.metric(metricsManager.assignedPartitionsCount).metricValue()); subscriptionState.unsubscribe(); - assertEquals(0.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue()); + assertEquals(0.0d, metrics.metric(metricsManager.assignedPartitionsCount).metricValue()); // Check for automatically assigned partitions subscriptionState.subscribe(Set.of("topic"), Optional.empty()); subscriptionState.assignFromSubscribed(Set.of(new TopicPartition("topic", 0))); - assertEquals(1.0d, metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue()); + assertEquals(1.0d, metrics.metric(metricsManager.assignedPartitionsCount).metricValue()); + } + + @Test + public void testRebalanceTimingMetrics() { + + // Verify timing metrics are registered + assertNotNull(metrics.metric(metricsManager.rebalanceLatencyAvg)); + assertNotNull(metrics.metric(metricsManager.rebalanceLatencyMax)); + assertNotNull(metrics.metric(metricsManager.rebalanceLatencyTotal)); + assertNotNull(metrics.metric(metricsManager.rebalanceTotal)); + + // Record first rebalance (10ms duration) + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(10); + metricsManager.recordRebalanceEnded(time.milliseconds()); + + // Verify metrics after first rebalance + assertEquals(10.0d, metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue()); + assertEquals(10.0d, metrics.metric(metricsManager.rebalanceLatencyMax).metricValue()); + assertEquals(10.0d, metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue()); + assertEquals(1.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue()); + + // Record second rebalance (30ms duration) + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(30); + metricsManager.recordRebalanceEnded(time.milliseconds()); + + // Verify metrics after second rebalance + assertEquals(20.0d, metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(), + "Average latency should be (10 + 30) / 2 = 20ms"); + assertEquals(30.0d, metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(), + "Max latency should be max(10, 30) = 30ms"); + assertEquals(40.0d, metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(), + "Total latency should be 10 + 30 = 40ms"); + assertEquals(2.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue()); + + // Record third rebalance (50ms duration) + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(50); + metricsManager.recordRebalanceEnded(time.milliseconds()); + + // Verify metrics after third rebalance + assertEquals(30.0d, metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(), + "Average latency should be (10 + 30 + 50) / 3 = 30ms"); + assertEquals(50.0d, metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(), + "Max latency should be max(10, 30, 50) = 50ms"); + assertEquals(90.0d, metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(), + "Total latency should be 10 + 30 + 50 = 90ms"); + assertEquals(3.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue()); + } + + @Test + public void testRebalanceRateMetric() { + + // Verify rate metric is registered + assertNotNull(metrics.metric(metricsManager.rebalanceRatePerHour)); + + // Record 3 rebalances within 30ms total (3 x 10ms) + int rebalanceCount = 3; + long startTime = time.milliseconds(); + for (int i = 0; i < rebalanceCount; i++) { + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(10); + metricsManager.recordRebalanceEnded(time.milliseconds()); + } + long endTime = time.milliseconds(); + long actualElapsedMs = endTime - startTime; + + double ratePerHour = (Double) metrics.metric(metricsManager.rebalanceRatePerHour).metricValue(); + + // The Rate metric calculation: + // - Uses elapsed time from the oldest sample + // - Ensures minimum window size of (numSamples - 1) * windowSizeMs + // - With default config: minWindow = (2-1) * 30000 = 30000ms + long minWindowMs = (numSamples - 1) * windowSizeMs; // (2-1) * 30000 = 30000ms + + // Since actualElapsedMs (30ms) is much less than minWindowMs (30000ms), + // the rate calculation will use minWindowMs as the window + // Rate per hour = count / (windowMs / 1000) * 3600 + double expectedRatePerHour = (double) rebalanceCount / (minWindowMs / 1000.0) * 3600.0; + + assertEquals(expectedRatePerHour, ratePerHour, 1.0, + String.format("With %d rebalances in %dms, min window %dms: expecting %.1f rebalances/hour", + rebalanceCount, actualElapsedMs, minWindowMs, expectedRatePerHour)); + } + + @Test + public void testFailedRebalanceMetrics() { + + // Verify failed rebalance metrics are registered + assertNotNull(metrics.metric(metricsManager.failedRebalanceTotal)); + assertNotNull(metrics.metric(metricsManager.failedRebalanceRate)); + + assertEquals(0.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue(), + "Initially, there should be no failed rebalances"); + + // Start a rebalance but don't complete it + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(10); + + metricsManager.maybeRecordRebalanceFailed(); + assertEquals(1.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue(), + "Failed rebalance count should increment to 1 after recording failure"); + + // Complete a successful rebalance + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(10); + metricsManager.recordRebalanceEnded(time.milliseconds()); + + metricsManager.maybeRecordRebalanceFailed(); + assertEquals(1.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue(), + "Failed count should not increment after successful rebalance completes"); + + // Start another rebalance, don't complete it, then record failure + time.sleep(10); + metricsManager.recordRebalanceStarted(time.milliseconds()); + assertTrue(metricsManager.rebalanceStarted(), "Rebalance should be in progress"); + time.sleep(10); + // Don't call recordRebalanceEnded() to simulate an incomplete rebalance + metricsManager.maybeRecordRebalanceFailed(); + assertEquals(2.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue()); + + double failedRate = (Double) metrics.metric(metricsManager.failedRebalanceRate).metricValue(); + + // Calculate expected failed rate based on Rate metric behavior + // We had 2 failures over ~40ms, but minimum window is (numSamples - 1) * windowSizeMs + long minWindowMs = (numSamples - 1) * windowSizeMs; // (2-1) * 30000 = 30000ms + double expectedFailedRatePerHour = 2.0 / (minWindowMs / 1000.0) * 3600.0; + + assertEquals(expectedFailedRatePerHour, failedRate, 1.0, + String.format("With 2 failures, min window %dms: expecting %.1f failures/hour", + minWindowMs, expectedFailedRatePerHour)); + } + + @Test + public void testLastRebalanceSecondsAgoMetric() { + + // Verify metric is registered + assertNotNull(metrics.metric(metricsManager.lastRebalanceSecondsAgo)); + + assertEquals(-1.0d, metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue(), + "Should return -1 when no rebalance has occurred"); + + // Complete a rebalance + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(10); + metricsManager.recordRebalanceEnded(time.milliseconds()); + + assertEquals(0.0d, metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue(), + "Should return 0 immediately after rebalance completes"); + + // Advance time by 5 seconds + time.sleep(5000); + assertEquals(5.0d, metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue()); + + // Advance time by another 10 seconds + time.sleep(10000); + assertEquals(15.0d, metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue()); + + // Complete another rebalance + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(20); + metricsManager.recordRebalanceEnded(time.milliseconds()); + + assertEquals(0.0d, metrics.metric(metricsManager.lastRebalanceSecondsAgo).metricValue(), + "Should reset to 0 after a new rebalance completes"); + } + + @Test + public void testRebalanceStartedFlag() { + + assertFalse(metricsManager.rebalanceStarted(), + "Initially, no rebalance should be in progress"); + + metricsManager.recordRebalanceStarted(time.milliseconds()); + assertTrue(metricsManager.rebalanceStarted(), + "Rebalance should be marked as started after recordRebalanceStarted()"); + + time.sleep(10); + metricsManager.recordRebalanceEnded(time.milliseconds()); + assertFalse(metricsManager.rebalanceStarted(), + "Rebalance should not be in progress after recordRebalanceEnded()"); + + // Start another rebalance - advance time first + time.sleep(100); + metricsManager.recordRebalanceStarted(time.milliseconds()); + assertTrue(metricsManager.rebalanceStarted(), + "New rebalance should be marked as started"); + } + + @Test + public void testMultipleConsecutiveFailures() { + + // Record multiple consecutive failures + for (int i = 0; i < 5; i++) { + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(10); + metricsManager.maybeRecordRebalanceFailed(); + } + + assertEquals(5.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue(), + "Should have recorded 5 consecutive failed rebalances"); + + assertEquals(0.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue(), + "Successful rebalance count should remain 0 when only failures occur"); + } + + @Test + public void testMixedSuccessAndFailureScenarios() { + + // Success -> Failure -> Success -> Failure pattern + // First success + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(20); + metricsManager.recordRebalanceEnded(time.milliseconds()); + assertEquals(1.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue()); + + // First failure + time.sleep(10); + metricsManager.recordRebalanceStarted(time.milliseconds()); + assertTrue(metricsManager.rebalanceStarted(), "First failure rebalance should be in progress"); + time.sleep(30); + metricsManager.maybeRecordRebalanceFailed(); + + double failedAfterFirst = (Double) metrics.metric(metricsManager.failedRebalanceTotal).metricValue(); + assertEquals(1.0d, failedAfterFirst, "Should have recorded one failed rebalance after first failure"); + + // Second success + time.sleep(10); + metricsManager.recordRebalanceStarted(time.milliseconds()); + time.sleep(40); + metricsManager.recordRebalanceEnded(time.milliseconds()); + assertEquals(2.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue()); + + // Second failure + time.sleep(10); + metricsManager.recordRebalanceStarted(time.milliseconds()); + assertTrue(metricsManager.rebalanceStarted(), "Second failure rebalance should be in progress"); + time.sleep(50); + metricsManager.maybeRecordRebalanceFailed(); + + assertEquals(2.0d, metrics.metric(metricsManager.rebalanceTotal).metricValue(), + "Should have 2 successful rebalances in mixed scenario"); + assertEquals(2.0d, metrics.metric(metricsManager.failedRebalanceTotal).metricValue(), + "Should have 2 failed rebalances in mixed scenario"); + + assertEquals(30.0d, metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(), + "Average latency should only include successful rebalances: (20 + 40) / 2 = 30ms"); + assertEquals(40.0d, metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(), + "Max latency should be 40ms from successful rebalances only"); + assertEquals(60.0d, metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(), + "Total latency should only include successful rebalances: 20 + 40 = 60ms"); } }