KAFKA-18155 : Fix bug in response handler for ShareAcknowledge (#18029)

In the response handler for ShareAcknowledge, we are passing the clientResponse.receivedTimeMs() to the handler methods. But when there is a disconnect or when the response received is null, we should be passing the current time instead.

This bug was causing consumer to hang as it did not call the handler methods on disconnect, and further requests were blocked waiting for its completion.

Reviewers: Andrew Schofield <aschofield@confluent.io>,  Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
ShivsundarR 2024-12-05 02:29:13 -05:00 committed by GitHub
parent 41fc058573
commit 8fde6dedea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 44 additions and 9 deletions

View File

@ -996,14 +996,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
nodesWithPendingRequests.add(nodeId);
isProcessed = false;
BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
if (error != null) {
handleShareAcknowledgeFailure(nodeToSend, requestBuilder.data(), this, error, clientResponse.receivedTimeMs());
} else {
handleShareAcknowledgeSuccess(nodeToSend, requestBuilder.data(), this, clientResponse, clientResponse.receivedTimeMs());
}
};
if (requestBuilder == null) {
handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
return null;
@ -1014,7 +1006,16 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
} else {
incompleteAcknowledgements.clear();
}
return new UnsentRequest(requestBuilder, Optional.of(nodeToSend)).whenComplete(responseHandler);
UnsentRequest unsentRequest = new UnsentRequest(requestBuilder, Optional.of(nodeToSend));
BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
if (error != null) {
handleShareAcknowledgeFailure(nodeToSend, requestBuilder.data(), this, error, unsentRequest.handler().completionTimeMs());
} else {
handleShareAcknowledgeSuccess(nodeToSend, requestBuilder.data(), this, clientResponse, unsentRequest.handler().completionTimeMs());
}
};
return unsentRequest.whenComplete(responseHandler);
}
}

View File

@ -368,6 +368,40 @@ public class ShareConsumeRequestManagerTest {
completedAcknowledgements.clear();
}
@Test
public void testServerDisconnectedOnShareAcknowledge() {
buildRequestManager();
// Enabling the config so that background event is sent when the acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
client.prepareResponse(null, true);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0));
assertEquals(Errors.UNKNOWN_SERVER_ERROR, completedAcknowledgements.get(0).get(tip0).getAcknowledgeErrorCode());
completedAcknowledgements.clear();
}
@Test
public void testAcknowledgeOnClose() {
buildRequestManager();