diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java index f37fbe05750..c84f9c78d0d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java @@ -20,8 +20,10 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidRecordStateException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.WakeupException; import java.util.Map; @@ -42,12 +44,16 @@ public interface AcknowledgementCommitCallback { * * @param exception The exception thrown during processing of the request, or null if the acknowledgement completed successfully. *

+ *

Note that even if the exception is a retriable exception, the acknowledgement could not be completed and the + * records need to be fetched again. The callback is called after any retries have been performed. */ void onComplete(Map> offsets, Exception exception); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java index 794be0a67b3..b746e1a2135 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.slf4j.Logger; @@ -45,10 +46,7 @@ public class AcknowledgementCommitCallbackHandler { void onComplete(List> acknowledgementsMapList) { final ArrayList exceptions = new ArrayList<>(); acknowledgementsMapList.forEach(acknowledgementsMap -> acknowledgementsMap.forEach((partition, acknowledgements) -> { - Exception exception = null; - if (acknowledgements.getAcknowledgeErrorCode() != null) { - exception = acknowledgements.getAcknowledgeErrorCode().exception(); - } + KafkaException exception = acknowledgements.getAcknowledgeException(); Set offsets = acknowledgements.getAcknowledgementsTypeMap().keySet(); Set offsetsCopy = Collections.unmodifiableSet(offsets); enteredCallback = true; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java index 410f8478d4c..8d3fab23587 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; @@ -35,8 +36,11 @@ public class Acknowledgements { // The acknowledgements keyed by offset. If the record is a gap, the AcknowledgeType will be null. private final Map acknowledgements; - // When the broker responds to the acknowledgements, this is the error code returned. - private Errors acknowledgeErrorCode; + // When the broker responds to the acknowledgements, this is the exception thrown. + private KafkaException acknowledgeException; + + // Set when the broker has responded to the acknowledgements. + private boolean completed; public static Acknowledgements empty() { return new Acknowledgements(new TreeMap<>()); @@ -44,6 +48,8 @@ public class Acknowledgements { private Acknowledgements(Map acknowledgements) { this.acknowledgements = acknowledgements; + this.acknowledgeException = null; + this.completed = false; } /** @@ -115,25 +121,26 @@ public class Acknowledgements { * @return Whether the acknowledgements were sent to the broker and a response received */ public boolean isCompleted() { - return acknowledgeErrorCode != null; + return completed; } /** - * Set the acknowledgement error code when the response has been received from the broker. + * Completes the acknowledgements when the response has been received from the broker. 
* - * @param acknowledgeErrorCode the error code + * @param acknowledgeException the exception (will be null if successful) */ - public void setAcknowledgeErrorCode(Errors acknowledgeErrorCode) { - this.acknowledgeErrorCode = acknowledgeErrorCode; + public void complete(KafkaException acknowledgeException) { + this.acknowledgeException = acknowledgeException; + completed = true; } /** - * Get the acknowledgement error code when the response has been received from the broker. + * Get the acknowledgement exception when the response has been received from the broker. * - * @return the error code + * @return the acknowledgement exception */ - public Errors getAcknowledgeErrorCode() { - return acknowledgeErrorCode; + public KafkaException getAcknowledgeException() { + return acknowledgeException; } /** @@ -301,10 +308,10 @@ public class Acknowledgements { public String toString() { StringBuilder sb = new StringBuilder("Acknowledgements("); sb.append(acknowledgements); - if (acknowledgeErrorCode != null) { - sb.append(", errorCode="); - sb.append(acknowledgeErrorCode.code()); - } + sb.append(", acknowledgeException="); + sb.append(acknowledgeException != null ? Errors.forException(acknowledgeException) : "null"); + sb.append(", completed="); + sb.append(completed); sb.append(")"); return sb.toString(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java new file mode 100644 index 00000000000..b0eb6066dc4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.Objects; + +/** + * This class combines Acknowledgements with the id of the node to use for acknowledging. + */ +public class NodeAcknowledgements { + private final int nodeId; + private final Acknowledgements acknowledgements; + + public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) { + this.nodeId = nodeId; + this.acknowledgements = Objects.requireNonNull(acknowledgements); + } + + public int nodeId() { + return nodeId; + } + + public Acknowledgements acknowledgements() { + return acknowledgements; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java index 838416b8428..83bae92d48a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java @@ -55,7 +55,7 @@ import java.util.Optional; * to keep track of aborted transactions or the need to keep track of fetch position. 
*/ public class ShareCompletedFetch { - + final int nodeId; final TopicIdPartition partition; final ShareFetchResponseData.PartitionData partitionData; final short requestVersion; @@ -79,12 +79,14 @@ public class ShareCompletedFetch { ShareCompletedFetch(final LogContext logContext, final BufferSupplier decompressionBufferSupplier, + final int nodeId, final TopicIdPartition partition, final ShareFetchResponseData.PartitionData partitionData, final ShareFetchMetricsAggregator metricAggregator, final short requestVersion) { this.log = logContext.logger(org.apache.kafka.clients.consumer.internals.ShareCompletedFetch.class); this.decompressionBufferSupplier = decompressionBufferSupplier; + this.nodeId = nodeId; this.partition = partition; this.partitionData = partitionData; this.metricAggregator = metricAggregator; @@ -156,7 +158,7 @@ public class ShareCompletedFetch { final int maxRecords, final boolean checkCrcs) { // Creating an empty ShareInFlightBatch - ShareInFlightBatch inFlightBatch = new ShareInFlightBatch<>(partition); + ShareInFlightBatch inFlightBatch = new ShareInFlightBatch<>(nodeId, partition); if (cachedBatchException != null) { // If the event that a CRC check fails, reject the entire record batch because it is corrupt. 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 20a022cb6ca..42087e96eef 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentR import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; @@ -87,8 +88,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private final IdempotentCloser idempotentCloser = new IdempotentCloser(); private Uuid memberId; private boolean fetchMoreRecords = false; - private final Map fetchAcknowledgementsToSend; - private final Map fetchAcknowledgementsInFlight; + private final Map> fetchAcknowledgementsToSend; + private final Map> fetchAcknowledgementsInFlight; private final Map> acknowledgeRequestStates; private final long retryBackoffMs; private final long retryBackoffMaxMs; @@ -146,7 +147,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi Map handlerMap = new HashMap<>(); Map topicIds = metadata.topicIds(); - Set fetchedPartitions = new HashSet<>(); for (TopicPartition partition : partitionsToFetch()) { Optional leaderOpt = metadata.currentLeader(partition).leader; @@ -172,73 +172,60 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi k -> sessionHandlers.computeIfAbsent(node.id(), 
n -> new ShareSessionHandler(logContext, n, memberId))); TopicIdPartition tip = new TopicIdPartition(topicId, partition); - Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); - if (acknowledgementsToSend != null) { - metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); - fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); + Acknowledgements acknowledgementsToSend = null; + Map nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(node.id()); + if (nodeAcksFromFetchMap != null) { + acknowledgementsToSend = nodeAcksFromFetchMap.remove(tip); + if (acknowledgementsToSend != null) { + metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); + fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acknowledgementsToSend); + } } + handler.addPartitionToFetch(tip, acknowledgementsToSend); - fetchedPartitions.add(tip); topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); log.debug("Added fetch request for partition {} to node {}", tip, node.id()); } } - // Map storing the list of partitions to forget in the upcoming request. - Map> partitionsToForgetMap = new HashMap<>(); + + // Iterate over the session handlers to see if there are acknowledgements to be sent for partitions + // which are no longer part of the current subscription, or whose records were fetched from a + // previous leader. Cluster cluster = metadata.fetch(); - // Iterating over the session handlers to see if there are acknowledgements to be sent for partitions - // which are no longer part of the current subscription. 
sessionHandlers.forEach((nodeId, sessionHandler) -> { Node node = cluster.nodeById(nodeId); if (node != null) { if (nodesWithPendingRequests.contains(node.id())) { - log.trace("Skipping fetch because previous fetch request to {} has not been processed", node.id()); + log.trace("Skipping fetch because previous fetch request to {} has not been processed", nodeId); } else { - for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - if (!fetchedPartitions.contains(tip)) { - Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); + Map nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId); + if (nodeAcksFromFetchMap != null) { + nodeAcksFromFetchMap.forEach((tip, acks) -> { + metricsManager.recordAcknowledgementSent(acks.size()); + fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acks); - if (acknowledgementsToSend != null) { - metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); - fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); + sessionHandler.addPartitionToAcknowledgeOnly(tip, acks); + handlerMap.put(node, sessionHandler); - sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend); - handlerMap.put(node, sessionHandler); + topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); + log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId); + }); - partitionsToForgetMap.putIfAbsent(node, new ArrayList<>()); - partitionsToForgetMap.get(node).add(tip); - - topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); - fetchedPartitions.add(tip); - log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id()); - } - } + nodeAcksFromFetchMap.clear(); } } } }); - Map builderMap = new LinkedHashMap<>(); - for (Map.Entry entry : handlerMap.entrySet()) { - ShareFetchRequest.Builder builder = 
entry.getValue().newShareFetchBuilder(groupId, fetchConfig); - Node node = entry.getKey(); - - if (partitionsToForgetMap.containsKey(node)) { - if (builder.data().forgottenTopicsData() == null) { - builder.data().setForgottenTopicsData(new ArrayList<>()); - } - builder.updateForgottenData(partitionsToForgetMap.get(node)); - } - - builderMap.put(node, builder); - } - - List requests = builderMap.entrySet().stream().map(entry -> { + // Iterate over the share session handlers and build a list of UnsentRequests + List requests = handlerMap.entrySet().stream().map(entry -> { Node target = entry.getKey(); + ShareSessionHandler handler = entry.getValue(); + log.trace("Building ShareFetch request to send to node {}", target.id()); - ShareFetchRequest.Builder requestBuilder = entry.getValue(); + ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, fetchConfig); nodesWithPendingRequests.add(target.id()); @@ -255,14 +242,29 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi return new PollResult(requests); } - public void fetch(Map acknowledgementsMap) { + public void fetch(Map acknowledgementsMap) { if (!fetchMoreRecords) { log.debug("Fetch more data"); fetchMoreRecords = true; } // The acknowledgements sent via ShareFetch are stored in this map. 
- acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsToSend.merge(tip, acks, Acknowledgements::merge)); + acknowledgementsMap.forEach((tip, nodeAcks) -> { + int nodeId = nodeAcks.nodeId(); + Map currentNodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(nodeId); + if (currentNodeAcknowledgementsMap != null) { + Acknowledgements currentAcknowledgementsForNode = currentNodeAcknowledgementsMap.get(tip); + if (currentAcknowledgementsForNode != null) { + currentAcknowledgementsForNode.merge(nodeAcks.acknowledgements()); + } else { + currentNodeAcknowledgementsMap.put(tip, nodeAcks.acknowledgements()); + } + } else { + Map nodeAcknowledgementsMap = new HashMap<>(); + nodeAcknowledgementsMap.put(tip, nodeAcks.acknowledgements()); + fetchAcknowledgementsToSend.put(nodeId, nodeAcknowledgementsMap); + } + }); } /** @@ -282,7 +284,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId); } else { isAsyncSent.set(false); - // First, the acknowledgements from commitAsync is sent. + + // First, the acknowledgements from commitAsync are sent. maybeBuildRequest(requestStates.getValue().getAsyncRequest(), currentTimeMs, true, isAsyncSent).ifPresent(unsentRequests::add); // Check to ensure we start processing commitSync/close only if there are no commitAsync requests left to process. 
@@ -305,7 +308,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi } } } - } PollResult pollResult = null; @@ -455,7 +457,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi * @return The future which completes when the acknowledgements finished */ public CompletableFuture> commitSync( - final Map acknowledgementsMap, + final Map acknowledgementsMap, final long deadlineMs) { final AtomicInteger resultCount = new AtomicInteger(); final CompletableFuture> future = new CompletableFuture<>(); @@ -471,17 +473,18 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi // Add the incoming commitSync() request to the queue. Map acknowledgementsMapForNode = new HashMap<>(); for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - Acknowledgements acknowledgements = acknowledgementsMap.get(tip); - if (acknowledgements != null) { - acknowledgementsMapForNode.put(tip, acknowledgements); + NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip); + if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) { + acknowledgementsMapForNode.put(tip, nodeAcknowledgements.acknowledgements()); - metricsManager.recordAcknowledgementSent(acknowledgements.size()); + metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size()); log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); resultCount.incrementAndGet(); } } - acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext, + if (!acknowledgementsMapForNode.isEmpty()) { + acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext, ShareConsumeRequestManager.class.getSimpleName() + ":1", deadlineMs, retryBackoffMs, @@ -491,7 +494,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi acknowledgementsMapForNode, 
resultHandler, AcknowledgeRequestType.COMMIT_SYNC - )); + )); + } } }); @@ -505,7 +509,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi * * @param acknowledgementsMap The acknowledgements to commit */ - public void commitAsync(final Map acknowledgementsMap) { + public void commitAsync(final Map acknowledgementsMap) { final Cluster cluster = metadata.fetch(); final ResultHandler resultHandler = new ResultHandler(Optional.empty()); @@ -517,8 +521,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - Acknowledgements acknowledgements = acknowledgementsMap.get(tip); - if (acknowledgements != null) { + NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip); + if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) { + Acknowledgements acknowledgements = nodeAcknowledgements.acknowledgements(); acknowledgementsMapForNode.put(tip, acknowledgements); metricsManager.recordAcknowledgementSent(acknowledgements.size()); @@ -560,7 +565,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi * * @return The future which completes when the acknowledgements finished */ - public CompletableFuture acknowledgeOnClose(final Map acknowledgementsMap, + public CompletableFuture acknowledgeOnClose(final Map acknowledgementsMap, final long deadlineMs) { final Cluster cluster = metadata.fetch(); final AtomicInteger resultCount = new AtomicInteger(); @@ -572,23 +577,29 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi Node node = cluster.nodeById(nodeId); if (node != null) { Map acknowledgementsMapForNode = new HashMap<>(); - for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - Acknowledgements acknowledgements = acknowledgementsMap.getOrDefault(tip, 
Acknowledgements.empty()); - Acknowledgements acksFromShareFetch = fetchAcknowledgementsToSend.remove(tip); - - if (acksFromShareFetch != null) { - acknowledgements.merge(acksFromShareFetch); + acknowledgementsMap.forEach((tip, nodeAcks) -> { + Acknowledgements acknowledgements = Acknowledgements.empty(); + Map nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId); + if (nodeAcksFromFetchMap != null) { + Acknowledgements acksFromFetchMap = nodeAcksFromFetchMap.remove(tip); + if (acksFromFetchMap != null) { + acknowledgements.merge(acksFromFetchMap); + } } - if (acknowledgements != null && !acknowledgements.isEmpty()) { + if (nodeAcks.nodeId() == node.id()) { + acknowledgements.merge(nodeAcks.acknowledgements()); + } + + if (!acknowledgements.isEmpty()) { acknowledgementsMapForNode.put(tip, acknowledgements); metricsManager.recordAcknowledgementSent(acknowledgements.size()); log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); resultCount.incrementAndGet(); } - } + }); acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); @@ -611,7 +622,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi resultHandler, AcknowledgeRequestType.CLOSE )); - } } }); @@ -664,14 +674,17 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi log.debug("ShareFetch for partition {} returned fetch data {}", tip, partitionData); - Acknowledgements acks = fetchAcknowledgementsInFlight.remove(tip); - if (acks != null) { - if (partitionData.acknowledgeErrorCode() != Errors.NONE.code()) { - metricsManager.recordFailedAcknowledgements(acks.size()); + Map nodeAcknowledgementsInFlight = fetchAcknowledgementsInFlight.get(fetchTarget.id()); + if (nodeAcknowledgementsInFlight != null) { + Acknowledgements acks = nodeAcknowledgementsInFlight.remove(tip); + if (acks != null) { + if (partitionData.acknowledgeErrorCode() != Errors.NONE.code()) { + 
metricsManager.recordFailedAcknowledgements(acks.size()); + } + acks.complete(Errors.forCode(partitionData.acknowledgeErrorCode()).exception()); + Map acksMap = Collections.singletonMap(tip, acks); + maybeSendShareAcknowledgeCommitCallbackEvent(acksMap); } - acks.setAcknowledgeErrorCode(Errors.forCode(partitionData.acknowledgeErrorCode())); - Map acksMap = Collections.singletonMap(tip, acks); - maybeSendShareAcknowledgeCommitCallbackEvent(acksMap); } Errors partitionError = Errors.forCode(partitionData.errorCode()); @@ -686,6 +699,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi ShareCompletedFetch completedFetch = new ShareCompletedFetch( logContext, BufferSupplier.create(), + fetchTarget.id(), tip, partitionData, shareFetchMetricsAggregator, @@ -727,12 +741,19 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi partition.partitionIndex(), metadata.topicNames().get(topic.topicId())); - Acknowledgements acks = fetchAcknowledgementsInFlight.remove(tip); - if (acks != null) { - metricsManager.recordFailedAcknowledgements(acks.size()); - acks.setAcknowledgeErrorCode(Errors.forException(error)); - Map acksMap = Collections.singletonMap(tip, acks); - maybeSendShareAcknowledgeCommitCallbackEvent(acksMap); + Map nodeAcknowledgementsInFlight = fetchAcknowledgementsInFlight.get(fetchTarget.id()); + if (nodeAcknowledgementsInFlight != null) { + Acknowledgements acks = nodeAcknowledgementsInFlight.remove(tip); + if (acks != null) { + metricsManager.recordFailedAcknowledgements(acks.size()); + if (error instanceof KafkaException) { + acks.complete((KafkaException) error); + } else { + acks.complete(Errors.UNKNOWN_SERVER_ERROR.exception()); + } + Map acksMap = Collections.singletonMap(tip, acks); + maybeSendShareAcknowledgeCommitCallbackEvent(acksMap); + } } })); } finally { @@ -1065,7 +1086,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi void 
handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCode) { Acknowledgements acks = inFlightAcknowledgements.get(tip); if (acks != null) { - acks.setAcknowledgeErrorCode(acknowledgeErrorCode); + acks.complete(acknowledgeErrorCode.exception()); } resultHandler.complete(tip, acks, onCommitAsync()); } @@ -1077,14 +1098,14 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi void handleAcknowledgeTimedOut(TopicIdPartition tip) { Acknowledgements acks = incompleteAcknowledgements.get(tip); if (acks != null) { - acks.setAcknowledgeErrorCode(Errors.REQUEST_TIMED_OUT); + acks.complete(Errors.REQUEST_TIMED_OUT.exception()); } resultHandler.complete(tip, acks, onCommitAsync()); } /** * Set the error code for all remaining acknowledgements in the event - * of a session error which prevents the remains acknowledgements from + * of a session error which prevents the remaining acknowledgements from * being sent. */ void handleSessionErrorCode(Errors errorCode) { @@ -1093,7 +1114,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi acknowledgementsMapToClear.forEach((tip, acks) -> { if (acks != null) { - acks.setAcknowledgeErrorCode(errorCode); + acks.complete(errorCode.exception()); } resultHandler.complete(tip, acks, onCommitAsync()); }); @@ -1174,6 +1195,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi if (!isCommitAsync && acknowledgements != null) { result.put(partition, acknowledgements); } + // For commitAsync, we do not wait for other results to complete, we prepare a background event // for every ShareAcknowledgeResponse. // For commitAsync, we send out a background event for every TopicIdPartition, so we use a singletonMap each time. 
@@ -1291,6 +1313,5 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi public String toString() { return super.toString().toLowerCase(Locale.ROOT); } - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 667f2d00033..fc8d410de4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -54,7 +54,6 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.TimeoutException; @@ -62,7 +61,6 @@ import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; @@ -611,7 +609,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { private ShareFetch pollForFetches(final Timer timer) { long pollTimeout = Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs()); - Map acknowledgementsMap = currentFetch.takeAcknowledgedRecords(); + Map acknowledgementsMap = currentFetch.takeAcknowledgedRecords(); // If data is available already, return it immediately final ShareFetch fetch = 
collect(acknowledgementsMap); @@ -636,7 +634,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { return collect(Collections.emptyMap()); } - private ShareFetch collect(Map acknowledgementsMap) { + private ShareFetch collect(Map acknowledgementsMap) { if (currentFetch.isEmpty()) { final ShareFetch fetch = fetchCollector.collect(fetchBuffer); if (fetch.isEmpty()) { @@ -709,7 +707,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { acknowledgeBatchIfImplicitAcknowledgement(false); Timer requestTimer = time.timer(timeout.toMillis()); - Map acknowledgementsMap = acknowledgementsToSend(); + Map acknowledgementsMap = acknowledgementsToSend(); if (acknowledgementsMap.isEmpty()) { return Collections.emptyMap(); } else { @@ -721,16 +719,11 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { Map> result = new HashMap<>(); Map completedAcknowledgements = ConsumerUtils.getResult(commitFuture); completedAcknowledgements.forEach((tip, acks) -> { - Errors ackErrorCode = acks.getAcknowledgeErrorCode(); - if (ackErrorCode == null) { + KafkaException exception = acks.getAcknowledgeException(); + if (exception == null) { result.put(tip, Optional.empty()); } else { - ApiException exception = ackErrorCode.exception(); - if (exception == null) { - result.put(tip, Optional.empty()); - } else { - result.put(tip, Optional.of(ackErrorCode.exception())); - } + result.put(tip, Optional.of(exception)); } }); return result; @@ -757,7 +750,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { // If using implicit acknowledgement, acknowledge the previously fetched records acknowledgeBatchIfImplicitAcknowledgement(false); - Map acknowledgementsMap = acknowledgementsToSend(); + Map acknowledgementsMap = acknowledgementsToSend(); if (!acknowledgementsMap.isEmpty()) { ShareAcknowledgeAsyncEvent event = new ShareAcknowledgeAsyncEvent(acknowledgementsMap); applicationEventHandler.add(event); @@ -1045,7 +1038,7 @@ public class 
ShareConsumerImpl implements ShareConsumerDelegate { /** * Returns any ready acknowledgements to be sent to the cluster. */ - private Map acknowledgementsToSend() { + private Map acknowledgementsToSend() { return currentFetch.takeAcknowledgedRecords(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java index 1bab0527536..eb79fa79c40 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java @@ -138,14 +138,15 @@ public class ShareFetch { * to send. If some records were not acknowledged, the in-flight records will not be empty after this * method. * - * @return The map of acknowledgements to send + * @return The map of acknowledgements to send, along with node information */ - public Map takeAcknowledgedRecords() { - Map acknowledgementMap = new LinkedHashMap<>(); + public Map takeAcknowledgedRecords() { + Map acknowledgementMap = new LinkedHashMap<>(); batches.forEach((tip, batch) -> { + int nodeId = batch.nodeId(); Acknowledgements acknowledgements = batch.takeAcknowledgedRecords(); if (!acknowledgements.isEmpty()) - acknowledgementMap.put(tip, acknowledgements); + acknowledgementMap.put(tip, new NodeAcknowledgements(nodeId, acknowledgements)); }); return acknowledgementMap; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java index bff9d62e3a7..b2d6fad17fd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java @@ -29,6 +29,7 @@ import java.util.TreeMap; import java.util.TreeSet; public class ShareInFlightBatch { + private final int 
nodeId; final TopicIdPartition partition; private final Map> inFlightRecords; private final Set acknowledgedRecords; @@ -36,7 +37,8 @@ public class ShareInFlightBatch { private KafkaException exception; private boolean hasCachedException = false; - public ShareInFlightBatch(TopicIdPartition partition) { + public ShareInFlightBatch(int nodeId, TopicIdPartition partition) { + this.nodeId = nodeId; this.partition = partition; inFlightRecords = new TreeMap<>(); acknowledgedRecords = new TreeSet<>(); @@ -87,6 +89,10 @@ public class ShareInFlightBatch { return inFlightRecords.size(); } + int nodeId() { + return nodeId; + } + Acknowledgements takeAcknowledgedRecords() { // Usually, all records will be acknowledged, so we can just clear the in-flight records leaving // an empty batch, which will trigger more fetching diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java index 27cfdc5981e..8888c8dc3ee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java @@ -68,12 +68,12 @@ public class ShareSessionHandler { */ private final LinkedHashMap sessionPartitions; - /* + /** * The partitions to be included in the next ShareFetch request. */ private LinkedHashMap nextPartitions; - /* + /** * The acknowledgements to be included in the next ShareFetch/ShareAcknowledge request. 
*/ private LinkedHashMap nextAcknowledgements; @@ -103,6 +103,10 @@ public class ShareSessionHandler { } } + public void addPartitionToAcknowledgeOnly(TopicIdPartition topicIdPartition, Acknowledgements partitionAcknowledgements) { + nextAcknowledgements.put(topicIdPartition, partitionAcknowledgements); + } + public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, FetchConfig fetchConfig) { List added = new ArrayList<>(); List removed = new ArrayList<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeAsyncEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeAsyncEvent.java index 7bfc86e9235..6c88133e266 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeAsyncEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeAsyncEvent.java @@ -16,21 +16,21 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.clients.consumer.internals.Acknowledgements; +import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements; import org.apache.kafka.common.TopicIdPartition; import java.util.Map; public class ShareAcknowledgeAsyncEvent extends ApplicationEvent { - private final Map acknowledgementsMap; + private final Map acknowledgementsMap; - public ShareAcknowledgeAsyncEvent(final Map acknowledgementsMap) { + public ShareAcknowledgeAsyncEvent(final Map acknowledgementsMap) { super(Type.SHARE_ACKNOWLEDGE_ASYNC); this.acknowledgementsMap = acknowledgementsMap; } - public Map acknowledgementsMap() { + public Map acknowledgementsMap() { return acknowledgementsMap; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java index 0916ab8666c..cc98655b83e 
100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java @@ -16,21 +16,21 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.clients.consumer.internals.Acknowledgements; +import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements; import org.apache.kafka.common.TopicIdPartition; import java.util.Map; public class ShareAcknowledgeOnCloseEvent extends CompletableApplicationEvent { - private final Map acknowledgementsMap; + private final Map acknowledgementsMap; - public ShareAcknowledgeOnCloseEvent(final Map acknowledgementsMap, final long deadlineMs) { + public ShareAcknowledgeOnCloseEvent(final Map acknowledgementsMap, final long deadlineMs) { super(Type.SHARE_ACKNOWLEDGE_ON_CLOSE, deadlineMs); this.acknowledgementsMap = acknowledgementsMap; } - public Map acknowledgementsMap() { + public Map acknowledgementsMap() { return acknowledgementsMap; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java index 49cb422e633..8b3237c6f95 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java @@ -17,20 +17,21 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.internals.Acknowledgements; +import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements; import org.apache.kafka.common.TopicIdPartition; import java.util.Map; public class ShareAcknowledgeSyncEvent extends CompletableApplicationEvent> { - private final Map acknowledgementsMap; + private final 
Map acknowledgementsMap; - public ShareAcknowledgeSyncEvent(final Map acknowledgementsMap, final long deadlineMs) { + public ShareAcknowledgeSyncEvent(final Map acknowledgementsMap, final long deadlineMs) { super(Type.SHARE_ACKNOWLEDGE_SYNC, deadlineMs); this.acknowledgementsMap = acknowledgementsMap; } - public Map acknowledgementsMap() { + public Map acknowledgementsMap() { return acknowledgementsMap; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java index 2a2b56e87cd..1f2f0cc9a17 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java @@ -16,21 +16,21 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import org.apache.kafka.clients.consumer.internals.Acknowledgements; +import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements; import org.apache.kafka.common.TopicIdPartition; import java.util.Map; public class ShareFetchEvent extends ApplicationEvent { - private final Map acknowledgementsMap; + private final Map acknowledgementsMap; - public ShareFetchEvent(Map acknowledgementsMap) { + public ShareFetchEvent(Map acknowledgementsMap) { super(Type.SHARE_FETCH); this.acknowledgementsMap = acknowledgementsMap; } - public Map acknowledgementsMap() { + public Map acknowledgementsMap() { return acknowledgementsMap; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java index 9a28b49ed7e..c6e10040d32 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java @@ -85,7 +85,7 @@ class AcknowledgementCommitCallbackHandlerTest { Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(0L, AcknowledgeType.ACCEPT); acknowledgements.add(1L, AcknowledgeType.REJECT); - acknowledgements.setAcknowledgeErrorCode(Errors.INVALID_RECORD_STATE); + acknowledgements.complete(Errors.INVALID_RECORD_STATE.exception()); acknowledgementsMap.put(tip0, acknowledgements); acknowledgementCommitCallbackHandler.onComplete(Collections.singletonList(acknowledgementsMap)); @@ -101,7 +101,7 @@ class AcknowledgementCommitCallbackHandlerTest { Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(0L, AcknowledgeType.ACCEPT); acknowledgements.add(1L, AcknowledgeType.REJECT); - acknowledgements.setAcknowledgeErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED); + acknowledgements.complete(Errors.TOPIC_AUTHORIZATION_FAILED.exception()); acknowledgementsMap.put(tip0, acknowledgements); acknowledgementCommitCallbackHandler.onComplete(Collections.singletonList(acknowledgementsMap)); @@ -116,12 +116,12 @@ class AcknowledgementCommitCallbackHandlerTest { Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(0L, AcknowledgeType.ACCEPT); acknowledgements.add(1L, AcknowledgeType.REJECT); - acknowledgements.setAcknowledgeErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED); + acknowledgements.complete(Errors.TOPIC_AUTHORIZATION_FAILED.exception()); acknowledgementsMap.put(tip0, acknowledgements); Acknowledgements acknowledgements1 = Acknowledgements.empty(); acknowledgements1.add(0L, AcknowledgeType.RELEASE); - acknowledgements1.setAcknowledgeErrorCode(Errors.INVALID_RECORD_STATE); + acknowledgements1.complete(Errors.INVALID_RECORD_STATE.exception()); acknowledgementsMap.put(tip1, acknowledgements1); Map acknowledgementsMap2 = new HashMap<>(); diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java index 3b488d3d002..73efb010c8b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java @@ -367,6 +367,7 @@ public class ShareCompletedFetchTest { return new ShareCompletedFetch( logContext, BufferSupplier.create(), + 0, TIP, partitionData, shareFetchMetricsAggregator, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index efb190506ea..ede3f5415fc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -38,7 +38,9 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.ShareSessionNotFoundException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.internals.ClusterResourceListeners; @@ -112,6 +114,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -268,7 +271,7 @@ public class ShareConsumeRequestManagerTest { Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); assertEquals(1, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); @@ -287,7 +290,7 @@ public class ShareConsumeRequestManagerTest { Acknowledgements acknowledgements2 = Acknowledgements.empty(); acknowledgements2.add(2L, AcknowledgeType.REJECT); - shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements2)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2))); assertEquals(1, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); @@ -328,7 +331,7 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(2L, AcknowledgeType.ACCEPT); acknowledgements.add(3L, AcknowledgeType.REJECT); - shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), time.milliseconds() + 2000); + shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)), time.milliseconds() + 2000); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); @@ -361,7 +364,7 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(2L, AcknowledgeType.ACCEPT); acknowledgements.add(3L, AcknowledgeType.REJECT); - shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, 
acknowledgements)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); @@ -395,20 +398,20 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(1L, AcknowledgeType.ACCEPT); acknowledgements.add(2L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); Acknowledgements acknowledgements2 = Acknowledgements.empty(); acknowledgements2.add(3L, AcknowledgeType.REJECT); - shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements2)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2))); client.prepareResponse(null, true); networkClientDelegate.poll(time.timer(0)); assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0)); - assertEquals(Errors.UNKNOWN_SERVER_ERROR, completedAcknowledgements.get(0).get(tip0).getAcknowledgeErrorCode()); + assertInstanceOf(UnknownServerException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); completedAcknowledgements.clear(); assertEquals(1, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0)); @@ -419,7 +422,7 @@ public class ShareConsumeRequestManagerTest { assertNull(shareConsumeRequestManager.requestStates(0)); // The callback for these unsent acknowledgements will be invoked with an error code. 
assertEquals(Collections.singletonMap(tip0, acknowledgements2), completedAcknowledgements.get(0)); - assertEquals(Errors.SHARE_SESSION_NOT_FOUND, completedAcknowledgements.get(0).get(tip0).getAcknowledgeErrorCode()); + assertInstanceOf(ShareSessionNotFoundException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); }); // Attempt a normal fetch to check if nodesWithPendingRequests is empty. @@ -451,14 +454,14 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(1L, AcknowledgeType.ACCEPT); // Piggyback acknowledgements - shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); // Remaining acknowledgements sent with close(). Acknowledgements acknowledgements2 = Acknowledgements.empty(); acknowledgements2.add(2L, AcknowledgeType.ACCEPT); acknowledgements2.add(3L, AcknowledgeType.REJECT); - shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, acknowledgements2), + shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2)), calculateDeadlineMs(time.timer(100))); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); @@ -468,7 +471,7 @@ public class ShareConsumeRequestManagerTest { assertEquals(1, completedAcknowledgements.size()); Acknowledgements mergedAcks = acknowledgements.merge(acknowledgements2); - mergedAcks.setAcknowledgeErrorCode(Errors.NONE); + mergedAcks.complete(null); // Verifying that all 3 offsets were acknowledged as part of the final ShareAcknowledge on close. 
assertEquals(mergedAcks.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap()); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); @@ -495,7 +498,7 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(2L, AcknowledgeType.ACCEPT); acknowledgements.add(3L, AcknowledgeType.REJECT); - shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(), calculateDeadlineMs(time.timer(100))); @@ -533,7 +536,7 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(2L, AcknowledgeType.ACCEPT); acknowledgements.add(3L, AcknowledgeType.REJECT); - shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), + shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)), calculateDeadlineMs(time.timer(100))); shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(), calculateDeadlineMs(time.timer(100))); @@ -654,14 +657,14 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(2L, AcknowledgeType.ACCEPT); acknowledgements.add(3L, AcknowledgeType.REJECT); - shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); Acknowledgements acknowledgements2 = Acknowledgements.empty(); acknowledgements.add(4L, AcknowledgeType.ACCEPT); acknowledgements.add(5L, AcknowledgeType.ACCEPT); acknowledgements.add(6L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements2)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, 
acknowledgements2))); assertEquals(6, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0)); @@ -697,14 +700,14 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(2L, AcknowledgeType.ACCEPT); acknowledgements.add(3L, AcknowledgeType.REJECT); - shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); Acknowledgements acknowledgements2 = Acknowledgements.empty(); acknowledgements2.add(4L, AcknowledgeType.ACCEPT); acknowledgements2.add(5L, AcknowledgeType.ACCEPT); acknowledgements2.add(6L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements2), 60000L); + shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2)), 60000L); assertEquals(3, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0)); assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); @@ -755,7 +758,7 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(5L, AcknowledgeType.RELEASE); acknowledgements.add(6L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), 60000L); + shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)), 60000L); assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest()); assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); @@ -803,7 +806,7 @@ public class ShareConsumeRequestManagerTest { fetchRecords(); // Piggyback acknowledgements - shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, 
new NodeAcknowledgements(0, acknowledgements))); assertEquals(1, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); @@ -813,7 +816,7 @@ public class ShareConsumeRequestManagerTest { Acknowledgements acknowledgements2 = Acknowledgements.empty(); acknowledgements2.add(3L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements2)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2))); client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); @@ -847,7 +850,7 @@ public class ShareConsumeRequestManagerTest { assignFromSubscribed(singleton(tp1)); - shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); @@ -878,7 +881,7 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(2L, AcknowledgeType.ACCEPT); // Send acknowledgements via ShareFetch - shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); fetchRecords(); // Subscription changes. assignFromSubscribed(singleton(tp1)); @@ -923,7 +926,7 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(2L, AcknowledgeType.ACCEPT); // Send acknowledgements via ShareFetch - shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); fetchRecords(); // Subscription changes. 
subscriptions.assignFromSubscribed(Collections.singletonList(tp1)); @@ -1045,7 +1048,7 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(5L, AcknowledgeType.RELEASE); acknowledgements.add(6L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), 60000L); + shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)), 60000L); assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest()); assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); @@ -1084,7 +1087,7 @@ public class ShareConsumeRequestManagerTest { acknowledgements.add(1L, AcknowledgeType.ACCEPT); acknowledgements.add(2L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); @@ -1102,7 +1105,7 @@ public class ShareConsumeRequestManagerTest { Acknowledgements acknowledgements2 = Acknowledgements.empty(); acknowledgements2.add(3L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements2)); + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2))); TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements())); @@ -1146,9 +1149,9 @@ public class ShareConsumeRequestManagerTest { acknowledgements2.add(1L, AcknowledgeType.ACCEPT); acknowledgements2.add(2L, AcknowledgeType.ACCEPT); - Map acks = new HashMap<>(); - acks.put(tip0, acknowledgements); - acks.put(t2ip0, acknowledgements2); + Map acks = new HashMap<>(); + acks.put(tip0, new NodeAcknowledgements(0, acknowledgements)); + acks.put(t2ip0, new 
NodeAcknowledgements(0, acknowledgements2)); shareConsumeRequestManager.commitAsync(acks); @@ -1582,7 +1585,7 @@ public class ShareConsumeRequestManagerTest { Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); assertEquals(startingClusterMetadata, metadata.fetch()); @@ -1687,7 +1690,7 @@ public class ShareConsumeRequestManagerTest { Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); // The metadata snapshot will have been updated with the new leader information assertNotEquals(startingClusterMetadata, metadata.fetch()); @@ -1727,6 +1730,239 @@ public class ShareConsumeRequestManagerTest { assertEquals(1, fetchedRecords.size()); } + /** + * Test the scenario that the metadata indicated a change in leadership between ShareFetch requests such + * as could occur when metadata is periodically updated. 
+ */ + @ParameterizedTest + @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"}) + public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) { + buildRequestManager(); + + subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + Set partitions = new HashSet<>(); + partitions.add(tp0); + partitions.add(tp1); + subscriptions.assignFromSubscribed(partitions); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2), + tp -> validLeaderEpoch, topicIds, false)); + Node nodeId0 = metadata.fetch().nodeById(0); + Node nodeId1 = metadata.fetch().nodeById(1); + + Cluster startingClusterMetadata = metadata.fetch(); + assertFalse(metadata.updateRequested()); + + assertEquals(2, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + LinkedHashMap partitionData = new LinkedHashMap<>(); + partitionData.put(tip0, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip0.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0); + partitionData.clear(); + partitionData.put(tip1, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip1.topicPartition().partition()) + .setErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Map>> partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertFalse(partitionRecords.containsKey(tp1)); + + List> fetchedRecords = partitionRecords.get(tp0); + assertEquals(1, 
fetchedRecords.size()); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); + + assertEquals(startingClusterMetadata, metadata.fetch()); + + // Move the leadership of tp0 onto node 1 + HashMap partitionLeaders = new HashMap<>(); + partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1))); + metadata.updatePartitionLeadership(partitionLeaders, List.of()); + + assertNotEquals(startingClusterMetadata, metadata.fetch()); + + // Even though the partitions are on the same leader, records were fetched on the previous leader. + // A fetch is sent to the previous leader to remove the partition from the share session and get the acknowledge error code. + assertEquals(2, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + partitionData.clear(); + partitionData.put(tip0, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip0.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setAcknowledgeErrorCode(error.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0); + partitionData.clear(); + partitionData.put(tip0, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip0.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + partitionData.put(tip1, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip1.topicPartition().partition()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + 
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + + fetchedRecords = partitionRecords.get(tp0); + assertEquals(1, fetchedRecords.size()); + fetchedRecords = partitionRecords.get(tp1); + assertEquals(1, fetchedRecords.size()); + } + + @Test + void testWhenLeadershipChangedAfterDisconnected() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + Set partitions = new HashSet<>(); + partitions.add(tp0); + partitions.add(tp1); + subscriptions.assignFromSubscribed(partitions); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2), + tp -> validLeaderEpoch, topicIds, false)); + Node nodeId0 = metadata.fetch().nodeById(0); + Node nodeId1 = metadata.fetch().nodeById(1); + + Cluster startingClusterMetadata = metadata.fetch(); + assertFalse(metadata.updateRequested()); + + assertEquals(2, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + LinkedHashMap partitionData = new LinkedHashMap<>(); + partitionData.put(tip0, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip0.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0); + partitionData.clear(); + partitionData.put(tip1, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip1.topicPartition().partition()) + 
.setErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Map>> partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertFalse(partitionRecords.containsKey(tp1)); + + List> fetchedRecords = partitionRecords.get(tp0); + assertEquals(1, fetchedRecords.size()); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1, AcknowledgeType.ACCEPT); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); + + assertEquals(startingClusterMetadata, metadata.fetch()); + + acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1, AcknowledgeType.ACCEPT); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements))); + + assertEquals(2, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + partitionData.clear(); + partitionData.put(tip0, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip0.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setAcknowledgeErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0, true); + partitionData.clear(); + partitionData.put(tip1, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip1.topicPartition().partition()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + 
// The node was disconnected, so the acknowledgement failed + assertInstanceOf(DisconnectException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + completedAcknowledgements.clear(); + + partitionRecords = fetchRecords(); + assertFalse(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); + + fetchedRecords = partitionRecords.get(tp1); + assertEquals(1, fetchedRecords.size()); + + // Move the leadership of tp0 onto node 1 + HashMap partitionLeaders = new HashMap<>(); + partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1))); + metadata.updatePartitionLeadership(partitionLeaders, List.of()); + + assertNotEquals(startingClusterMetadata, metadata.fetch()); + + shareConsumeRequestManager.fetch(Collections.singletonMap(tip1, new NodeAcknowledgements(1, acknowledgements))); + + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + partitionData.clear(); + partitionData.put(tip0, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip0.topicPartition().partition()) + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) + .setAcknowledgeErrorCode(Errors.NONE.code())); + partitionData.put(tip1, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tip1.topicPartition().partition())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); + completedAcknowledgements.clear(); + + partitionRecords = fetchRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + assertFalse(partitionRecords.containsKey(tp1)); + + fetchedRecords = 
partitionRecords.get(tp0); + assertEquals(1, fetchedRecords.size()); + } + private ShareFetchResponse fetchResponseWithTopLevelError(TopicIdPartition tp, Errors error) { Map partitions = Collections.singletonMap(tp, new ShareFetchResponseData.PartitionData() diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index 04db229c8df..6f5e78451b1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -240,7 +240,7 @@ public class ShareConsumerImplTest { final String topicName = "foo"; final int partition = 3; final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), partition, topicName); - final ShareInFlightBatch batch = new ShareInFlightBatch<>(tip); + final ShareInFlightBatch batch = new ShareInFlightBatch<>(0, tip); batch.addRecord(new ConsumerRecord<>(topicName, partition, 2, "key1", "value1")); doAnswer(invocation -> { consumer.wakeup(); @@ -465,7 +465,7 @@ public class ShareConsumerImplTest { final TopicPartition tp = new TopicPartition("topic", 0); final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), tp); - final ShareInFlightBatch batch = new ShareInFlightBatch<>(tip); + final ShareInFlightBatch batch = new ShareInFlightBatch<>(0, tip); batch.addRecord(new ConsumerRecord<>("topic", 0, 2, "key1", "value1")); final ShareFetch fetch = ShareFetch.empty(); fetch.add(tip, batch); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java index 2fa8d6cb659..f7039e838b7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java @@ -170,6 +170,7 @@ public class ShareFetchBufferTest { return new ShareCompletedFetch( logContext, BufferSupplier.create(), + 0, tp, partitionData, shareFetchMetricsAggregator, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java index eb63c94673e..194d9b2a2c4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java @@ -347,6 +347,7 @@ public class ShareFetchCollectorTest { return new ShareCompletedFetch( logContext, BufferSupplier.create(), + 0, topicAPartition0, partitionData, shareFetchMetricsAggregator, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java index 0ce2f349f98..e0ae3e982bb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java @@ -337,7 +337,7 @@ public class ShareSessionHandlerTest { } @Test - public void testForgottenPartitions() { + public void testPartitionForgottenOnAcknowledgeOnly() { String groupId = "G1"; Uuid memberId = Uuid.randomUuid(); ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId); @@ -362,6 +362,42 @@ public class ShareSessionHandlerTest { new RespEntry("foo", 0, topicId)))); handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true)); + // Remove the topic from the session by setting acknowledgements only - this is not asking to fetch records + ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, fetchConfig).build().data(); + handler.addPartitionToAcknowledgeOnly(foo0, Acknowledgements.empty()); + assertEquals(Collections.singletonList(foo0), reqForgetList(requestData2, topicNames)); + + // Should have the same session ID, next epoch, and same ID usage + assertEquals(memberId.toString(), requestData2.memberId(), "Did not use same session"); + assertEquals(1, requestData2.shareSessionEpoch(), "Did not have correct epoch"); + } + + @Test + public void testForgottenPartitions() { + String groupId = "G1"; + Uuid memberId = Uuid.randomUuid(); + ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId); + + // We want to test when all topics are removed from the session + Map topicIds = new HashMap<>(); + Map topicNames = new HashMap<>(); + Uuid topicId = addTopicId(topicIds, topicNames, "foo"); + TopicIdPartition foo0 = new TopicIdPartition(topicId, 0, "foo"); + handler.addPartitionToFetch(foo0, null); + ShareFetchRequestData requestData1 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data(); + assertMapsEqual(reqMap(foo0), handler.sessionPartitionMap()); + ArrayList expectedToSend1 = new ArrayList<>(); + expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo")); + assertListEquals(expectedToSend1, reqFetchList(requestData1, topicNames)); + + ShareFetchResponse resp = new ShareFetchResponse( + new ShareFetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setThrottleTimeMs(0) + .setResponses(respList( + new RespEntry("foo", 0, topicId)))); + handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true)); + // Remove the topic from the session ShareFetchRequestData requestData2 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data(); assertEquals(Collections.singletonList(foo0), reqForgetList(requestData2, topicNames));