KAFKA-17502: Modified commitSync() and close() handling in clients (#17136)

Currently the code in ShareConsumeRequestManager works on the basis that there can only be one commitSync()/close() at a time. However, such a call can time out on the application thread and still be sent later on the background thread. When that happens, an incoming commitSync()/close() is not processed, resulting in a possible loss of acknowledgements.

To cover this case, we now keep a queue of AcknowledgeRequestStates to store the commitSync() requests and a separate request state to store the close() request. The queue is processed one request at a time until it is empty. For close(), we still assume there can only be one active close() at a time.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
ShivsundarR 2024-09-13 16:54:05 +05:30 committed by GitHub
parent 02e3f7cc28
commit 3a79fabacf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 149 additions and 82 deletions

View File

@ -50,10 +50,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@ -84,7 +86,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private Uuid memberId;
private boolean fetchMoreRecords = false;
private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsMap;
private final Map<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStates;
private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
private boolean closing = false;
@ -132,7 +134,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
return pollResult;
}
if (!fetchMoreRecords || closing) {
if (!fetchMoreRecords) {
return PollResult.EMPTY;
}
@ -216,7 +218,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private PollResult processAcknowledgements(long currentTimeMs) {
List<UnsentRequest> unsentRequests = new ArrayList<>();
AtomicBoolean isAsyncDone = new AtomicBoolean();
for (Map.Entry<Integer, Pair<AcknowledgeRequestState>> requestStates : acknowledgeRequestStates.entrySet()) {
for (Map.Entry<Integer, Tuple<AcknowledgeRequestState>> requestStates : acknowledgeRequestStates.entrySet()) {
int nodeId = requestStates.getKey();
if (!isNodeFree(nodeId)) {
@ -226,10 +228,25 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
// For commitAsync
maybeBuildRequest(requestStates.getValue().getAsyncRequest(), currentTimeMs, true, isAsyncDone).ifPresent(unsentRequests::add);
// Check to ensure we start processing commitSync/close only if there are no commitAsync requests left to process.
if (isAsyncDone.get()) {
// We try to process the close request only if we have processed the async and the sync requests for the node.
if (requestStates.getValue().getSyncRequestQueue() == null) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
} else if (isAsyncDone.get()) {
maybeBuildRequest(requestStates.getValue().getSyncRequest(), currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
} else {
AcknowledgeRequestState closeRequestState = requestStates.getValue().getCloseRequest();
maybeBuildRequest(closeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
}
} else {
for (AcknowledgeRequestState acknowledgeRequestState : requestStates.getValue().getSyncRequestQueue()) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
break;
}
maybeBuildRequest(acknowledgeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
}
}
}
}
}
@ -242,12 +259,10 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
pollResult = PollResult.EMPTY;
} else if (closing) {
if (!closeFuture.isDone()) {
log.trace("Completing acknowledgement on close");
closeFuture.complete(null);
}
pollResult = PollResult.EMPTY;
}
return pollResult;
}
@ -298,25 +313,56 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
/**
* Prunes the empty acknowledgementRequestStates.
* Returns true if there are still some acknowledgements left to be processed.
* Returns true if there are still any acknowledgements left to be processed.
*/
private boolean checkAndRemoveCompletedAcknowledgements() {
boolean areAnyAcksLeft = false;
Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = acknowledgeRequestStates.entrySet().iterator();
Iterator<Map.Entry<Integer, Tuple<AcknowledgeRequestState>>> iterator = acknowledgeRequestStates.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStatePair = iterator.next();
if (isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest()) || isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getSyncRequest())) {
Map.Entry<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStatePair = iterator.next();
boolean areAsyncAcksLeft = true, areSyncAcksLeft = true;
if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest())) {
acknowledgeRequestStatePair.getValue().setAsyncRequest(null);
areAsyncAcksLeft = false;
}
if (!areRequestStatesInProgress(acknowledgeRequestStatePair.getValue().getSyncRequestQueue())) {
acknowledgeRequestStatePair.getValue().nullifySyncRequestQueue();
areSyncAcksLeft = false;
}
if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getCloseRequest())) {
acknowledgeRequestStatePair.getValue().setCloseRequest(null);
}
if (areAsyncAcksLeft || areSyncAcksLeft) {
areAnyAcksLeft = true;
} else if (!closing) {
} else if (acknowledgeRequestStatePair.getValue().getCloseRequest() == null) {
iterator.remove();
}
}
if (!acknowledgeRequestStates.isEmpty()) areAnyAcksLeft = true;
return areAnyAcksLeft;
}
private boolean isRequestStateInProgress(AcknowledgeRequestState acknowledgeRequestState) {
return acknowledgeRequestState != null && !(acknowledgeRequestState.isEmpty());
if (acknowledgeRequestState == null) {
return false;
} else if (acknowledgeRequestState.onClose()) {
return !acknowledgeRequestState.isProcessed;
} else {
return !(acknowledgeRequestState.isEmpty());
}
}
private boolean areRequestStatesInProgress(Queue<AcknowledgeRequestState> acknowledgeRequestStates) {
if (acknowledgeRequestStates == null) return false;
for (AcknowledgeRequestState acknowledgeRequestState : acknowledgeRequestStates) {
if (isRequestStateInProgress(acknowledgeRequestState)) {
return true;
}
}
return false;
}
/**
@ -340,15 +386,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
sessionHandlers.forEach((nodeId, sessionHandler) -> {
Node node = cluster.nodeById(nodeId);
if (node != null) {
acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));
// Ensure there is no commitSync()/close() request already present as they are blocking calls
// and only one request can be active at a time.
if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) {
log.error("Attempt to call commitSync() when there is an existing sync request for node {}", node.id());
future.completeExceptionally(
new IllegalStateException("Attempt to call commitSync() when there is an existing sync request for node : " + node.id()));
} else {
// Add the incoming commitSync() request to the queue.
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
@ -361,9 +401,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
}
// There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state representing commitSync() and close().
acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext,
acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":1",
deadlineMs,
retryBackoffMs,
@ -375,7 +413,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
AcknowledgeRequestType.COMMIT_SYNC
));
}
}
});
resultHandler.completeIfEmpty();
@ -396,7 +434,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
if (node != null) {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
@ -470,17 +508,17 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
}
acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));
// Ensure there is no commitSync()/close() request already present as they are blocking calls
// Ensure there is no close() request already present as they are blocking calls
// and only one request can be active at a time.
if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) {
log.error("Attempt to call close() when there is an existing sync request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequest());
if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && !acknowledgeRequestStates.get(nodeId).getCloseRequest().isEmpty()) {
log.error("Attempt to call close() when there is an existing close request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
closeFuture.completeExceptionally(
new IllegalStateException("Attempt to call close() when there is an existing sync request for node : " + node.id()));
new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id()));
} else {
// There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state.
acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext,
// There can only be one close() happening at a time. So per node, there will be one acknowledge request state.
acknowledgeRequestStates.get(nodeId).setCloseRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":3",
deadlineMs,
retryBackoffMs,
@ -626,9 +664,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
acknowledgeRequestState.processingComplete();
if (!closeFuture.isDone()) {
closeFuture.complete(null);
}
} else {
if (!acknowledgeRequestState.sessionHandler.handleResponse(response, resp.requestHeader().apiVersion())) {
// Received a response-level error code.
@ -678,7 +713,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
}
if (acknowledgeRequestState.isProcessed) {
metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
}
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
@ -785,6 +822,13 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
*/
private final AcknowledgeRequestType requestType;
/**
* Boolean to indicate if the request has been processed,
* Set to true once we process the response and do not retry the request.
* Initialized to false every time we build a request.
*/
private boolean isProcessed;
AcknowledgeRequestState(LogContext logContext,
String owner,
long deadlineMs,
@ -803,6 +847,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
this.inFlightAcknowledgements = new HashMap<>();
this.incompleteAcknowledgements = new HashMap<>();
this.requestType = acknowledgeRequestType;
this.isProcessed = false;
}
UnsentRequest buildRequest() {
@ -823,6 +868,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend);
nodesWithPendingRequests.add(nodeId);
isProcessed = false;
BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
if (error != null) {
@ -925,6 +971,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
void processingComplete() {
inFlightAcknowledgements.clear();
resultHandler.completeIfEmpty();
isProcessed = true;
}
/**
@ -1009,33 +1056,50 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
}
static class Pair<V> {
static class Tuple<V> {
private V asyncRequest;
private V syncRequest;
private Queue<V> syncRequestQueue;
private V closeRequest;
public Pair(V asyncRequest, V syncRequest) {
public Tuple(V asyncRequest, Queue<V> syncRequestQueue, V closeRequest) {
this.asyncRequest = asyncRequest;
this.syncRequest = syncRequest;
this.syncRequestQueue = syncRequestQueue;
this.closeRequest = closeRequest;
}
public void setAsyncRequest(V asyncRequest) {
this.asyncRequest = asyncRequest;
}
public void setSyncRequest(V second) {
this.syncRequest = second;
public void nullifySyncRequestQueue() {
this.syncRequestQueue = null;
}
public void addSyncRequest(V syncRequest) {
if (syncRequestQueue == null) {
syncRequestQueue = new LinkedList<>();
}
this.syncRequestQueue.add(syncRequest);
}
public void setCloseRequest(V closeRequest) {
this.closeRequest = closeRequest;
}
public V getAsyncRequest() {
return asyncRequest;
}
public V getSyncRequest() {
return syncRequest;
public Queue<V> getSyncRequestQueue() {
return syncRequestQueue;
}
public V getCloseRequest() {
return closeRequest;
}
}
Pair<AcknowledgeRequestState> requestStates(int nodeId) {
Tuple<AcknowledgeRequestState> requestStates(int nodeId) {
return acknowledgeRequestStates.get(nodeId);
}

View File

@ -711,11 +711,11 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
completedAcknowledgements.forEach((tip, acks) -> {
Errors ackErrorCode = acks.getAcknowledgeErrorCode();
if (ackErrorCode == null) {
result.put(tip, null);
result.put(tip, Optional.empty());
} else {
ApiException exception = ackErrorCode.exception();
if (exception == null) {
result.put(tip, null);
result.put(tip, Optional.empty());
} else {
result.put(tip, Optional.of(ackErrorCode.exception()));
}

View File

@ -500,7 +500,8 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements2), 60000L);
assertEquals(3, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0));
assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getAcknowledgementsToSendCount(tip0));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -510,16 +511,18 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertEquals(0, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0));
assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0));
assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getAcknowledgementsToSendCount(tip0));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0));
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0));
}
@Test
@ -548,27 +551,27 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), 60000L);
assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest());
assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0));
assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getAcknowledgementsToSendCount(tip0));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0));
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.REQUEST_TIMED_OUT));
networkClientDelegate.poll(time.timer(0));
assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getIncompleteAcknowledgementsCount(tip0));
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0));
TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()));
assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0));
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getIncompleteAcknowledgementsCount(tip0));
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0));
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
}
@Test
@ -1122,7 +1125,7 @@ public class ShareConsumeRequestManagerTest {
return pollResult.unsentRequests.size();
}
public Pair<AcknowledgeRequestState> requestStates(int nodeId) {
public Tuple<AcknowledgeRequestState> requestStates(int nodeId) {
return super.requestStates(nodeId);
}
}