From e1f45218c963ab6651320448a430f7fa3fd95033 Mon Sep 17 00:00:00 2001 From: Shivsundar R Date: Thu, 31 Jul 2025 17:07:44 -0400 Subject: [PATCH] KAFKA-19485 (II) : Complete any pending acknowledgements in ShareFetch on an error response. (#20247) *What* Currently when we received a top level error response in ShareFetch, we would log the error, update the share session epoch and proceed to the next request. But these acknowledgements(if any) are not completed and the callback would not have been processed. PR aims to address this by completing these acknowledgements with the error code from the response in this case. Reviewers: Andrew Schofield --- .../internals/ShareConsumeRequestManager.java | 10 ++++++ .../ShareConsumeRequestManagerTest.java | 33 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index e016d0d3984..51e3fb39dfb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -758,6 +758,16 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi if (response.error() == Errors.UNKNOWN_TOPIC_ID) { metadata.requestUpdate(false); } + // Complete any inFlight acknowledgements with the error code from the response. + Map nodeAcknowledgementsInFlight = fetchAcknowledgementsInFlight.get(fetchTarget.id()); + if (nodeAcknowledgementsInFlight != null) { + nodeAcknowledgementsInFlight.forEach((tip, acks) -> { + acks.complete(Errors.forCode(response.error().code()).exception()); + metricsManager.recordFailedAcknowledgements(acks.size()); + }); + maybeSendShareAcknowledgeCommitCallbackEvent(nodeAcknowledgementsInFlight); + nodeAcknowledgementsInFlight.clear(); + } return; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 57407260dee..68da71d7767 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -1455,6 +1455,39 @@ public class ShareConsumeRequestManagerTest { assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); } + @Test + public void testPiggybackAcknowledgementsOnInitialShareSession_ShareSessionNotFound() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + assignFromSubscribed(singleton(tp0)); + sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); + + fetchRecords(); + + // The acknowledgements for the initial fetch from tip0 are processed now and sent to the background thread. + Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + + // We attempt to send the acknowledgements piggybacking on the fetch. + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + // Simulate a broker restart, but no leader change, this resets share session epoch to 0. + client.prepareResponse(fetchResponseWithTopLevelError(tip0, Errors.SHARE_SESSION_NOT_FOUND)); + networkClientDelegate.poll(time.timer(0)); + + // We would complete these acknowledgements with the error code from the response. + assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); + assertEquals(Errors.SHARE_SESSION_NOT_FOUND.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + + // Next fetch would proceed as expected and would not include any acknowledgements. + NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult(); + assertEquals(1, pollResult.unsentRequests.size()); + ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder(); + assertEquals(0, builder.data().topics().find(topicId).partitions().find(0).acknowledgementBatches().size()); + } + @Test public void testInvalidDefaultRecordBatch() { buildRequestManager();