diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 5aa0310fe20..a5eb91128d3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -996,14 +996,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi nodesWithPendingRequests.add(nodeId); isProcessed = false; - BiConsumer responseHandler = (clientResponse, error) -> { - if (error != null) { - handleShareAcknowledgeFailure(nodeToSend, requestBuilder.data(), this, error, clientResponse.receivedTimeMs()); - } else { - handleShareAcknowledgeSuccess(nodeToSend, requestBuilder.data(), this, clientResponse, clientResponse.receivedTimeMs()); - } - }; - if (requestBuilder == null) { handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND); return null; @@ -1014,7 +1006,16 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } else { incompleteAcknowledgements.clear(); } - return new UnsentRequest(requestBuilder, Optional.of(nodeToSend)).whenComplete(responseHandler); + + UnsentRequest unsentRequest = new UnsentRequest(requestBuilder, Optional.of(nodeToSend)); + BiConsumer responseHandler = (clientResponse, error) -> { + if (error != null) { + handleShareAcknowledgeFailure(nodeToSend, requestBuilder.data(), this, error, unsentRequest.handler().completionTimeMs()); + } else { + handleShareAcknowledgeSuccess(nodeToSend, requestBuilder.data(), this, clientResponse, unsentRequest.handler().completionTimeMs()); + } + }; + return unsentRequest.whenComplete(responseHandler); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 6897f796dea..59473ae9f8b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -368,6 +368,40 @@ public class ShareConsumeRequestManagerTest { completedAcknowledgements.clear(); } + @Test + public void testServerDisconnectedOnShareAcknowledge() { + buildRequestManager(); + // Enabling the config so that background event is sent when the acknowledgement response is received. + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + assignFromSubscribed(Collections.singleton(tp0)); + + // normal fetch + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + acknowledgements.add(2L, AcknowledgeType.ACCEPT); + acknowledgements.add(3L, AcknowledgeType.REJECT); + + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements)); + + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); + + client.prepareResponse(null, true); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0)); + assertEquals(Errors.UNKNOWN_SERVER_ERROR, completedAcknowledgements.get(0).get(tip0).getAcknowledgeErrorCode()); + completedAcknowledgements.clear(); + } + @Test public void testAcknowledgeOnClose() { buildRequestManager();