KAFKA-18779: Validate responses from broker in client for ShareFetch and ShareAcknowledge RPCs. (#18939)

- Currently if we received extraneous topic partitions in the response
or if the response was missing some partitions requested, we were
processing the response as it came and even populated the callback with
these partitions.

- These invalid responses should be parsed at the
`ShareConsumeRequestManager`.

- If the response missed any acknowledgements for partitions that were
requested, then we fail the request with `InvalidRecordStateException`
and populate the callbacks.

- For any extraneous partitions in the response, we log an error and
ignore them.

Some refactors are also done in this PR in ShareConsumeRequestManager to
make the code more readable.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Shivsundar R 2025-02-24 05:27:24 -05:00 committed by GitHub
parent 289e958c39
commit 2880e04129
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 254 additions and 98 deletions

View File

@ -28,9 +28,11 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchRequestData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
@ -96,6 +98,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private final CompletableFuture<Void> closeFuture;
private boolean isAcknowledgementCommitCallbackRegistered = false;
private final Map<IdAndPartition, String> topicNamesMap = new HashMap<>();
private static final String INVALID_RESPONSE = "Acknowledgement not successful due to invalid response from broker";
ShareConsumeRequestManager(final Time time,
final LogContext logContext,
@ -366,7 +369,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
AtomicBoolean isAsyncSent) {
boolean asyncSent = true;
try {
if (acknowledgeRequestState == null || (!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
if (acknowledgeRequestState == null || (!acknowledgeRequestState.isCloseRequest() && acknowledgeRequestState.isEmpty())) {
return Optional.empty();
}
@ -441,7 +444,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private boolean isRequestStateInProgress(AcknowledgeRequestState acknowledgeRequestState) {
if (acknowledgeRequestState == null) {
return false;
} else if (acknowledgeRequestState.onClose()) {
} else if (acknowledgeRequestState.isCloseRequest()) {
return !acknowledgeRequestState.isProcessed;
} else {
return !(acknowledgeRequestState.isEmpty());
@ -716,11 +719,12 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
response.data().responses().forEach(topicResponse ->
topicResponse.partitions().forEach(partition ->
responseData.put(new TopicIdPartition(topicResponse.topicId(),
partition.partitionIndex(),
metadata.topicNames().getOrDefault(topicResponse.topicId(),
topicNamesMap.remove(new IdAndPartition(topicResponse.topicId(), partition.partitionIndex())))), partition))
topicResponse.partitions().forEach(partition -> {
TopicIdPartition tip = lookupTopicId(topicResponse.topicId(), partition.partitionIndex());
if (tip != null) {
responseData.put(tip, partition);
}
})
);
final Set<TopicPartition> partitions = responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
@ -749,7 +753,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
Errors partitionError = Errors.forCode(partitionData.errorCode());
if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH) {
log.debug("For {}, received error {}, with leaderIdAndEpoch {}", tip, partitionError, partitionData.currentLeader());
log.debug("For {}, received error {}, with leaderIdAndEpoch {} in ShareFetch", tip, partitionError, partitionData.currentLeader());
if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
partitionsWithUpdatedLeaderInfo.put(tip.topicPartition(), new Metadata.LeaderIdAndEpoch(
Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch())));
@ -771,6 +775,14 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
}
// Handle any acknowledgements which were not received in the response for this node.
if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) {
fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition, acknowledgements) -> {
acknowledgements.complete(new InvalidRecordStateException(INVALID_RESPONSE));
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition, acknowledgements));
});
}
if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {
List<Node> leaderNodes = response.data().nodeEndpoints().stream()
.map(e -> new Node(e.nodeId(), e.host(), e.port(), e.rack()))
@ -797,13 +809,15 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> {
TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
partition.partitionIndex(),
metadata.topicNames().get(topic.topicId()));
TopicIdPartition tip = lookupTopicId(topic.topicId(), partition.partitionIndex());
if (tip == null) {
return;
}
Map<TopicIdPartition, Acknowledgements> nodeAcknowledgementsInFlight = fetchAcknowledgementsInFlight.get(fetchTarget.id());
if (nodeAcknowledgementsInFlight != null) {
Acknowledgements acks = nodeAcknowledgementsInFlight.remove(tip);
if (acks != null) {
metricsManager.recordFailedAcknowledgements(acks.size());
if (error instanceof KafkaException) {
@ -833,20 +847,21 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
if (acknowledgeRequestState.onClose()) {
response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> {
TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
partition.partitionIndex(),
metadata.topicNames().get(topic.topicId()));
if (partition.errorCode() != Errors.NONE.code()) {
if (acknowledgeRequestState.isCloseRequest()) {
response.data().responses().forEach(topicResponse -> topicResponse.partitions().forEach(partitionData -> {
TopicIdPartition tip = lookupTopicId(topicResponse.topicId(), partitionData.partitionIndex());
if (tip == null) {
return;
}
if (partitionData.errorCode() != Errors.NONE.code()) {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
}
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode()));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partitionData.errorCode()));
}));
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
acknowledgeRequestState.processingComplete();
} else {
if (!acknowledgeRequestState.sessionHandler.handleResponse(response, resp.requestHeader().apiVersion())) {
// Received a response-level error code.
@ -856,59 +871,23 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
// We retry the request until the timer expires, unless we are closing.
acknowledgeRequestState.moveAllToIncompleteAcks();
} else {
response.data().responses().forEach(shareAcknowledgeTopicResponse -> shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
TopicIdPartition tip = new TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
partitionData.partitionIndex(),
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error());
}));
acknowledgeRequestState.processPendingInFlightAcknowledgements(response.error().exception());
acknowledgeRequestState.processingComplete();
}
} else {
AtomicBoolean shouldRetry = new AtomicBoolean(false);
// Check all partition level error codes
response.data().responses().forEach(shareAcknowledgeTopicResponse -> shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
response.data().responses().forEach(topicResponse -> topicResponse.partitions().forEach(partitionData -> {
Errors partitionError = Errors.forCode(partitionData.errorCode());
TopicIdPartition tip = new TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
partitionData.partitionIndex(),
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));
if (partitionError.exception() != null) {
boolean retry = false;
if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH) {
// If the leader has changed, there's no point in retrying the operation because the acquisition locks
// will have been released.
TopicPartition tp = new TopicPartition(metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()), partitionData.partitionIndex());
log.debug("For {}, received error {}, with leaderIdAndEpoch {}", tp, partitionError, partitionData.currentLeader());
if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
partitionsWithUpdatedLeaderInfo.put(tp, new Metadata.LeaderIdAndEpoch(
Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch())));
}
} else if (partitionError.exception() instanceof RetriableException) {
retry = true;
}
if (retry) {
// Move to incomplete acknowledgements to retry
acknowledgeRequestState.moveToIncompleteAcks(tip);
shouldRetry.set(true);
} else {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
}
} else {
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
TopicIdPartition tip = lookupTopicId(topicResponse.topicId(), partitionData.partitionIndex());
if (tip == null) {
return;
}
handlePartitionError(partitionData, partitionsWithUpdatedLeaderInfo, acknowledgeRequestState, partitionError, tip, shouldRetry);
}));
if (shouldRetry.get()) {
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
} else {
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
acknowledgeRequestState.processingComplete();
}
processRetryLogic(acknowledgeRequestState, shouldRetry, responseCompletionTimeMs);
}
}
@ -927,7 +906,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
if (acknowledgeRequestState.onClose()) {
if (acknowledgeRequestState.isCloseRequest()) {
log.debug("Removing node from ShareSession {}", fetchTarget.id());
sessionHandlers.remove(fetchTarget.id());
}
@ -945,9 +924,11 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> {
TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
partition.partitionIndex(),
metadata.topicNames().get(topic.topicId()));
TopicIdPartition tip = lookupTopicId(topic.topicId(), partition.partitionIndex());
if (tip == null) {
return;
}
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error));
}));
@ -957,13 +938,81 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
log.debug("Removing pending request for node {} - failed", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
if (acknowledgeRequestState.onClose()) {
if (acknowledgeRequestState.isCloseRequest()) {
log.debug("Removing node from ShareSession {}", fetchTarget.id());
sessionHandlers.remove(fetchTarget.id());
}
}
}
private void handlePartitionError(ShareAcknowledgeResponseData.PartitionData partitionData,
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo,
AcknowledgeRequestState acknowledgeRequestState,
Errors partitionError,
TopicIdPartition tip,
AtomicBoolean shouldRetry) {
if (partitionError.exception() != null) {
boolean retry = false;
if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH) {
// If the leader has changed, there's no point in retrying the operation because the acquisition locks
// will have been released.
updateLeaderInfoMap(partitionData, partitionsWithUpdatedLeaderInfo, partitionError, tip.topicPartition());
} else if (partitionError.exception() instanceof RetriableException) {
retry = true;
}
if (retry) {
if (acknowledgeRequestState.moveToIncompleteAcks(tip)) {
shouldRetry.set(true);
}
} else {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
}
} else {
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
}
}
private void processRetryLogic(AcknowledgeRequestState acknowledgeRequestState,
AtomicBoolean shouldRetry,
long responseCompletionTimeMs) {
if (shouldRetry.get()) {
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
// Check for any acknowledgements that did not receive a response.
// These acknowledgements are failed with InvalidRecordStateException.
acknowledgeRequestState.processPendingInFlightAcknowledgements(new InvalidRecordStateException(INVALID_RESPONSE));
} else {
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
acknowledgeRequestState.processingComplete();
}
}
private void updateLeaderInfoMap(ShareAcknowledgeResponseData.PartitionData partitionData,
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo,
Errors partitionError,
TopicPartition tp) {
log.debug("For {}, received error {}, with leaderIdAndEpoch {} in ShareAcknowledge", tp, partitionError, partitionData.currentLeader());
if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
partitionsWithUpdatedLeaderInfo.put(tp, new Metadata.LeaderIdAndEpoch(
Optional.of(partitionData.currentLeader().leaderId()),
Optional.of(partitionData.currentLeader().leaderEpoch())
));
}
}
private TopicIdPartition lookupTopicId(Uuid topicId, int partitionIndex) {
String topicName = metadata.topicNames().getOrDefault(topicId,
topicNamesMap.remove(new IdAndPartition(topicId, partitionIndex)));
if (topicName == null) {
log.error("Topic name not found in metadata for topicId {} and partitionIndex {}", topicId, partitionIndex);
return null;
}
return new TopicIdPartition(topicId, partitionIndex, topicName);
}
private List<TopicPartition> partitionsToFetch() {
return subscriptions.fetchablePartitions(tp -> true);
}
@ -1061,7 +1110,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
UnsentRequest buildRequest() {
// If this is the closing request, close the share session by setting the final epoch
if (onClose()) {
if (isCloseRequest()) {
sessionHandler.notifyClose();
}
@ -1144,11 +1193,13 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
* through a background event.
*/
void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCode) {
Acknowledgements acks = inFlightAcknowledgements.get(tip);
Acknowledgements acks = inFlightAcknowledgements.remove(tip);
if (acks != null) {
acks.complete(acknowledgeErrorCode.exception());
resultHandler.complete(tip, acks, requestType);
} else {
log.error("Invalid partition {} received in ShareAcknowledge response", tip);
}
resultHandler.complete(tip, acks, onCommitAsync());
}
/**
@ -1159,8 +1210,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
Acknowledgements acks = incompleteAcknowledgements.get(tip);
if (acks != null) {
acks.complete(Errors.REQUEST_TIMED_OUT.exception());
resultHandler.complete(tip, acks, requestType);
}
resultHandler.complete(tip, acks, onCommitAsync());
}
/**
@ -1176,7 +1227,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
if (acks != null) {
acks.complete(errorCode.exception());
}
resultHandler.complete(tip, acks, onCommitAsync());
resultHandler.complete(tip, acks, requestType);
});
acknowledgementsMapToClear.clear();
processingComplete();
@ -1187,11 +1238,26 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
void processingComplete() {
inFlightAcknowledgements.clear();
// If there are any pending inFlightAcknowledgements after processing the response, we fail them with an InvalidRecordStateException.
processPendingInFlightAcknowledgements(new InvalidRecordStateException(INVALID_RESPONSE));
resultHandler.completeIfEmpty();
isProcessed = true;
}
/**
* Fail any existing in-flight acknowledgements with the given exception and clear the map.
* We also send a background event to update {@link org.apache.kafka.clients.consumer.AcknowledgementCommitCallback }
*/
private void processPendingInFlightAcknowledgements(KafkaException exception) {
if (!inFlightAcknowledgements.isEmpty()) {
inFlightAcknowledgements.forEach((partition, acknowledgements) -> {
acknowledgements.complete(exception);
resultHandler.complete(partition, acknowledgements, requestType);
});
inFlightAcknowledgements.clear();
}
}
/**
* Moves all the in-flight acknowledgements to incomplete acknowledgements to retry
* in the next request.
@ -1208,21 +1274,25 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
/**
* Moves the in-flight acknowledgements for a given partition to incomplete acknowledgements to retry
* in the next request.
*
* @param tip The TopicIdPartition for which we move the acknowledgements.
* @return True if the partition was sent in the request.
* <p> False if the partition was not part of the request, we log an error and ignore such partitions. </p>
*/
public void moveToIncompleteAcks(TopicIdPartition tip) {
public boolean moveToIncompleteAcks(TopicIdPartition tip) {
Acknowledgements acks = inFlightAcknowledgements.remove(tip);
if (acks != null) {
incompleteAcknowledgements.put(tip, acks);
return true;
} else {
log.error("Invalid partition {} received in ShareAcknowledge response", tip);
return false;
}
}
public boolean onClose() {
public boolean isCloseRequest() {
return requestType == AcknowledgeRequestType.CLOSE;
}
public boolean onCommitAsync() {
return requestType == AcknowledgeRequestType.COMMIT_ASYNC;
}
}
/**
@ -1251,21 +1321,19 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
* Handle the result of a ShareAcknowledge request sent to one or more nodes and
* signal the completion when all results are known.
*/
public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, boolean isCommitAsync) {
if (!isCommitAsync && acknowledgements != null) {
result.put(partition, acknowledgements);
}
// For commitAsync, we do not wait for other results to complete, we prepare a background event
// for every ShareAcknowledgeResponse.
// For commitAsync, we send out a background event for every TopicIdPartition, so we use a singletonMap each time.
if (isCommitAsync) {
public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, AcknowledgeRequestType type) {
if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) {
if (acknowledgements != null) {
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition, acknowledgements));
}
} else if (remainingResults != null && remainingResults.decrementAndGet() == 0) {
maybeSendShareAcknowledgeCommitCallbackEvent(result);
future.ifPresent(future -> future.complete(result));
} else {
if (acknowledgements != null) {
result.put(partition, acknowledgements);
}
if (remainingResults != null && remainingResults.decrementAndGet() == 0) {
maybeSendShareAcknowledgeCommitCallbackEvent(result);
future.ifPresent(future -> future.complete(result));
}
}
}

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownServerException;
@ -567,16 +568,16 @@ public class ShareConsumeRequestManagerTest {
ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
// Passing null acknowledgements should mean we do not send the background event at all.
resultHandler.complete(tip0, null, true);
resultHandler.complete(tip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
assertEquals(0, completedAcknowledgements.size());
// Setting isCommitAsync to false should still not send any background event
// as we have initialized remainingResults to null.
resultHandler.complete(tip0, acknowledgements, false);
resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
assertEquals(0, completedAcknowledgements.size());
// Sending non-null acknowledgements means we do send the background event
resultHandler.complete(tip0, acknowledgements, true);
resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
}
@ -599,16 +600,16 @@ public class ShareConsumeRequestManagerTest {
ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
// We only send the background event after all results have been completed.
resultHandler.complete(tip0, acknowledgements, false);
resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
assertEquals(0, completedAcknowledgements.size());
assertFalse(future.isDone());
resultHandler.complete(t2ip0, null, false);
resultHandler.complete(t2ip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
assertEquals(0, completedAcknowledgements.size());
assertFalse(future.isDone());
// After third response is received, we send the background event.
resultHandler.complete(tip1, acknowledgements, false);
resultHandler.complete(tip1, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
assertEquals(1, completedAcknowledgements.size());
assertEquals(2, completedAcknowledgements.get(0).size());
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
@ -1315,6 +1316,87 @@ public class ShareConsumeRequestManagerTest {
assertThrows(NullPointerException.class, (Executable) shareFetch.records().get(t2p0));
}
@Test
public void testShareFetchInvalidResponse() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
subscriptions.assignFromSubscribed(Collections.singleton(tp0));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
tp -> validLeaderEpoch, topicIds, false));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(t2ip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
}
@Test
public void testShareAcknowledgeInvalidResponse() throws InterruptedException {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
subscriptions.assignFromSubscribed(Collections.singleton(tp0));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
tp -> validLeaderEpoch, topicIds, false));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
fetchRecords();
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
// If a top-level error is received, we still retry the acknowledgements independent of the topic-partitions received in the response.
client.prepareResponse(acknowledgeResponseWithTopLevelError(t2ip0, Errors.LEADER_NOT_AVAILABLE));
networkClientDelegate.poll(time.timer(0));
assertEquals(1, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getIncompleteAcknowledgementsCount(tip0));
TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()));
client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
// If we do not get the expected partitions in the response, we fail these acknowledgements with InvalidRecordStateException.
assertEquals(InvalidRecordStateException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException().getClass());
completedAcknowledgements.clear();
// Send remaining acknowledgements through piggybacking on the next fetch.
Acknowledgements acknowledgements1 = Acknowledgements.empty();
acknowledgements1.add(2L, AcknowledgeType.ACCEPT);
acknowledgements1.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements1)));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(t2ip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
// If we do not get the expected partitions in the response, we fail these acknowledgements with InvalidRecordStateException.
assertEquals(InvalidRecordStateException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException().getClass());
}
@Test
public void testCloseShouldBeIdempotent() {
buildRequestManager();
@ -2378,6 +2460,12 @@ public class ShareConsumeRequestManagerTest {
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList());
}
private ShareAcknowledgeResponse acknowledgeResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> partitions = Map.of(tp,
partitionDataForAcknowledge(tp, Errors.NONE));
return ShareAcknowledgeResponse.of(error, 0, new LinkedHashMap<>(partitions), Collections.emptyList());
}
private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition tp, Errors error) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> partitions = Map.of(tp,
partitionDataForAcknowledge(tp, error));