From ef1e30bf5b0aba1758522f8cacd2804d3bfbd4fb Mon Sep 17 00:00:00 2001 From: Jay Kreps Date: Tue, 11 Feb 2014 16:24:08 -0800 Subject: [PATCH] KAFKA-1259 Close blocked only until all messages had been sent not until all acknowledgements had been received. --- .../kafka/clients/producer/internals/Sender.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 87dd1a6fd3a..d93a455827a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -101,7 +101,9 @@ public class Sender implements Runnable { } } - // send anything left in the accumulator + // okay we stopped accepting requests but there may still be + // requests in the accumulator or waiting for acknowledgment, + // wait until these are completed. int unsent = 0; do { try { @@ -109,7 +111,7 @@ public class Sender implements Runnable { } catch (Exception e) { e.printStackTrace(); } - } while (unsent > 0); + } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0); // close all the connections this.selector.close(); @@ -534,6 +536,13 @@ public class Sender implements Runnable { return requests.remove(node); } } + + public int totalInFlightRequests() { + int total = 0; + for (Deque deque : this.requests.values()) + total += deque.size(); + return total; + } } }