diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index b36b778546b..e8a40bd66f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -175,22 +175,25 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi TopicIdPartition tip = new TopicIdPartition(topicId, partition); Acknowledgements acknowledgementsToSend = null; + boolean canSendAcknowledgements = true; + Map nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(node.id()); if (nodeAcksFromFetchMap != null) { acknowledgementsToSend = nodeAcksFromFetchMap.remove(tip); + if (acknowledgementsToSend != null) { - if (handler.isNewSession()) { - // Failing the acknowledgements as we cannot have piggybacked acknowledgements in the initial ShareFetchRequest. - acknowledgementsToSend.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception()); - maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acknowledgementsToSend)); - } else { - metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); - fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acknowledgementsToSend); + // Check if the share session epoch is valid for sending acknowledgements. + if (!maybeAddAcknowledgements(handler, node, tip, acknowledgementsToSend)) { + canSendAcknowledgements = false; } } } - handler.addPartitionToFetch(tip, acknowledgementsToSend); + if (canSendAcknowledgements) { + handler.addPartitionToFetch(tip, acknowledgementsToSend); + } else { + handler.addPartitionToFetch(tip, null); + } topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); log.debug("Added fetch request for partition {} to node {}", tip, node.id()); @@ -212,8 +215,10 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi if (nodeAcksFromFetchMap != null) { nodeAcksFromFetchMap.forEach((tip, acks) -> { if (!isLeaderKnownToHaveChanged(nodeId, tip)) { - metricsManager.recordAcknowledgementSent(acks.size()); - fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acks); + // Check if the share session epoch is valid for sending acknowledgements. + if (!maybeAddAcknowledgements(sessionHandler, node, tip, acks)) { + return; + } sessionHandler.addPartitionToAcknowledgeOnly(tip, acks); handlerMap.put(node, sessionHandler); @@ -256,6 +261,28 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi return new PollResult(requests); } + /** + * + * @return True if we can add acknowledgements to the share session. + * If we cannot add acknowledgements, they are completed with {@link Errors#INVALID_SHARE_SESSION_EPOCH} exception. + */ + private boolean maybeAddAcknowledgements(ShareSessionHandler handler, + Node node, + TopicIdPartition tip, + Acknowledgements acknowledgements) { + if (handler.isNewSession()) { + // Failing the acknowledgements as we cannot have piggybacked acknowledgements in the initial ShareFetchRequest. + log.debug("Cannot send acknowledgements on initial epoch for ShareSession for partition {}", tip); + acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception()); + maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acknowledgements)); + return false; + } else { + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acknowledgements); + return true; + } + } + public void fetch(Map acknowledgementsMap, Map controlRecordAcknowledgements) { if (!fetchMoreRecords) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 5cdde2df6ab..0ce3a524880 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -1407,8 +1407,49 @@ public class ShareConsumeRequestManagerTest { shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult(); + assertEquals(1, pollResult.unsentRequests.size()); + ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder(); + assertEquals(1, builder.data().topics().size()); + // We should not add the acknowledgements as part of the request. + assertEquals(0, builder.data().topics().find(tip0.topicId()).partitions().find(0).acknowledgementBatches().size()); + + assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); + assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + } + + @Test + public void testPiggybackAcknowledgementsOnInitialShareSessionErrorSubscriptionChange() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + assignFromSubscribed(singleton(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); + fetchRecords(); + + // Simulate a broker restart, but no leader change, this resets share session epoch to 0. + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + client.prepareResponse(fetchResponseWithTopLevelError(tip0, Errors.SHARE_SESSION_NOT_FOUND)); + networkClientDelegate.poll(time.timer(0)); + + // Simulate a metadata update with no topics in the response. + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(1, Collections.emptyMap(), + tp -> validLeaderEpoch, null, false)); + + // The acknowledgements for the initial fetch from tip0 are processed now and sent to the background thread. + Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + + assertEquals(0, completedAcknowledgements.size()); + + // Next fetch would not include any acknowledgements. + NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult(); + assertEquals(0, pollResult.unsentRequests.size()); + + // We should fail any waiting acknowledgements for tip-0 as it would have a share session epoch equal to 0. assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); }