KAFKA-19667: Close ShareConsumer in ShareConsumerPerformance after metrics displayed (#20467)

Ensure that metrics are retrieved and displayed (when requested) before
ShareConsumer.close() is called. This is important because metrics are
technically supposed to be removed on ShareConsumer.close(), which means
retrieving them after close() would yield an empty map.

Related to https://github.com/apache/kafka/pull/20267.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Andrew Schofield 2025-09-04 18:42:58 +01:00 committed by GitHub
parent 9257c431ed
commit 1d0c5f2820
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 32 additions and 7 deletions

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer; import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric; import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricName;
@ -47,8 +48,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import joptsimple.OptionException; import joptsimple.OptionException;
import joptsimple.OptionSpec; import joptsimple.OptionSpec;
@ -59,6 +60,10 @@ public class ShareConsumerPerformance {
private static final Logger LOG = LoggerFactory.getLogger(ShareConsumerPerformance.class); private static final Logger LOG = LoggerFactory.getLogger(ShareConsumerPerformance.class);
public static void main(String[] args) { public static void main(String[] args) {
run(args, KafkaShareConsumer::new);
}
static void run(String[] args, Function<Properties, ShareConsumer<byte[], byte[]>> shareConsumerCreator) {
try { try {
LOG.info("Starting share consumer/consumers..."); LOG.info("Starting share consumer/consumers...");
ShareConsumerPerfOptions options = new ShareConsumerPerfOptions(args); ShareConsumerPerfOptions options = new ShareConsumerPerfOptions(args);
@ -68,9 +73,9 @@ public class ShareConsumerPerformance {
if (!options.hideHeader()) if (!options.hideHeader())
printHeader(); printHeader();
List<KafkaShareConsumer<byte[], byte[]>> shareConsumers = new ArrayList<>(); List<ShareConsumer<byte[], byte[]>> shareConsumers = new ArrayList<>();
for (int i = 0; i < options.threads(); i++) { for (int i = 0; i < options.threads(); i++) {
shareConsumers.add(new KafkaShareConsumer<>(options.props())); shareConsumers.add(shareConsumerCreator.apply(options.props()));
} }
long startMs = System.currentTimeMillis(); long startMs = System.currentTimeMillis();
consume(shareConsumers, options, totalMessagesRead, totalBytesRead, startMs); consume(shareConsumers, options, totalMessagesRead, totalBytesRead, startMs);
@ -83,7 +88,6 @@ public class ShareConsumerPerformance {
shareConsumers.forEach(shareConsumer -> { shareConsumers.forEach(shareConsumer -> {
@SuppressWarnings("UnusedLocalVariable") @SuppressWarnings("UnusedLocalVariable")
Map<TopicIdPartition, Optional<KafkaException>> ignored = shareConsumer.commitSync(); Map<TopicIdPartition, Optional<KafkaException>> ignored = shareConsumer.commitSync();
shareConsumer.close(Duration.ofMillis(500));
}); });
// Print final stats for share group. // Print final stats for share group.
@ -94,6 +98,7 @@ public class ShareConsumerPerformance {
shareConsumersMetrics.forEach(ToolsUtils::printMetrics); shareConsumersMetrics.forEach(ToolsUtils::printMetrics);
shareConsumers.forEach(shareConsumer -> shareConsumer.close(Duration.ofMillis(500)));
} catch (Throwable e) { } catch (Throwable e) {
System.err.println(e.getMessage()); System.err.println(e.getMessage());
System.err.println(Utils.stackTrace(e)); System.err.println(Utils.stackTrace(e));
@ -106,11 +111,11 @@ public class ShareConsumerPerformance {
System.out.printf("start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg%s%n", newFieldsInHeader); System.out.printf("start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg%s%n", newFieldsInHeader);
} }
private static void consume(List<KafkaShareConsumer<byte[], byte[]>> shareConsumers, private static void consume(List<ShareConsumer<byte[], byte[]>> shareConsumers,
ShareConsumerPerfOptions options, ShareConsumerPerfOptions options,
AtomicLong totalMessagesRead, AtomicLong totalMessagesRead,
AtomicLong totalBytesRead, AtomicLong totalBytesRead,
long startMs) throws ExecutionException, InterruptedException, TimeoutException { long startMs) throws ExecutionException, InterruptedException {
long numMessages = options.numMessages(); long numMessages = options.numMessages();
long recordFetchTimeoutMs = options.recordFetchTimeoutMs(); long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
shareConsumers.forEach(shareConsumer -> shareConsumer.subscribe(options.topic())); shareConsumers.forEach(shareConsumer -> shareConsumer.subscribe(options.topic()));
@ -180,7 +185,7 @@ public class ShareConsumerPerformance {
totalBytesRead.set(bytesRead.get()); totalBytesRead.set(bytesRead.get());
} }
private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer<byte[], byte[]> shareConsumer, private static void consumeMessagesForSingleShareConsumer(ShareConsumer<byte[], byte[]> shareConsumer,
AtomicLong totalMessagesRead, AtomicLong totalMessagesRead,
AtomicLong totalBytesRead, AtomicLong totalBytesRead,
ShareConsumerPerfOptions options, ShareConsumerPerfOptions options,

View File

@ -17,7 +17,10 @@
package org.apache.kafka.tools; package org.apache.kafka.tools;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.MockShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -30,6 +33,8 @@ import java.io.PrintWriter;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Properties;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -120,6 +125,21 @@ public class ShareConsumerPerformanceTest {
assertEquals("perf-share-consumer-client", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); assertEquals("perf-share-consumer-client", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
} }
@Test
public void testMetricsRetrievedBeforeConsumerClosed() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "0",
"--print-metrics"
};
Function<Properties, ShareConsumer<byte[], byte[]>> shareConsumerCreator = properties -> new MockShareConsumer<>();
String err = ToolsTestUtils.captureStandardErr(() -> ShareConsumerPerformance.run(args, shareConsumerCreator));
assertTrue(Utils.isBlank(err), "Should be no stderr message, but was \"" + err + "\"");
}
private void testHeaderMatchContent(int expectedOutputLineCount, Runnable runnable) { private void testHeaderMatchContent(int expectedOutputLineCount, Runnable runnable) {
String out = ToolsTestUtils.captureStandardOut(() -> { String out = ToolsTestUtils.captureStandardOut(() -> {
ShareConsumerPerformance.printHeader(); ShareConsumerPerformance.printHeader();