diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index e016d0d3984..51e3fb39dfb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -758,6 +758,16 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi if (response.error() == Errors.UNKNOWN_TOPIC_ID) { metadata.requestUpdate(false); } + // Complete any inFlight acknowledgements with the error code from the response. + Map nodeAcknowledgementsInFlight = fetchAcknowledgementsInFlight.get(fetchTarget.id()); + if (nodeAcknowledgementsInFlight != null) { + nodeAcknowledgementsInFlight.forEach((tip, acks) -> { + acks.complete(Errors.forCode(response.error().code()).exception()); + metricsManager.recordFailedAcknowledgements(acks.size()); + }); + maybeSendShareAcknowledgeCommitCallbackEvent(nodeAcknowledgementsInFlight); + nodeAcknowledgementsInFlight.clear(); + } return; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 57407260dee..68da71d7767 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -1455,6 +1455,39 @@ public class ShareConsumeRequestManagerTest { assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); } + @Test + public void testPiggybackAcknowledgementsOnInitialShareSession_ShareSessionNotFound() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + assignFromSubscribed(singleton(tp0)); + sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); + + fetchRecords(); + + // The acknowledgements for the initial fetch from tip0 are processed now and sent to the background thread. + Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + + // We attempt to send the acknowledgements piggybacking on the fetch. + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + // Simulate a broker restart, but no leader change, this resets share session epoch to 0. + client.prepareResponse(fetchResponseWithTopLevelError(tip0, Errors.SHARE_SESSION_NOT_FOUND)); + networkClientDelegate.poll(time.timer(0)); + + // We would complete these acknowledgements with the error code from the response. + assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); + assertEquals(Errors.SHARE_SESSION_NOT_FOUND.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + + // Next fetch would proceed as expected and would not include any acknowledgements. + NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult(); + assertEquals(1, pollResult.unsentRequests.size()); + ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder(); + assertEquals(0, builder.data().topics().find(topicId).partitions().find(0).acknowledgementBatches().size()); + } + @Test public void testInvalidDefaultRecordBatch() { buildRequestManager();