KAFKA-6768; Transactional producer may hang in close with pending requests (#4842)

This patch fixes an edge case in producer shutdown which prevents `close()` from completing due to a pending request which will never be sent due to shutdown initiation. I have added a test case which reproduces the scenario.

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Jason Gustafson 2018-04-09 15:39:07 -07:00 committed by GitHub
parent e6b4d17c59
commit 0a8f35b684
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 1 deletions

View File

@ -329,7 +329,7 @@ public class Sender implements Runnable {
return false;
AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
while (running) {
while (!forceClose) {
Node targetNode = null;
try {
if (nextRequestHandler.needsCoordinator()) {

View File

@ -132,6 +132,26 @@ public class TransactionManagerTest {
client.setNode(brokerNode);
}
@Test
public void testSenderShutdownWithPendingAddPartitions() throws Exception {
long pid = 13131L;
short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
FutureRecordMetadata sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
prepareAddPartitionsToTxn(tp0, Errors.NONE);
prepareProduceResponse(Errors.NONE, pid, epoch);
sender.initiateClose();
sender.run();
assertTrue(sendFuture.isDone());
}
@Test
public void testEndTxnNotSentIfIncompleteBatches() {
long pid = 13131L;