From 0e40b80c86631f10d221e1ce8e43055d91451564 Mon Sep 17 00:00:00 2001 From: ShivsundarR Date: Wed, 12 Feb 2025 10:54:01 -0500 Subject: [PATCH] KAFKA-18769: Improve leadership changes handling in ShareConsumeRequestManager. (#18851) Reviewers: Andrew Schofield --- .../internals/ShareConsumeRequestManager.java | 161 +++++--- .../ShareConsumeRequestManagerTest.java | 371 +++++++++++++++++- 2 files changed, 474 insertions(+), 58 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 42087e96eef..07782ed771a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -191,8 +191,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi // Iterate over the session handlers to see if there are acknowledgements to be sent for partitions - // which are no longer part of the current subscription, or whose records were fetched from a - // previous leader. + // which are no longer part of the current subscription. + // We fail acknowledgements for records fetched from a previous leader. Cluster cluster = metadata.fetch(); sessionHandlers.forEach((nodeId, sessionHandler) -> { Node node = cluster.nodeById(nodeId); @@ -203,14 +203,20 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi Map nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId); if (nodeAcksFromFetchMap != null) { nodeAcksFromFetchMap.forEach((tip, acks) -> { - metricsManager.recordAcknowledgementSent(acks.size()); - fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acks); + if (!isLeaderKnownToHaveChanged(nodeId, tip)) { + metricsManager.recordAcknowledgementSent(acks.size()); + fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acks); - sessionHandler.addPartitionToAcknowledgeOnly(tip, acks); - handlerMap.put(node, sessionHandler); + sessionHandler.addPartitionToAcknowledgeOnly(tip, acks); + handlerMap.put(node, sessionHandler); - topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); - log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId); + topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); + log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId); + } else { + log.debug("Leader for the partition is down or has changed, failing Acknowledgements for partition {}", tip); + acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, acks)); + } }); nodeAcksFromFetchMap.clear(); @@ -475,11 +481,16 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip); if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) { - acknowledgementsMapForNode.put(tip, nodeAcknowledgements.acknowledgements()); + if (!isLeaderKnownToHaveChanged(node.id(), tip)) { + acknowledgementsMapForNode.put(tip, nodeAcknowledgements.acknowledgements()); - 
metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size()); - log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); - resultCount.incrementAndGet(); + metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size()); + log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); + resultCount.incrementAndGet(); + } else { + nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, nodeAcknowledgements.acknowledgements())); + } } } @@ -523,29 +534,34 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip); if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) { - Acknowledgements acknowledgements = nodeAcknowledgements.acknowledgements(); - acknowledgementsMapForNode.put(tip, acknowledgements); + if (!isLeaderKnownToHaveChanged(node.id(), tip)) { + Acknowledgements acknowledgements = nodeAcknowledgements.acknowledgements(); + acknowledgementsMapForNode.put(tip, acknowledgements); - metricsManager.recordAcknowledgementSent(acknowledgements.size()); - log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); - AcknowledgeRequestState asyncRequestState = acknowledgeRequestStates.get(nodeId).getAsyncRequest(); - if (asyncRequestState == null) { - acknowledgeRequestStates.get(nodeId).setAsyncRequest(new AcknowledgeRequestState(logContext, - ShareConsumeRequestManager.class.getSimpleName() + ":2", - Long.MAX_VALUE, - retryBackoffMs, - retryBackoffMaxMs, - sessionHandler, - nodeId, - acknowledgementsMapForNode, - resultHandler, - AcknowledgeRequestType.COMMIT_ASYNC - )); - } else { - Acknowledgements prevAcks = asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements); - if (prevAcks != null) { - asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements); + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); + AcknowledgeRequestState asyncRequestState = acknowledgeRequestStates.get(nodeId).getAsyncRequest(); + if (asyncRequestState == null) { + acknowledgeRequestStates.get(nodeId).setAsyncRequest(new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":2", + Long.MAX_VALUE, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapForNode, + resultHandler, + AcknowledgeRequestType.COMMIT_ASYNC + )); + } else { + Acknowledgements prevAcks = asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements); + if (prevAcks != null) { + asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements); + } } + } else { + nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, nodeAcknowledgements.acknowledgements())); } } } @@ -572,40 +588,57 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi final ResultHandler resultHandler = new ResultHandler(resultCount, Optional.empty()); closing = true; + Map> 
acknowledgementsMapAllNodes = new HashMap<>(); + + acknowledgementsMap.forEach((tip, nodeAcks) -> { + if (!isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) { + Map<TopicIdPartition, Acknowledgements> acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new HashMap<>()); + Acknowledgements prevAcks = acksMap.putIfAbsent(tip, nodeAcks.acknowledgements()); + if (prevAcks != null) { + acksMap.get(tip).merge(nodeAcks.acknowledgements()); + } + } else { + nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, nodeAcks.acknowledgements())); + } + }); sessionHandlers.forEach((nodeId, sessionHandler) -> { Node node = cluster.nodeById(nodeId); if (node != null) { - Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>(); - - acknowledgementsMap.forEach((tip, nodeAcks) -> { - Acknowledgements acknowledgements = Acknowledgements.empty(); - Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId); - if (nodeAcksFromFetchMap != null) { - Acknowledgements acksFromFetchMap = nodeAcksFromFetchMap.remove(tip); - if (acksFromFetchMap != null) { - acknowledgements.merge(acksFromFetchMap); + // Add any waiting piggyback acknowledgements for the node. + Map<TopicIdPartition, Acknowledgements> fetchAcks = fetchAcknowledgementsToSend.remove(nodeId); + if (fetchAcks != null) { + fetchAcks.forEach((tip, acks) -> { + if (!isLeaderKnownToHaveChanged(nodeId, tip)) { + Map<TopicIdPartition, Acknowledgements> acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>()); + Acknowledgements prevAcks = acksMap.putIfAbsent(tip, acks); + if (prevAcks != null) { + acksMap.get(tip).merge(acks); + } + } else { + acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); + maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, acks)); } - } - - if (nodeAcks.nodeId() == node.id()) { - acknowledgements.merge(nodeAcks.acknowledgements()); - } - - if (!acknowledgements.isEmpty()) { - acknowledgementsMapForNode.put(tip, acknowledgements); + }); + } + Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = acknowledgementsMapAllNodes.get(nodeId); + if (acknowledgementsMapForNode != null) { + acknowledgementsMapForNode.forEach((tip, acknowledgements) -> { metricsManager.recordAcknowledgementSent(acknowledgements.size()); log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); resultCount.incrementAndGet(); - } - }); + }); + } else { + acknowledgementsMapForNode = new HashMap<>(); + } acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); // Ensure there is no close() request already present as they are blocking calls // and only one request can be active at a time. - if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && !acknowledgeRequestStates.get(nodeId).getCloseRequest().isEmpty()) { + if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest())) { log.error("Attempt to call close() when there is an existing close request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequestQueue()); closeFuture.completeExceptionally( new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id())); @@ -630,6 +663,28 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi return closeFuture; } + /** + * Checks whether the leader for a topicIdPartition has changed. 
+ * @param nodeId The node id of the previous leader for the partition. + * @param topicIdPartition The TopicIdPartition to check. + * @return Returns true if leader information is available and the leader has changed. + * Returns false if the leader information is not available or if the leader has not changed. + */ + private boolean isLeaderKnownToHaveChanged(int nodeId, TopicIdPartition topicIdPartition) { + Optional<Node> leaderNode = metadata.currentLeader(topicIdPartition.topicPartition()).leader; + if (leaderNode.isPresent()) { + if (leaderNode.get().id() != nodeId) { + log.debug("Node {} is no longer the leader for partition {}, failing acknowledgements", nodeId, topicIdPartition); + return true; + } + } else { + log.debug("No leader found for partition {}", topicIdPartition); + metadata.requestUpdate(false); + return false; + } + return false; + } + private void handleShareFetchSuccess(Node fetchTarget, @SuppressWarnings("unused") ShareFetchRequestData requestData, ClientResponse resp) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index ede3f5415fc..999e7bbe0cc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -848,13 +848,18 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(2L, AcknowledgeType.ACCEPT); acknowledgements.add(3L, AcknowledgeType.REJECT); - assignFromSubscribed(singleton(tp1)); + subscriptions.subscribeToShareGroup(Collections.singleton(topicName2)); + subscriptions.assignFromSubscribed(Collections.singleton(t2p0)); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName2, 1), + tp -> validLeaderEpoch, topicIds, false)); shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); - client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE)); + client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); // We should send a fetch to the newly subscribed partition. 
@@ -862,6 +867,80 @@ public class ShareConsumeRequestManagerTest { } + @Test + public void testCommitSyncWithSubscriptionChange() { + buildRequestManager(); + + assignFromSubscribed(singleton(tp0)); + + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + acknowledgements.add(2L, AcknowledgeType.ACCEPT); + acknowledgements.add(3L, AcknowledgeType.REJECT); + + subscriptions.subscribeToShareGroup(Collections.singleton(topicName2)); + subscriptions.assignFromSubscribed(Collections.singleton(t2p0)); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName2, 1), + tp -> validLeaderEpoch, topicIds, false)); + + shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)), + calculateDeadlineMs(time.timer(100))); + + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); + + client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + + // We should send a fetch to the newly subscribed partition. + assertEquals(1, sendFetches()); + } + + @Test + public void testCloseWithSubscriptionChange() { + buildRequestManager(); + + assignFromSubscribed(singleton(tp0)); + + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + acknowledgements.add(2L, AcknowledgeType.ACCEPT); + acknowledgements.add(3L, AcknowledgeType.REJECT); + + subscriptions.subscribeToShareGroup(Collections.singleton(topicName2)); + subscriptions.assignFromSubscribed(Collections.singleton(t2p0)); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName2, 1), + tp -> validLeaderEpoch, topicIds, false)); + + shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)), + calculateDeadlineMs(time.timer(100))); + + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); + + client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + + // As we are closing, we would not send any more fetches. + assertEquals(0, sendFetches()); + } + @Test public void testShareFetchWithSubscriptionChange() { buildRequestManager(); @@ -884,7 +963,12 @@ public class ShareConsumeRequestManagerTest { shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); fetchRecords(); // Subscription changes. 
- assignFromSubscribed(singleton(tp1)); + subscriptions.subscribeToShareGroup(Collections.singleton(topicName2)); + subscriptions.assignFromSubscribed(Collections.singleton(t2p0)); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName2, 1), + tp -> validLeaderEpoch, topicIds, false)); assertEquals(1, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); @@ -1738,6 +1822,7 @@ public class ShareConsumeRequestManagerTest { @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"}) public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) { buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); Set partitions = new HashSet<>(); @@ -1796,9 +1881,11 @@ public class ShareConsumeRequestManagerTest { assertNotEquals(startingClusterMetadata, metadata.fetch()); // Even though the partitions are on the same leader, records were fetched on the previous leader. - // A fetch is sent to the previous leader to remove the partition from the share session and get the acknowledge error code. - assertEquals(2, sendFetches()); + // We do not send those acknowledgements to the previous leader, we fail them with NOT_LEADER_OR_FOLLOWER exception. + assertEquals(1, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + assertEquals(acknowledgements, completedAcknowledgements.get(0).get(tip0)); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); partitionData.clear(); partitionData.put(tip0, @@ -1835,6 +1922,280 @@ public class ShareConsumeRequestManagerTest { assertEquals(1, fetchedRecords.size()); } + @Test + void testLeadershipChangeAfterFetchBeforeCommitAsync() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + Set partitions = new HashSet<>(); + partitions.add(tp0); + partitions.add(tp1); + subscriptions.assignFromSubscribed(partitions); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2), + tp -> validLeaderEpoch, topicIds, false)); + Node nodeId0 = metadata.fetch().nodeById(0); + Node nodeId1 = metadata.fetch().nodeById(1); + + Cluster startingClusterMetadata = metadata.fetch(); + assertFalse(metadata.updateRequested()); + + assertEquals(2, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + LinkedHashMap partitionData = new LinkedHashMap<>(); + partitionData.put(tip0, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip0.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0); + partitionData.clear(); + partitionData.put(tip1, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip1.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, 
Collections.emptyList()), nodeId1); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Map>> partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + + List> fetchedRecords = partitionRecords.get(tp0); + assertEquals(1, fetchedRecords.size()); + + fetchedRecords = partitionRecords.get(tp1); + assertEquals(2, fetchedRecords.size()); + + Acknowledgements acknowledgementsTp0 = Acknowledgements.empty(); + acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT); + + Acknowledgements acknowledgementsTp1 = Acknowledgements.empty(); + acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT); + acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT); + + Map commitAcks = new HashMap<>(); + commitAcks.put(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)); + commitAcks.put(tip1, new NodeAcknowledgements(1, acknowledgementsTp1)); + + // Move the leadership of tp0 onto node 1 + HashMap partitionLeaders = new HashMap<>(); + partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1))); + metadata.updatePartitionLeadership(partitionLeaders, List.of()); + + assertNotEquals(startingClusterMetadata, metadata.fetch()); + + // We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception. + shareConsumeRequestManager.commitAsync(commitAcks); + assertEquals(1, completedAcknowledgements.get(0).size()); + assertEquals(acknowledgementsTp0, completedAcknowledgements.get(0).get(tip0)); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + + // We only send acknowledgements for tip1 to node1. 
+ assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); + + client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + + assertEquals(1, completedAcknowledgements.get(1).size()); + assertEquals(acknowledgementsTp1, completedAcknowledgements.get(1).get(tip1)); + assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException()); + + } + + @Test + void testLeadershipChangeAfterFetchBeforeCommitSync() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + Set partitions = new HashSet<>(); + partitions.add(tp0); + partitions.add(tp1); + subscriptions.assignFromSubscribed(partitions); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2), + tp -> validLeaderEpoch, topicIds, false)); + Node nodeId0 = metadata.fetch().nodeById(0); + Node nodeId1 = metadata.fetch().nodeById(1); + + Cluster startingClusterMetadata = metadata.fetch(); + assertFalse(metadata.updateRequested()); + + assertEquals(2, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + LinkedHashMap partitionData = new LinkedHashMap<>(); + partitionData.put(tip0, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip0.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0); + partitionData.clear(); + partitionData.put(tip1, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip1.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Map>> partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + + List> fetchedRecords = partitionRecords.get(tp0); + assertEquals(1, fetchedRecords.size()); + + fetchedRecords = partitionRecords.get(tp1); + assertEquals(2, fetchedRecords.size()); + + Acknowledgements acknowledgementsTp0 = Acknowledgements.empty(); + acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT); + + Acknowledgements acknowledgementsTp1 = Acknowledgements.empty(); + acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT); + acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT); + + Map commitAcks = new HashMap<>(); + commitAcks.put(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)); + commitAcks.put(tip1, new NodeAcknowledgements(1, acknowledgementsTp1)); + + // Move the leadership of tp0 onto node 1 + HashMap partitionLeaders = new HashMap<>(); + partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1))); + metadata.updatePartitionLeadership(partitionLeaders, List.of()); + + assertNotEquals(startingClusterMetadata, metadata.fetch()); + + // We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception. 
+ shareConsumeRequestManager.commitSync(commitAcks, calculateDeadlineMs(time.timer(100))); + + // Verify if the callback was invoked with the failed acknowledgements. + assertEquals(1, completedAcknowledgements.get(0).size()); + assertEquals(acknowledgementsTp0, completedAcknowledgements.get(0).get(tip0)); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + + // We only send acknowledgements for tip1 to node1. + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); + + client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + + assertEquals(1, completedAcknowledgements.get(1).size()); + assertEquals(acknowledgementsTp1, completedAcknowledgements.get(1).get(tip1)); + assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException()); + } + + @Test + void testLeadershipChangeAfterFetchBeforeClose() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + Set partitions = new HashSet<>(); + partitions.add(tp0); + partitions.add(tp1); + subscriptions.assignFromSubscribed(partitions); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2), + tp -> validLeaderEpoch, topicIds, false)); + Node nodeId0 = metadata.fetch().nodeById(0); + Node nodeId1 = metadata.fetch().nodeById(1); + + Cluster startingClusterMetadata = metadata.fetch(); + assertFalse(metadata.updateRequested()); + + assertEquals(2, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + LinkedHashMap partitionData = new LinkedHashMap<>(); + partitionData.put(tip0, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip0.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0); + partitionData.clear(); + partitionData.put(tip1, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip1.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Map>> partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + + List> fetchedRecords = partitionRecords.get(tp0); + assertEquals(1, fetchedRecords.size()); + + fetchedRecords = partitionRecords.get(tp1); + assertEquals(2, fetchedRecords.size()); + + Acknowledgements acknowledgementsTp0 = Acknowledgements.empty(); + acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT); + + Acknowledgements acknowledgementsTp1 = Acknowledgements.empty(); + acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT); + acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT); + + shareConsumeRequestManager.fetch(Collections.singletonMap(tip1, new NodeAcknowledgements(1, acknowledgementsTp1))); + + // Move the leadership of tp0 onto 
node 1 + HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<>(); + partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1))); + metadata.updatePartitionLeadership(partitionLeaders, List.of()); + + assertNotEquals(startingClusterMetadata, metadata.fetch()); + + // We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception. + shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)), calculateDeadlineMs(time.timer(100))); + + // Verify if the callback was invoked with the failed acknowledgements. + assertEquals(1, completedAcknowledgements.get(0).size()); + assertEquals(acknowledgementsTp0.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap()); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + completedAcknowledgements.clear(); + + // As we are closing, we still send the request to both nodes, but with empty acknowledgements to node0, as it is no longer the leader. + assertEquals(2, shareConsumeRequestManager.sendAcknowledgements()); + + client.prepareResponseFrom(fullAcknowledgeResponse(tip1, Errors.NONE), nodeId1); + networkClientDelegate.poll(time.timer(0)); + + client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId0); + networkClientDelegate.poll(time.timer(0)); + + assertEquals(1, completedAcknowledgements.get(0).size()); + assertEquals(acknowledgementsTp1, completedAcknowledgements.get(0).get(tip1)); + assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); + } + @Test void testWhenLeadershipChangedAfterDisconnected() { buildRequestManager();