From 13ba9c9305343d5b11bf26d72d1688cda1d5fd60 Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Wed, 2 Jun 2021 11:30:28 -0700 Subject: [PATCH] KAFKA-12867: Fix ConsumeBenchWorker exit behavior for maxMessages config (#10797) The trogdor ConsumeBenchWorker allows several consumption tasks to be run in parallel, the number is configurable using the threadsPerWorker config. If one of the consumption tasks completes executing successfully due to maxMessages being consumed, then, the consumption task prematurely notifies the doneFuture causing the entire ConsumeBenchWorker to halt. This becomes a problem when more than 1 consumption task is running in parallel, because the successful completion of 1 of the tasks shuts down the entire worker while the other tasks are still running. When the worker is shut down, it kills all the active consumption tasks, though they have not consumed maxMessages yet. This commit defers notification of the doneFuture to the CloseStatusUpdater thread, which is already responsible for tracking the status of the tasks and updating their status when all of the tasks complete. Reviewers: Rajini Sivaram --- .../org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java index f6067b59af4..84ce1d333f1 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -282,7 +282,6 @@ public class ConsumeBenchWorker implements TaskWorker { log.info("{} Consumed total number of messages={}, bytes={} in {} ms. status: {}", clientId, messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData); } - doneFuture.complete(""); consumer.close(); return null; } @@ -307,6 +306,7 @@ public class ConsumeBenchWorker implements TaskWorker { } statusUpdaterFuture.cancel(false); statusUpdater.update(); + doneFuture.complete(""); } }