KAFKA-19450: ShareConsumerPerformance does not handle exceptions from consumeMessagesForSingleShareConsumer (#20126)

### About
Within `ShareConsumerPerformance.java`, all of the share consumers run inside an `ExecutorService`, but the `Future` returned by each `executorService.submit()` call is never stored. As a result, an exception thrown inside `consumeMessagesForSingleShareConsumer` is captured silently in the discarded `Future` and can only be recovered by calling `future.get()`. This is a shortcoming in `ShareConsumerPerformance.java`: this change keeps the futures and calls `get()` on each of them so that worker exceptions propagate to the caller.
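
For context, this is standard `ExecutorService` behaviour: an exception thrown in a task passed to `submit()` is stored in the returned `Future` and is only rethrown, wrapped in an `ExecutionException`, when `Future.get()` is called. A minimal standalone sketch of the pitfall (illustrative only, not taken from the Kafka code):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitSwallowsExceptions {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        // A task that fails immediately. With submit(), nothing is printed and
        // nothing propagates: the exception is stored inside the Future.
        Runnable failingTask = () -> {
            throw new RuntimeException("consumer failed");
        };
        Future<?> future = executorService.submit(failingTask);

        try {
            // Only here does the failure surface, wrapped in an ExecutionException.
            future.get();
        } catch (ExecutionException e) {
            System.out.println("Worker failed with: " + e.getCause());
        } finally {
            executorService.shutdown();
        }
    }
}
```

If the `Future` is discarded, as in the old code, the `catch` block above never runs and the failure goes unnoticed.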

Reviewers: Andrew Schofield <aschofield@confluent.io>
Author: Abhinav Dixit, 2025-07-09 14:21:05 +05:30 (committed by GitHub)
Commit: e489682c45 (parent 36b9bb94f1)
1 changed file with 10 additions and 3 deletions

Changes to `ShareConsumerPerformance.java`:

```diff
@@ -43,9 +43,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;

 import joptsimple.OptionException;
@@ -108,7 +111,7 @@ public class ShareConsumerPerformance {
                                 ShareConsumerPerfOptions options,
                                 AtomicLong totalMessagesRead,
                                 AtomicLong totalBytesRead,
-                                long startMs) {
+                                long startMs) throws ExecutionException, InterruptedException, TimeoutException {
         long numMessages = options.numMessages();
         long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
         shareConsumers.forEach(shareConsumer -> shareConsumer.subscribe(options.topic()));
@@ -120,17 +123,18 @@ public class ShareConsumerPerformance {
         ExecutorService executorService = Executors.newFixedThreadPool(shareConsumers.size());
+        List<Future<?>> futures = new ArrayList<>();
         for (int i = 0; i < shareConsumers.size(); i++) {
             final int index = i;
             ShareConsumerConsumption shareConsumerConsumption = new ShareConsumerConsumption(0, 0);
-            executorService.submit(() -> {
+            futures.add(executorService.submit(() -> {
                 try {
                     consumeMessagesForSingleShareConsumer(shareConsumers.get(index), messagesRead, bytesRead, options,
                         shareConsumerConsumption, index + 1);
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
-            });
+            }));
             shareConsumersConsumptionDetails.add(shareConsumerConsumption);
         }
         LOG.debug("Shutting down of thread pool is started");
@@ -153,6 +157,9 @@ public class ShareConsumerPerformance {
             // Preserve interrupt status
             Thread.currentThread().interrupt();
         }
+        for (Future<?> future : futures) {
+            future.get();
+        }
         if (options.showShareConsumerStats()) {
             long endMs = System.currentTimeMillis();
```