KAFKA-18618: Improve leader change handling of acknowledgements [1/N] (#18672)

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, ShivsundarR <shr@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Andrew Schofield 2025-02-06 14:32:55 +00:00 committed by GitHub
parent b2b2408692
commit aa8c57665f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 538 additions and 184 deletions

View File

@ -20,8 +20,10 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.WakeupException;
import java.util.Map;
@ -42,12 +44,16 @@ public interface AcknowledgementCommitCallback {
*
* @param exception The exception thrown during processing of the request, or null if the acknowledgement completed successfully.
* <p><ul>
* <li> {@link AuthorizationException} if not authorized to the topic or group
* <li> {@link InvalidRecordStateException} if the record state is invalid
* <li> {@link AuthorizationException} if not authorized to the topic of group
* <li> {@link NotLeaderOrFollowerException} if the leader had changed by the time the acknowledgements were sent
* <li> {@link DisconnectException} if the broker disconnected before the request could be completed
* <li> {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is called before or while this function is called
* <li> {@link InterruptException} if the calling thread is interrupted before or while this function is called
* <li> {@link KafkaException} for any other unrecoverable errors
* </ul>
* <p>Note that even if the exception is a retriable exception, the acknowledgement could not be completed and the
* records need to be fetched again. The callback is called after any retries have been performed.
*/
void onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception);
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.slf4j.Logger;
@ -45,10 +46,7 @@ public class AcknowledgementCommitCallbackHandler {
void onComplete(List<Map<TopicIdPartition, Acknowledgements>> acknowledgementsMapList) {
final ArrayList<Throwable> exceptions = new ArrayList<>();
acknowledgementsMapList.forEach(acknowledgementsMap -> acknowledgementsMap.forEach((partition, acknowledgements) -> {
Exception exception = null;
if (acknowledgements.getAcknowledgeErrorCode() != null) {
exception = acknowledgements.getAcknowledgeErrorCode().exception();
}
KafkaException exception = acknowledgements.getAcknowledgeException();
Set<Long> offsets = acknowledgements.getAcknowledgementsTypeMap().keySet();
Set<Long> offsetsCopy = Collections.unmodifiableSet(offsets);
enteredCallback = true;

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.protocol.Errors;
import java.util.ArrayList;
@ -35,8 +36,11 @@ public class Acknowledgements {
// The acknowledgements keyed by offset. If the record is a gap, the AcknowledgeType will be null.
private final Map<Long, AcknowledgeType> acknowledgements;
// When the broker responds to the acknowledgements, this is the error code returned.
private Errors acknowledgeErrorCode;
// When the broker responds to the acknowledgements, this is the exception thrown.
private KafkaException acknowledgeException;
// Set when the broker has responded to the acknowledgements.
private boolean completed;
public static Acknowledgements empty() {
return new Acknowledgements(new TreeMap<>());
@ -44,6 +48,8 @@ public class Acknowledgements {
private Acknowledgements(Map<Long, AcknowledgeType> acknowledgements) {
this.acknowledgements = acknowledgements;
this.acknowledgeException = null;
this.completed = false;
}
/**
@ -115,25 +121,26 @@ public class Acknowledgements {
* @return Whether the acknowledgements were sent to the broker and a response received
*/
public boolean isCompleted() {
return acknowledgeErrorCode != null;
return completed;
}
/**
* Set the acknowledgement error code when the response has been received from the broker.
* Completes the acknowledgements when the response has been received from the broker.
*
* @param acknowledgeErrorCode the error code
* @param acknowledgeException the exception (will be null if successful)
*/
public void setAcknowledgeErrorCode(Errors acknowledgeErrorCode) {
this.acknowledgeErrorCode = acknowledgeErrorCode;
public void complete(KafkaException acknowledgeException) {
this.acknowledgeException = acknowledgeException;
completed = true;
}
/**
* Get the acknowledgement error code when the response has been received from the broker.
* Get the acknowledgement exception when the response has been received from the broker.
*
* @return the error code
*/
public Errors getAcknowledgeErrorCode() {
return acknowledgeErrorCode;
public KafkaException getAcknowledgeException() {
return acknowledgeException;
}
/**
@ -301,10 +308,10 @@ public class Acknowledgements {
public String toString() {
StringBuilder sb = new StringBuilder("Acknowledgements(");
sb.append(acknowledgements);
if (acknowledgeErrorCode != null) {
sb.append(", errorCode=");
sb.append(acknowledgeErrorCode.code());
}
sb.append(", acknowledgeException=");
sb.append(acknowledgeException != null ? Errors.forException(acknowledgeException) : "null");
sb.append(", completed=");
sb.append(completed);
sb.append(")");
return sb.toString();
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import java.util.Objects;
/**
* This class combines Acknowledgements with the id of the node to use for acknowledging.
*/
public class NodeAcknowledgements {
private final int nodeId;
private final Acknowledgements acknowledgements;
public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) {
this.nodeId = nodeId;
this.acknowledgements = Objects.requireNonNull(acknowledgements);
}
public int nodeId() {
return nodeId;
}
public Acknowledgements acknowledgements() {
return acknowledgements;
}
}

View File

@ -55,7 +55,7 @@ import java.util.Optional;
* to keep track of aborted transactions or the need to keep track of fetch position.
*/
public class ShareCompletedFetch {
final int nodeId;
final TopicIdPartition partition;
final ShareFetchResponseData.PartitionData partitionData;
final short requestVersion;
@ -79,12 +79,14 @@ public class ShareCompletedFetch {
ShareCompletedFetch(final LogContext logContext,
final BufferSupplier decompressionBufferSupplier,
final int nodeId,
final TopicIdPartition partition,
final ShareFetchResponseData.PartitionData partitionData,
final ShareFetchMetricsAggregator metricAggregator,
final short requestVersion) {
this.log = logContext.logger(org.apache.kafka.clients.consumer.internals.ShareCompletedFetch.class);
this.decompressionBufferSupplier = decompressionBufferSupplier;
this.nodeId = nodeId;
this.partition = partition;
this.partitionData = partitionData;
this.metricAggregator = metricAggregator;
@ -156,7 +158,7 @@ public class ShareCompletedFetch {
final int maxRecords,
final boolean checkCrcs) {
// Creating an empty ShareInFlightBatch
ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(partition);
ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(nodeId, partition);
if (cachedBatchException != null) {
// If the event that a CRC check fails, reject the entire record batch because it is corrupt.

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentR
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@ -87,8 +88,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private final IdempotentCloser idempotentCloser = new IdempotentCloser();
private Uuid memberId;
private boolean fetchMoreRecords = false;
private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsToSend;
private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsInFlight;
private final Map<Integer, Map<TopicIdPartition, Acknowledgements>> fetchAcknowledgementsToSend;
private final Map<Integer, Map<TopicIdPartition, Acknowledgements>> fetchAcknowledgementsInFlight;
private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
@ -146,7 +147,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
Map<Node, ShareSessionHandler> handlerMap = new HashMap<>();
Map<String, Uuid> topicIds = metadata.topicIds();
Set<TopicIdPartition> fetchedPartitions = new HashSet<>();
for (TopicPartition partition : partitionsToFetch()) {
Optional<Node> leaderOpt = metadata.currentLeader(partition).leader;
@ -172,73 +172,60 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId)));
TopicIdPartition tip = new TopicIdPartition(topicId, partition);
Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip);
if (acknowledgementsToSend != null) {
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
Acknowledgements acknowledgementsToSend = null;
Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(node.id());
if (nodeAcksFromFetchMap != null) {
acknowledgementsToSend = nodeAcksFromFetchMap.remove(tip);
if (acknowledgementsToSend != null) {
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acknowledgementsToSend);
}
}
handler.addPartitionToFetch(tip, acknowledgementsToSend);
fetchedPartitions.add(tip);
topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
log.debug("Added fetch request for partition {} to node {}", tip, node.id());
}
}
// Map storing the list of partitions to forget in the upcoming request.
Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new HashMap<>();
// Iterate over the session handlers to see if there are acknowledgements to be sent for partitions
// which are no longer part of the current subscription, or whose records were fetched from a
// previous leader.
Cluster cluster = metadata.fetch();
// Iterating over the session handlers to see if there are acknowledgements to be sent for partitions
// which are no longer part of the current subscription.
sessionHandlers.forEach((nodeId, sessionHandler) -> {
Node node = cluster.nodeById(nodeId);
if (node != null) {
if (nodesWithPendingRequests.contains(node.id())) {
log.trace("Skipping fetch because previous fetch request to {} has not been processed", node.id());
log.trace("Skipping fetch because previous fetch request to {} has not been processed", nodeId);
} else {
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
if (!fetchedPartitions.contains(tip)) {
Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip);
Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
if (nodeAcksFromFetchMap != null) {
nodeAcksFromFetchMap.forEach((tip, acks) -> {
metricsManager.recordAcknowledgementSent(acks.size());
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acks);
if (acknowledgementsToSend != null) {
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
sessionHandler.addPartitionToAcknowledgeOnly(tip, acks);
handlerMap.put(node, sessionHandler);
sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend);
handlerMap.put(node, sessionHandler);
topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId);
});
partitionsToForgetMap.putIfAbsent(node, new ArrayList<>());
partitionsToForgetMap.get(node).add(tip);
topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
fetchedPartitions.add(tip);
log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id());
}
}
nodeAcksFromFetchMap.clear();
}
}
}
});
Map<Node, ShareFetchRequest.Builder> builderMap = new LinkedHashMap<>();
for (Map.Entry<Node, ShareSessionHandler> entry : handlerMap.entrySet()) {
ShareFetchRequest.Builder builder = entry.getValue().newShareFetchBuilder(groupId, fetchConfig);
Node node = entry.getKey();
if (partitionsToForgetMap.containsKey(node)) {
if (builder.data().forgottenTopicsData() == null) {
builder.data().setForgottenTopicsData(new ArrayList<>());
}
builder.updateForgottenData(partitionsToForgetMap.get(node));
}
builderMap.put(node, builder);
}
List<UnsentRequest> requests = builderMap.entrySet().stream().map(entry -> {
// Iterate over the share session handlers and build a list of UnsentRequests
List<UnsentRequest> requests = handlerMap.entrySet().stream().map(entry -> {
Node target = entry.getKey();
ShareSessionHandler handler = entry.getValue();
log.trace("Building ShareFetch request to send to node {}", target.id());
ShareFetchRequest.Builder requestBuilder = entry.getValue();
ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, fetchConfig);
nodesWithPendingRequests.add(target.id());
@ -255,14 +242,29 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
return new PollResult(requests);
}
public void fetch(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
public void fetch(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
if (!fetchMoreRecords) {
log.debug("Fetch more data");
fetchMoreRecords = true;
}
// The acknowledgements sent via ShareFetch are stored in this map.
acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsToSend.merge(tip, acks, Acknowledgements::merge));
acknowledgementsMap.forEach((tip, nodeAcks) -> {
int nodeId = nodeAcks.nodeId();
Map<TopicIdPartition, Acknowledgements> currentNodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(nodeId);
if (currentNodeAcknowledgementsMap != null) {
Acknowledgements currentAcknowledgementsForNode = currentNodeAcknowledgementsMap.get(tip);
if (currentAcknowledgementsForNode != null) {
currentAcknowledgementsForNode.merge(nodeAcks.acknowledgements());
} else {
currentNodeAcknowledgementsMap.put(tip, nodeAcks.acknowledgements());
}
} else {
Map<TopicIdPartition, Acknowledgements> nodeAcknowledgementsMap = new HashMap<>();
nodeAcknowledgementsMap.put(tip, nodeAcks.acknowledgements());
fetchAcknowledgementsToSend.put(nodeId, nodeAcknowledgementsMap);
}
});
}
/**
@ -282,7 +284,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
} else {
isAsyncSent.set(false);
// First, the acknowledgements from commitAsync is sent.
// First, the acknowledgements from commitAsync are sent.
maybeBuildRequest(requestStates.getValue().getAsyncRequest(), currentTimeMs, true, isAsyncSent).ifPresent(unsentRequests::add);
// Check to ensure we start processing commitSync/close only if there are no commitAsync requests left to process.
@ -305,7 +308,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
}
}
}
PollResult pollResult = null;
@ -455,7 +457,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
* @return The future which completes when the acknowledgements finished
*/
public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitSync(
final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap,
final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
final long deadlineMs) {
final AtomicInteger resultCount = new AtomicInteger();
final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> future = new CompletableFuture<>();
@ -471,17 +473,18 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
// Add the incoming commitSync() request to the queue.
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
if (acknowledgements != null) {
acknowledgementsMapForNode.put(tip, acknowledgements);
NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip);
if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) {
acknowledgementsMapForNode.put(tip, nodeAcknowledgements.acknowledgements());
metricsManager.recordAcknowledgementSent(acknowledgements.size());
metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size());
log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}
acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext,
if (!acknowledgementsMapForNode.isEmpty()) {
acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":1",
deadlineMs,
retryBackoffMs,
@ -491,7 +494,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
acknowledgementsMapForNode,
resultHandler,
AcknowledgeRequestType.COMMIT_SYNC
));
));
}
}
});
@ -505,7 +509,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
*
* @param acknowledgementsMap The acknowledgements to commit
*/
public void commitAsync(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
public void commitAsync(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
final Cluster cluster = metadata.fetch();
final ResultHandler resultHandler = new ResultHandler(Optional.empty());
@ -517,8 +521,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
if (acknowledgements != null) {
NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip);
if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) {
Acknowledgements acknowledgements = nodeAcknowledgements.acknowledgements();
acknowledgementsMapForNode.put(tip, acknowledgements);
metricsManager.recordAcknowledgementSent(acknowledgements.size());
@ -560,7 +565,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
*
* @return The future which completes when the acknowledgements finished
*/
public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap,
public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
final long deadlineMs) {
final Cluster cluster = metadata.fetch();
final AtomicInteger resultCount = new AtomicInteger();
@ -572,23 +577,29 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
Node node = cluster.nodeById(nodeId);
if (node != null) {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.getOrDefault(tip, Acknowledgements.empty());
Acknowledgements acksFromShareFetch = fetchAcknowledgementsToSend.remove(tip);
if (acksFromShareFetch != null) {
acknowledgements.merge(acksFromShareFetch);
acknowledgementsMap.forEach((tip, nodeAcks) -> {
Acknowledgements acknowledgements = Acknowledgements.empty();
Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
if (nodeAcksFromFetchMap != null) {
Acknowledgements acksFromFetchMap = nodeAcksFromFetchMap.remove(tip);
if (acksFromFetchMap != null) {
acknowledgements.merge(acksFromFetchMap);
}
}
if (acknowledgements != null && !acknowledgements.isEmpty()) {
if (nodeAcks.nodeId() == node.id()) {
acknowledgements.merge(nodeAcks.acknowledgements());
}
if (!acknowledgements.isEmpty()) {
acknowledgementsMapForNode.put(tip, acknowledgements);
metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}
});
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));
@ -611,7 +622,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
resultHandler,
AcknowledgeRequestType.CLOSE
));
}
}
});
@ -664,14 +674,17 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
log.debug("ShareFetch for partition {} returned fetch data {}", tip, partitionData);
Acknowledgements acks = fetchAcknowledgementsInFlight.remove(tip);
if (acks != null) {
if (partitionData.acknowledgeErrorCode() != Errors.NONE.code()) {
metricsManager.recordFailedAcknowledgements(acks.size());
Map<TopicIdPartition, Acknowledgements> nodeAcknowledgementsInFlight = fetchAcknowledgementsInFlight.get(fetchTarget.id());
if (nodeAcknowledgementsInFlight != null) {
Acknowledgements acks = nodeAcknowledgementsInFlight.remove(tip);
if (acks != null) {
if (partitionData.acknowledgeErrorCode() != Errors.NONE.code()) {
metricsManager.recordFailedAcknowledgements(acks.size());
}
acks.complete(Errors.forCode(partitionData.acknowledgeErrorCode()).exception());
Map<TopicIdPartition, Acknowledgements> acksMap = Collections.singletonMap(tip, acks);
maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
}
acks.setAcknowledgeErrorCode(Errors.forCode(partitionData.acknowledgeErrorCode()));
Map<TopicIdPartition, Acknowledgements> acksMap = Collections.singletonMap(tip, acks);
maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
}
Errors partitionError = Errors.forCode(partitionData.errorCode());
@ -686,6 +699,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
ShareCompletedFetch completedFetch = new ShareCompletedFetch(
logContext,
BufferSupplier.create(),
fetchTarget.id(),
tip,
partitionData,
shareFetchMetricsAggregator,
@ -727,12 +741,19 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
partition.partitionIndex(),
metadata.topicNames().get(topic.topicId()));
Acknowledgements acks = fetchAcknowledgementsInFlight.remove(tip);
if (acks != null) {
metricsManager.recordFailedAcknowledgements(acks.size());
acks.setAcknowledgeErrorCode(Errors.forException(error));
Map<TopicIdPartition, Acknowledgements> acksMap = Collections.singletonMap(tip, acks);
maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
Map<TopicIdPartition, Acknowledgements> nodeAcknowledgementsInFlight = fetchAcknowledgementsInFlight.get(fetchTarget.id());
if (nodeAcknowledgementsInFlight != null) {
Acknowledgements acks = nodeAcknowledgementsInFlight.remove(tip);
if (acks != null) {
metricsManager.recordFailedAcknowledgements(acks.size());
if (error instanceof KafkaException) {
acks.complete((KafkaException) error);
} else {
acks.complete(Errors.UNKNOWN_SERVER_ERROR.exception());
}
Map<TopicIdPartition, Acknowledgements> acksMap = Collections.singletonMap(tip, acks);
maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
}
}
}));
} finally {
@ -1065,7 +1086,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCode) {
Acknowledgements acks = inFlightAcknowledgements.get(tip);
if (acks != null) {
acks.setAcknowledgeErrorCode(acknowledgeErrorCode);
acks.complete(acknowledgeErrorCode.exception());
}
resultHandler.complete(tip, acks, onCommitAsync());
}
@ -1077,14 +1098,14 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
void handleAcknowledgeTimedOut(TopicIdPartition tip) {
Acknowledgements acks = incompleteAcknowledgements.get(tip);
if (acks != null) {
acks.setAcknowledgeErrorCode(Errors.REQUEST_TIMED_OUT);
acks.complete(Errors.REQUEST_TIMED_OUT.exception());
}
resultHandler.complete(tip, acks, onCommitAsync());
}
/**
* Set the error code for all remaining acknowledgements in the event
* of a session error which prevents the remains acknowledgements from
* of a session error which prevents the remaining acknowledgements from
* being sent.
*/
void handleSessionErrorCode(Errors errorCode) {
@ -1093,7 +1114,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
acknowledgementsMapToClear.forEach((tip, acks) -> {
if (acks != null) {
acks.setAcknowledgeErrorCode(errorCode);
acks.complete(errorCode.exception());
}
resultHandler.complete(tip, acks, onCommitAsync());
});
@ -1174,6 +1195,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
if (!isCommitAsync && acknowledgements != null) {
result.put(partition, acknowledgements);
}
// For commitAsync, we do not wait for other results to complete, we prepare a background event
// for every ShareAcknowledgeResponse.
// For commitAsync, we send out a background event for every TopicIdPartition, so we use a singletonMap each time.
@ -1291,6 +1313,5 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}
}
}

View File

@ -54,7 +54,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
@ -62,7 +61,6 @@ import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
@ -611,7 +609,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
private ShareFetch<K, V> pollForFetches(final Timer timer) {
long pollTimeout = Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs());
Map<TopicIdPartition, Acknowledgements> acknowledgementsMap = currentFetch.takeAcknowledgedRecords();
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = currentFetch.takeAcknowledgedRecords();
// If data is available already, return it immediately
final ShareFetch<K, V> fetch = collect(acknowledgementsMap);
@ -636,7 +634,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
return collect(Collections.emptyMap());
}
private ShareFetch<K, V> collect(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
if (currentFetch.isEmpty()) {
final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
if (fetch.isEmpty()) {
@ -709,7 +707,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
acknowledgeBatchIfImplicitAcknowledgement(false);
Timer requestTimer = time.timer(timeout.toMillis());
Map<TopicIdPartition, Acknowledgements> acknowledgementsMap = acknowledgementsToSend();
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = acknowledgementsToSend();
if (acknowledgementsMap.isEmpty()) {
return Collections.emptyMap();
} else {
@ -721,16 +719,11 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
Map<TopicIdPartition, Optional<KafkaException>> result = new HashMap<>();
Map<TopicIdPartition, Acknowledgements> completedAcknowledgements = ConsumerUtils.getResult(commitFuture);
completedAcknowledgements.forEach((tip, acks) -> {
Errors ackErrorCode = acks.getAcknowledgeErrorCode();
if (ackErrorCode == null) {
KafkaException exception = acks.getAcknowledgeException();
if (exception == null) {
result.put(tip, Optional.empty());
} else {
ApiException exception = ackErrorCode.exception();
if (exception == null) {
result.put(tip, Optional.empty());
} else {
result.put(tip, Optional.of(ackErrorCode.exception()));
}
result.put(tip, Optional.of(exception));
}
});
return result;
@ -757,7 +750,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
// If using implicit acknowledgement, acknowledge the previously fetched records
acknowledgeBatchIfImplicitAcknowledgement(false);
Map<TopicIdPartition, Acknowledgements> acknowledgementsMap = acknowledgementsToSend();
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = acknowledgementsToSend();
if (!acknowledgementsMap.isEmpty()) {
ShareAcknowledgeAsyncEvent event = new ShareAcknowledgeAsyncEvent(acknowledgementsMap);
applicationEventHandler.add(event);
@ -1045,7 +1038,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
/**
* Returns any ready acknowledgements to be sent to the cluster.
*/
private Map<TopicIdPartition, Acknowledgements> acknowledgementsToSend() {
private Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsToSend() {
return currentFetch.takeAcknowledgedRecords();
}

View File

@ -138,14 +138,15 @@ public class ShareFetch<K, V> {
* to send. If some records were not acknowledged, the in-flight records will not be empty after this
* method.
*
* @return The map of acknowledgements to send
* @return The map of acknowledgements to send, along with node information
*/
public Map<TopicIdPartition, Acknowledgements> takeAcknowledgedRecords() {
Map<TopicIdPartition, Acknowledgements> acknowledgementMap = new LinkedHashMap<>();
public Map<TopicIdPartition, NodeAcknowledgements> takeAcknowledgedRecords() {
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementMap = new LinkedHashMap<>();
batches.forEach((tip, batch) -> {
int nodeId = batch.nodeId();
Acknowledgements acknowledgements = batch.takeAcknowledgedRecords();
if (!acknowledgements.isEmpty())
acknowledgementMap.put(tip, acknowledgements);
acknowledgementMap.put(tip, new NodeAcknowledgements(nodeId, acknowledgements));
});
return acknowledgementMap;
}

View File

@ -29,6 +29,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
public class ShareInFlightBatch<K, V> {
private final int nodeId;
final TopicIdPartition partition;
private final Map<Long, ConsumerRecord<K, V>> inFlightRecords;
private final Set<Long> acknowledgedRecords;
@ -36,7 +37,8 @@ public class ShareInFlightBatch<K, V> {
private KafkaException exception;
private boolean hasCachedException = false;
public ShareInFlightBatch(TopicIdPartition partition) {
public ShareInFlightBatch(int nodeId, TopicIdPartition partition) {
this.nodeId = nodeId;
this.partition = partition;
inFlightRecords = new TreeMap<>();
acknowledgedRecords = new TreeSet<>();
@ -87,6 +89,10 @@ public class ShareInFlightBatch<K, V> {
return inFlightRecords.size();
}
int nodeId() {
return nodeId;
}
Acknowledgements takeAcknowledgedRecords() {
// Usually, all records will be acknowledged, so we can just clear the in-flight records leaving
// an empty batch, which will trigger more fetching

View File

@ -68,12 +68,12 @@ public class ShareSessionHandler {
*/
private final LinkedHashMap<TopicPartition, TopicIdPartition> sessionPartitions;
/*
/**
* The partitions to be included in the next ShareFetch request.
*/
private LinkedHashMap<TopicPartition, TopicIdPartition> nextPartitions;
/*
/**
* The acknowledgements to be included in the next ShareFetch/ShareAcknowledge request.
*/
private LinkedHashMap<TopicIdPartition, Acknowledgements> nextAcknowledgements;
@ -103,6 +103,10 @@ public class ShareSessionHandler {
}
}
public void addPartitionToAcknowledgeOnly(TopicIdPartition topicIdPartition, Acknowledgements partitionAcknowledgements) {
nextAcknowledgements.put(topicIdPartition, partitionAcknowledgements);
}
public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, FetchConfig fetchConfig) {
List<TopicIdPartition> added = new ArrayList<>();
List<TopicIdPartition> removed = new ArrayList<>();

View File

@ -16,21 +16,21 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.common.TopicIdPartition;
import java.util.Map;
public class ShareAcknowledgeAsyncEvent extends ApplicationEvent {
private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;
public ShareAcknowledgeAsyncEvent(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
public ShareAcknowledgeAsyncEvent(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
super(Type.SHARE_ACKNOWLEDGE_ASYNC);
this.acknowledgementsMap = acknowledgementsMap;
}
public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}
}

View File

@ -16,21 +16,21 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.common.TopicIdPartition;
import java.util.Map;
public class ShareAcknowledgeOnCloseEvent extends CompletableApplicationEvent<Void> {
private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;
public ShareAcknowledgeOnCloseEvent(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, final long deadlineMs) {
public ShareAcknowledgeOnCloseEvent(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap, final long deadlineMs) {
super(Type.SHARE_ACKNOWLEDGE_ON_CLOSE, deadlineMs);
this.acknowledgementsMap = acknowledgementsMap;
}
public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}

View File

@ -17,20 +17,21 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.common.TopicIdPartition;
import java.util.Map;
public class ShareAcknowledgeSyncEvent extends CompletableApplicationEvent<Map<TopicIdPartition, Acknowledgements>> {
private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;
public ShareAcknowledgeSyncEvent(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, final long deadlineMs) {
public ShareAcknowledgeSyncEvent(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap, final long deadlineMs) {
super(Type.SHARE_ACKNOWLEDGE_SYNC, deadlineMs);
this.acknowledgementsMap = acknowledgementsMap;
}
public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}

View File

@ -16,21 +16,21 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.common.TopicIdPartition;
import java.util.Map;
public class ShareFetchEvent extends ApplicationEvent {
private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;
public ShareFetchEvent(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
super(Type.SHARE_FETCH);
this.acknowledgementsMap = acknowledgementsMap;
}
public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}

View File

@ -85,7 +85,7 @@ class AcknowledgementCommitCallbackHandlerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(0L, AcknowledgeType.ACCEPT);
acknowledgements.add(1L, AcknowledgeType.REJECT);
acknowledgements.setAcknowledgeErrorCode(Errors.INVALID_RECORD_STATE);
acknowledgements.complete(Errors.INVALID_RECORD_STATE.exception());
acknowledgementsMap.put(tip0, acknowledgements);
acknowledgementCommitCallbackHandler.onComplete(Collections.singletonList(acknowledgementsMap));
@ -101,7 +101,7 @@ class AcknowledgementCommitCallbackHandlerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(0L, AcknowledgeType.ACCEPT);
acknowledgements.add(1L, AcknowledgeType.REJECT);
acknowledgements.setAcknowledgeErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED);
acknowledgements.complete(Errors.TOPIC_AUTHORIZATION_FAILED.exception());
acknowledgementsMap.put(tip0, acknowledgements);
acknowledgementCommitCallbackHandler.onComplete(Collections.singletonList(acknowledgementsMap));
@ -116,12 +116,12 @@ class AcknowledgementCommitCallbackHandlerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(0L, AcknowledgeType.ACCEPT);
acknowledgements.add(1L, AcknowledgeType.REJECT);
acknowledgements.setAcknowledgeErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED);
acknowledgements.complete(Errors.TOPIC_AUTHORIZATION_FAILED.exception());
acknowledgementsMap.put(tip0, acknowledgements);
Acknowledgements acknowledgements1 = Acknowledgements.empty();
acknowledgements1.add(0L, AcknowledgeType.RELEASE);
acknowledgements1.setAcknowledgeErrorCode(Errors.INVALID_RECORD_STATE);
acknowledgements1.complete(Errors.INVALID_RECORD_STATE.exception());
acknowledgementsMap.put(tip1, acknowledgements1);
Map<TopicIdPartition, Acknowledgements> acknowledgementsMap2 = new HashMap<>();

View File

@ -367,6 +367,7 @@ public class ShareCompletedFetchTest {
return new ShareCompletedFetch(
logContext,
BufferSupplier.create(),
0,
TIP,
partitionData,
shareFetchMetricsAggregator,

View File

@ -38,7 +38,9 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@ -112,6 +114,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -268,7 +271,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@ -287,7 +290,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(2L, AcknowledgeType.REJECT);
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements2));
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2)));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@ -328,7 +331,7 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), time.milliseconds() + 2000);
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)), time.milliseconds() + 2000);
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -361,7 +364,7 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -395,20 +398,20 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements2));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2)));
client.prepareResponse(null, true);
networkClientDelegate.poll(time.timer(0));
assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0));
assertEquals(Errors.UNKNOWN_SERVER_ERROR, completedAcknowledgements.get(0).get(tip0).getAcknowledgeErrorCode());
assertInstanceOf(UnknownServerException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
completedAcknowledgements.clear();
assertEquals(1, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
@ -419,7 +422,7 @@ public class ShareConsumeRequestManagerTest {
assertNull(shareConsumeRequestManager.requestStates(0));
// The callback for these unsent acknowledgements will be invoked with an error code.
assertEquals(Collections.singletonMap(tip0, acknowledgements2), completedAcknowledgements.get(0));
assertEquals(Errors.SHARE_SESSION_NOT_FOUND, completedAcknowledgements.get(0).get(tip0).getAcknowledgeErrorCode());
assertInstanceOf(ShareSessionNotFoundException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
});
// Attempt a normal fetch to check if nodesWithPendingRequests is empty.
@ -451,14 +454,14 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
// Piggyback acknowledgements
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
// Remaining acknowledgements sent with close().
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(2L, AcknowledgeType.ACCEPT);
acknowledgements2.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, acknowledgements2),
shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2)),
calculateDeadlineMs(time.timer(100)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -468,7 +471,7 @@ public class ShareConsumeRequestManagerTest {
assertEquals(1, completedAcknowledgements.size());
Acknowledgements mergedAcks = acknowledgements.merge(acknowledgements2);
mergedAcks.setAcknowledgeErrorCode(Errors.NONE);
mergedAcks.complete(null);
// Verifying that all 3 offsets were acknowledged as part of the final ShareAcknowledge on close.
assertEquals(mergedAcks.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap());
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@ -495,7 +498,7 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(),
calculateDeadlineMs(time.timer(100)));
@ -533,7 +536,7 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements),
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(100)));
shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(),
calculateDeadlineMs(time.timer(100)));
@ -654,14 +657,14 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements.add(4L, AcknowledgeType.ACCEPT);
acknowledgements.add(5L, AcknowledgeType.ACCEPT);
acknowledgements.add(6L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements2));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2)));
assertEquals(6, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
@ -697,14 +700,14 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(4L, AcknowledgeType.ACCEPT);
acknowledgements2.add(5L, AcknowledgeType.ACCEPT);
acknowledgements2.add(6L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements2), 60000L);
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2)), 60000L);
assertEquals(3, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
@ -755,7 +758,7 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(5L, AcknowledgeType.RELEASE);
acknowledgements.add(6L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), 60000L);
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)), 60000L);
assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest());
assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
@ -803,7 +806,7 @@ public class ShareConsumeRequestManagerTest {
fetchRecords();
// Piggyback acknowledgements
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@ -813,7 +816,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(3L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements2));
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2)));
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
@ -847,7 +850,7 @@ public class ShareConsumeRequestManagerTest {
assignFromSubscribed(singleton(tp1));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -878,7 +881,7 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
// Send acknowledgements via ShareFetch
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
fetchRecords();
// Subscription changes.
assignFromSubscribed(singleton(tp1));
@ -923,7 +926,7 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
// Send acknowledgements via ShareFetch
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
fetchRecords();
// Subscription changes.
subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
@ -1045,7 +1048,7 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(5L, AcknowledgeType.RELEASE);
acknowledgements.add(6L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), 60000L);
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)), 60000L);
assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest());
assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
@ -1084,7 +1087,7 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -1102,7 +1105,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(3L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements2));
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements2)));
TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()));
@ -1146,9 +1149,9 @@ public class ShareConsumeRequestManagerTest {
acknowledgements2.add(1L, AcknowledgeType.ACCEPT);
acknowledgements2.add(2L, AcknowledgeType.ACCEPT);
Map<TopicIdPartition, Acknowledgements> acks = new HashMap<>();
acks.put(tip0, acknowledgements);
acks.put(t2ip0, acknowledgements2);
Map<TopicIdPartition, NodeAcknowledgements> acks = new HashMap<>();
acks.put(tip0, new NodeAcknowledgements(0, acknowledgements));
acks.put(t2ip0, new NodeAcknowledgements(0, acknowledgements2));
shareConsumeRequestManager.commitAsync(acks);
@ -1582,7 +1585,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(startingClusterMetadata, metadata.fetch());
@ -1687,7 +1690,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
// The metadata snapshot will have been updated with the new leader information
assertNotEquals(startingClusterMetadata, metadata.fetch());
@ -1727,6 +1730,239 @@ public class ShareConsumeRequestManagerTest {
assertEquals(1, fetchedRecords.size());
}
/**
* Test the scenario that the metadata indicated a change in leadership between ShareFetch requests such
* as could occur when metadata is periodically updated.
*/
@ParameterizedTest
@EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"})
public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) {
buildRequestManager();
subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
subscriptions.assignFromSubscribed(partitions);
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2),
tp -> validLeaderEpoch, topicIds, false));
Node nodeId0 = metadata.fetch().nodeById(0);
Node nodeId1 = metadata.fetch().nodeById(1);
Cluster startingClusterMetadata = metadata.fetch();
assertFalse(metadata.updateRequested());
assertEquals(2, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<>();
partitionData.put(tip0,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip0.topicPartition().partition())
.setErrorCode(Errors.NONE.code())
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition())
.setErrorCode(Errors.NONE.code()));
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
assertFalse(partitionRecords.containsKey(tp1));
List<ConsumerRecord<byte[], byte[]>> fetchedRecords = partitionRecords.get(tp0);
assertEquals(1, fetchedRecords.size());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(startingClusterMetadata, metadata.fetch());
// Move the leadership of tp0 onto node 1
HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<>();
partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1)));
metadata.updatePartitionLeadership(partitionLeaders, List.of());
assertNotEquals(startingClusterMetadata, metadata.fetch());
// Even though the partitions are on the same leader, records were fetched on the previous leader.
// A fetch is sent to the previous leader to remove the partition from the share session and get the acknowledge error code.
assertEquals(2, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
partitionData.clear();
partitionData.put(tip0,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip0.topicPartition().partition())
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(error.code()));
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0);
partitionData.clear();
partitionData.put(tip0,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip0.topicPartition().partition())
.setErrorCode(Errors.NONE.code())
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition())
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
assertTrue(partitionRecords.containsKey(tp1));
fetchedRecords = partitionRecords.get(tp0);
assertEquals(1, fetchedRecords.size());
fetchedRecords = partitionRecords.get(tp1);
assertEquals(1, fetchedRecords.size());
}
@Test
void testWhenLeadershipChangedAfterDisconnected() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
subscriptions.assignFromSubscribed(partitions);
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2),
tp -> validLeaderEpoch, topicIds, false));
Node nodeId0 = metadata.fetch().nodeById(0);
Node nodeId1 = metadata.fetch().nodeById(1);
Cluster startingClusterMetadata = metadata.fetch();
assertFalse(metadata.updateRequested());
assertEquals(2, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<>();
partitionData.put(tip0,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip0.topicPartition().partition())
.setErrorCode(Errors.NONE.code())
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition())
.setErrorCode(Errors.NONE.code()));
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
assertFalse(partitionRecords.containsKey(tp1));
List<ConsumerRecord<byte[], byte[]>> fetchedRecords = partitionRecords.get(tp0);
assertEquals(1, fetchedRecords.size());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(startingClusterMetadata, metadata.fetch());
acknowledgements = Acknowledgements.empty();
acknowledgements.add(1, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(2, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
partitionData.clear();
partitionData.put(tip0,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip0.topicPartition().partition())
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(Errors.NONE.code()));
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0, true);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition())
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
// The node was disconnected, so the acknowledgement failed
assertInstanceOf(DisconnectException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
completedAcknowledgements.clear();
partitionRecords = fetchRecords();
assertFalse(partitionRecords.containsKey(tp0));
assertTrue(partitionRecords.containsKey(tp1));
fetchedRecords = partitionRecords.get(tp1);
assertEquals(1, fetchedRecords.size());
// Move the leadership of tp0 onto node 1
HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<>();
partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1)));
metadata.updatePartitionLeadership(partitionLeaders, List.of());
assertNotEquals(startingClusterMetadata, metadata.fetch());
shareConsumeRequestManager.fetch(Collections.singletonMap(tip1, new NodeAcknowledgements(1, acknowledgements)));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
partitionData.clear();
partitionData.put(tip0,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip0.topicPartition().partition())
.setErrorCode(Errors.NONE.code())
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition()));
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException());
completedAcknowledgements.clear();
partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
assertFalse(partitionRecords.containsKey(tp1));
fetchedRecords = partitionRecords.get(tp0);
assertEquals(1, fetchedRecords.size());
}
private ShareFetchResponse fetchResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions = Collections.singletonMap(tp,
new ShareFetchResponseData.PartitionData()

View File

@ -240,7 +240,7 @@ public class ShareConsumerImplTest {
final String topicName = "foo";
final int partition = 3;
final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), partition, topicName);
final ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(tip);
final ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, tip);
batch.addRecord(new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"));
doAnswer(invocation -> {
consumer.wakeup();
@ -465,7 +465,7 @@ public class ShareConsumerImplTest {
final TopicPartition tp = new TopicPartition("topic", 0);
final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), tp);
final ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(tip);
final ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, tip);
batch.addRecord(new ConsumerRecord<>("topic", 0, 2, "key1", "value1"));
final ShareFetch<String, String> fetch = ShareFetch.empty();
fetch.add(tip, batch);

View File

@ -170,6 +170,7 @@ public class ShareFetchBufferTest {
return new ShareCompletedFetch(
logContext,
BufferSupplier.create(),
0,
tp,
partitionData,
shareFetchMetricsAggregator,

View File

@ -347,6 +347,7 @@ public class ShareFetchCollectorTest {
return new ShareCompletedFetch(
logContext,
BufferSupplier.create(),
0,
topicAPartition0,
partitionData,
shareFetchMetricsAggregator,

View File

@ -337,7 +337,7 @@ public class ShareSessionHandlerTest {
}
@Test
public void testForgottenPartitions() {
public void testPartitionForgottenOnAcknowledgeOnly() {
String groupId = "G1";
Uuid memberId = Uuid.randomUuid();
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId);
@ -362,6 +362,42 @@ public class ShareSessionHandlerTest {
new RespEntry("foo", 0, topicId))));
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
// Remove the topic from the session by setting acknowledgements only - this is not asking to fetch records
ShareFetchRequestData requestData2 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
handler.addPartitionToAcknowledgeOnly(foo0, Acknowledgements.empty());
assertEquals(Collections.singletonList(foo0), reqForgetList(requestData2, topicNames));
// Should have the same session ID, next epoch, and same ID usage
assertEquals(memberId.toString(), requestData2.memberId(), "Did not use same session");
assertEquals(1, requestData2.shareSessionEpoch(), "Did not have correct epoch");
}
@Test
public void testForgottenPartitions() {
String groupId = "G1";
Uuid memberId = Uuid.randomUuid();
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId);
// We want to test when all topics are removed from the session
Map<String, Uuid> topicIds = new HashMap<>();
Map<Uuid, String> topicNames = new HashMap<>();
Uuid topicId = addTopicId(topicIds, topicNames, "foo");
TopicIdPartition foo0 = new TopicIdPartition(topicId, 0, "foo");
handler.addPartitionToFetch(foo0, null);
ShareFetchRequestData requestData1 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
assertMapsEqual(reqMap(foo0), handler.sessionPartitionMap());
ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
assertListEquals(expectedToSend1, reqFetchList(requestData1, topicNames));
ShareFetchResponse resp = new ShareFetchResponse(
new ShareFetchResponseData()
.setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(0)
.setResponses(respList(
new RespEntry("foo", 0, topicId))));
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
// Remove the topic from the session
ShareFetchRequestData requestData2 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
assertEquals(Collections.singletonList(foo0), reqForgetList(requestData2, topicNames));