From e489682c45be0c63ccb28ead581b25d8eb77b2fc Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Wed, 9 Jul 2025 14:21:05 +0530 Subject: [PATCH] KAFKA-19450: ShareConsumerPerformance does not handle exceptions from consumeMessagesForSingleShareConsumer (#20126) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### About Within `ShareConsumerPerformance.java`, all the share consumers run with within an executorService object and when we perform `executorService.submit()`, we do not store this future and exception would be recovered only when we do a future.get() in this case. I believe this is a shortcoming in `ShareConsumerPerformance.java` which needs to be improved. Reviewers: Andrew Schofield --- .../kafka/tools/ShareConsumerPerformance.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index 953b2182326..b04bf922d04 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -43,9 +43,12 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import joptsimple.OptionException; @@ -108,7 +111,7 @@ public class ShareConsumerPerformance { ShareConsumerPerfOptions options, AtomicLong totalMessagesRead, AtomicLong totalBytesRead, - long startMs) { + long startMs) throws ExecutionException, InterruptedException, TimeoutException { long numMessages = options.numMessages(); long recordFetchTimeoutMs = options.recordFetchTimeoutMs(); shareConsumers.forEach(shareConsumer -> shareConsumer.subscribe(options.topic())); @@ -120,17 +123,18 @@ public class ShareConsumerPerformance { ExecutorService executorService = Executors.newFixedThreadPool(shareConsumers.size()); + List> futures = new ArrayList<>(); for (int i = 0; i < shareConsumers.size(); i++) { final int index = i; ShareConsumerConsumption shareConsumerConsumption = new ShareConsumerConsumption(0, 0); - executorService.submit(() -> { + futures.add(executorService.submit(() -> { try { consumeMessagesForSingleShareConsumer(shareConsumers.get(index), messagesRead, bytesRead, options, shareConsumerConsumption, index + 1); } catch (InterruptedException e) { throw new RuntimeException(e); } - }); + })); shareConsumersConsumptionDetails.add(shareConsumerConsumption); } LOG.debug("Shutting down of thread pool is started"); @@ -153,6 +157,9 @@ public class ShareConsumerPerformance { // Preserve interrupt status Thread.currentThread().interrupt(); } + for (Future future : futures) { + future.get(); + } if (options.showShareConsumerStats()) { long endMs = System.currentTimeMillis();