KAFKA-1259 Close blocked only until all messages had been sent not until all acknowledgements had been received.

This commit is contained in:
Jay Kreps 2014-02-11 16:24:08 -08:00
parent e1845ba1d8
commit ef1e30bf5b
1 changed files with 11 additions and 2 deletions

View File

@ -101,7 +101,9 @@ public class Sender implements Runnable {
}
}
// send anything left in the accumulator
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
int unsent = 0;
do {
try {
@ -109,7 +111,7 @@ public class Sender implements Runnable {
} catch (Exception e) {
e.printStackTrace();
}
} while (unsent > 0);
} while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0);
// close all the connections
this.selector.close();
@ -534,6 +536,13 @@ public class Sender implements Runnable {
return requests.remove(node);
}
}
public int totalInFlightRequests() {
int total = 0;
for (Deque<InFlightRequest> deque : this.requests.values())
total += deque.size();
return total;
}
}
}