KAFKA-18899: Improve handling of timeouts for commitAsync() in ShareConsumer. (#19192)

Previously, the `ShareConsumer.commitAsync()` method retried sending
`ShareAcknowledge` requests indefinitely. Now it will instead use the
defaultApiTimeout config to expire the request so that it does not retry forever.

PR also fixes a bug in processing `commitSync() `requests, where we
need an additional check if the node is free.

Co-authored-by: Andrew Schofield <aschofield@confluent.io>
Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Shivsundar R 2025-03-26 05:06:59 -04:00 committed by GitHub
parent 1547204baa
commit 91758cc99d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 322 additions and 155 deletions

View File

@ -317,6 +317,10 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
} else {
// Processing the acknowledgements from commitSync
for (AcknowledgeRequestState acknowledgeRequestState : requestStates.getValue().getSyncRequestQueue()) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
break;
}
maybeBuildRequest(acknowledgeRequestState, currentTimeMs, false, isAsyncSent).ifPresent(unsentRequests::add);
}
}
@ -369,7 +373,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
AtomicBoolean isAsyncSent) {
boolean asyncSent = true;
try {
if (acknowledgeRequestState == null || (!acknowledgeRequestState.isCloseRequest() && acknowledgeRequestState.isEmpty())) {
if (acknowledgeRequestState == null ||
(!acknowledgeRequestState.isCloseRequest() && acknowledgeRequestState.isEmpty()) ||
(acknowledgeRequestState.isCloseRequest() && acknowledgeRequestState.isProcessed)) {
return Optional.empty();
}
@ -380,6 +386,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
}
acknowledgeRequestState.incompleteAcknowledgements.clear();
// Reset timer for any future processing on the same request state.
acknowledgeRequestState.maybeResetTimerAndRequestState();
return Optional.empty();
}
@ -527,8 +535,12 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
* Enqueue an AcknowledgeRequestState to be picked up on the next poll.
*
* @param acknowledgementsMap The acknowledgements to commit
* @param deadlineMs Time until which the request will be retried if it fails with
* an expected retriable error.
*/
public void commitAsync(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
public void commitAsync(
final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
final long deadlineMs) {
final Cluster cluster = metadata.fetch();
final ResultHandler resultHandler = new ResultHandler(Optional.empty());
@ -552,7 +564,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
if (asyncRequestState == null) {
acknowledgeRequestStates.get(nodeId).setAsyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":2",
Long.MAX_VALUE,
deadlineMs,
retryBackoffMs,
retryBackoffMaxMs,
sessionHandler,
@ -1004,8 +1016,10 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
private TopicIdPartition lookupTopicId(Uuid topicId, int partitionIndex) {
String topicName = metadata.topicNames().getOrDefault(topicId,
topicNamesMap.remove(new IdAndPartition(topicId, partitionIndex)));
String topicName = metadata.topicNames().get(topicId);
if (topicName == null) {
topicName = topicNamesMap.remove(new IdAndPartition(topicId, partitionIndex));
}
if (topicName == null) {
log.error("Topic name not found in metadata for topicId {} and partitionIndex {}", topicId, partitionIndex);
return null;
@ -1087,6 +1101,11 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
*/
private boolean isProcessed;
/**
* Timeout in milliseconds indicating how long the request would be retried if it fails with a retriable exception.
*/
private final long timeoutMs;
AcknowledgeRequestState(LogContext logContext,
String owner,
long deadlineMs,
@ -1106,6 +1125,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
this.incompleteAcknowledgements = new HashMap<>();
this.requestType = acknowledgeRequestType;
this.isProcessed = false;
this.timeoutMs = remainingMs();
}
UnsentRequest buildRequest() {
@ -1188,6 +1208,17 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
inFlightAcknowledgements.isEmpty();
}
/**
* Resets the timer with the configured timeout and resets the RequestState.
* This is only applicable for commitAsync() requests as these states could be re-used.
*/
void maybeResetTimerAndRequestState() {
if (requestType == AcknowledgeRequestType.COMMIT_ASYNC) {
resetTimeout(timeoutMs);
reset();
}
}
/**
* Sets the error code in the acknowledgements and sends the response
* through a background event.
@ -1242,6 +1273,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
processPendingInFlightAcknowledgements(new InvalidRecordStateException(INVALID_RESPONSE));
resultHandler.completeIfEmpty();
isProcessed = true;
maybeResetTimerAndRequestState();
}
/**

View File

@ -201,7 +201,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
private final SubscriptionState subscriptions;
private final ConsumerMetadata metadata;
private final Metrics metrics;
private final int defaultApiTimeoutMs;
private final long defaultApiTimeoutMs;
private volatile boolean closed = false;
// Init value is needed to avoid NPE in case of exception raised in the constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
@ -450,7 +450,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
final Metrics metrics,
final SubscriptionState subscriptions,
final ConsumerMetadata metadata,
final int defaultApiTimeoutMs,
final long defaultApiTimeoutMs,
final String groupId) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
@ -651,7 +651,8 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
applicationEventHandler.wakeupNetworkThread();
} else if (!acknowledgementsMap.isEmpty()) {
// Asynchronously commit any waiting acknowledgements
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap));
Timer timer = time.timer(defaultApiTimeoutMs);
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
@ -660,7 +661,8 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
} else {
if (!acknowledgementsMap.isEmpty()) {
// Asynchronously commit any waiting acknowledgements
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap));
Timer timer = time.timer(defaultApiTimeoutMs);
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
@ -758,7 +760,8 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = acknowledgementsToSend();
if (!acknowledgementsMap.isEmpty()) {
ShareAcknowledgeAsyncEvent event = new ShareAcknowledgeAsyncEvent(acknowledgementsMap);
Timer timer = time.timer(defaultApiTimeoutMs);
ShareAcknowledgeAsyncEvent event = new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer));
applicationEventHandler.add(event);
}
} finally {

View File

@ -53,6 +53,10 @@ public class TimedRequestState extends RequestState {
return timer.isExpired();
}
public void resetTimeout(long timeoutMs) {
timer.updateAndReset(timeoutMs);
}
public long remainingMs() {
timer.update();
return timer.remainingMs();

View File

@ -508,7 +508,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
ShareConsumeRequestManager manager = requestManagers.shareConsumeRequestManager.get();
manager.commitAsync(event.acknowledgementsMap());
manager.commitAsync(event.acknowledgementsMap(), event.deadlineMs());
}
/**

View File

@ -24,13 +24,25 @@ import java.util.Map;
public class ShareAcknowledgeAsyncEvent extends ApplicationEvent {
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;
private final long deadlineMs;
public ShareAcknowledgeAsyncEvent(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
public ShareAcknowledgeAsyncEvent(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
final long deadlineMs) {
super(Type.SHARE_ACKNOWLEDGE_ASYNC);
this.acknowledgementsMap = acknowledgementsMap;
this.deadlineMs = deadlineMs;
}
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}
public long deadlineMs() {
return deadlineMs;
}
@Override
protected String toStringBase() {
return super.toStringBase() + ", acknowledgementsMap=" + acknowledgementsMap + ", deadlineMs=" + deadlineMs;
}
}

View File

@ -156,6 +156,7 @@ public class ShareConsumeRequestManagerTest {
private final long retryBackoffMs = 100;
private final long requestTimeoutMs = 30000;
private final long defaultApiTimeoutMs = 60000;
private MockTime time = new MockTime(1);
private SubscriptionState subscriptions;
private ConsumerMetadata metadata;
@ -326,12 +327,10 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), time.milliseconds() + 2000);
shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(2000)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -340,7 +339,6 @@ public class ShareConsumeRequestManagerTest {
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
assertEquals(Map.of(tip0, acknowledgements), completedAcknowledgements.get(0));
completedAcknowledgements.clear();
}
@Test
@ -359,12 +357,10 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -373,7 +369,6 @@ public class ShareConsumeRequestManagerTest {
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
assertEquals(Map.of(tip0, acknowledgements), completedAcknowledgements.get(0));
completedAcknowledgements.clear();
}
@Test
@ -394,18 +389,19 @@ public class ShareConsumeRequestManagerTest {
fetchRecords();
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(3L, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
client.prepareResponse(null, true);
networkClientDelegate.poll(time.timer(0));
@ -457,9 +453,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
// Remaining acknowledgements sent with close().
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(2L, AcknowledgeType.ACCEPT);
acknowledgements2.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements2 = getAcknowledgements(2, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
calculateDeadlineMs(time.timer(100)));
@ -493,12 +487,10 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(),
calculateDeadlineMs(time.timer(100)));
@ -512,7 +504,6 @@ public class ShareConsumeRequestManagerTest {
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
assertEquals(Map.of(tip0, acknowledgements), completedAcknowledgements.get(0));
completedAcknowledgements.clear();
}
@Test
@ -531,10 +522,7 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(100)));
@ -551,7 +539,6 @@ public class ShareConsumeRequestManagerTest {
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
assertEquals(Map.of(tip0, acknowledgements), completedAcknowledgements.get(0));
completedAcknowledgements.clear();
}
@Test
@ -560,10 +547,7 @@ public class ShareConsumeRequestManagerTest {
// Enabling the config so that background event is sent when the acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
@ -571,7 +555,7 @@ public class ShareConsumeRequestManagerTest {
resultHandler.complete(tip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
assertEquals(0, completedAcknowledgements.size());
// Setting isCommitAsync to false should still not send any background event
// Setting the request type to COMMIT_SYNC should still not send any background event
// as we have initialized remainingResults to null.
resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
assertEquals(0, completedAcknowledgements.size());
@ -587,10 +571,7 @@ public class ShareConsumeRequestManagerTest {
// Enabling the config so that background event is sent when the acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> future = new CompletableFuture<>();
@ -652,19 +633,15 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements.add(4L, AcknowledgeType.ACCEPT);
acknowledgements.add(5L, AcknowledgeType.ACCEPT);
acknowledgements.add(6L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements2 = getAcknowledgements(4, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
assertEquals(6, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
@ -695,19 +672,15 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(4L, AcknowledgeType.ACCEPT);
acknowledgements2.add(5L, AcknowledgeType.ACCEPT);
acknowledgements2.add(6L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements2 = getAcknowledgements(4, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), 60000L);
shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
calculateDeadlineMs(time.timer(60000L)));
assertEquals(3, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
@ -750,13 +723,8 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
acknowledgements.add(4L, AcknowledgeType.ACCEPT);
acknowledgements.add(5L, AcknowledgeType.RELEASE);
acknowledgements.add(6L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT,
AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), 60000L);
assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest());
@ -784,6 +752,120 @@ public class ShareConsumeRequestManagerTest {
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
}
@Test
public void testRetryAcknowledgementsMultipleCommitAsync() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
// commitAsync() acknowledges the first 2 records.
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), calculateDeadlineMs(time, 1000L));
assertEquals(2, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
assertEquals(2, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0));
// Response contains a retriable exception, so we retry.
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.REQUEST_TIMED_OUT));
networkClientDelegate.poll(time.timer(0));
Acknowledgements acknowledgements1 = getAcknowledgements(3, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
// 2nd commitAsync() acknowledges the next 2 records.
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements1)), calculateDeadlineMs(time, 1000L));
assertEquals(2, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getIncompleteAcknowledgementsCount(tip0));
Acknowledgements acknowledgements2 = getAcknowledgements(5, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
// 3rd commitAsync() acknowledges the next 2 records.
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), calculateDeadlineMs(time, 1000L));
time.sleep(2000L);
// As the timer for the initial commitAsync() was 1000ms, the request times out, and we fill the callback with a timeout exception.
assertEquals(0, shareConsumeRequestManager.sendAcknowledgements());
assertEquals(1, completedAcknowledgements.size());
assertEquals(2, completedAcknowledgements.get(0).get(tip0).size());
assertEquals(Errors.REQUEST_TIMED_OUT.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
completedAcknowledgements.clear();
// Further requests which came before the timeout are processed as expected.
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
assertEquals(4, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0));
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertEquals(0, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0));
assertEquals(1, completedAcknowledgements.size());
assertEquals(4, completedAcknowledgements.get(0).get(tip0).size());
assertNull(completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
}
@Test
public void testRetryAcknowledgementsMultipleCommitSync() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
// commitSync() for the first 2 acknowledgements.
shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), calculateDeadlineMs(time, 1000L));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
// Response contains a retriable exception.
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.REQUEST_TIMED_OUT));
networkClientDelegate.poll(time.timer(0));
assertEquals(2, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
// We expire the commitSync request as it had a timer of 1000ms.
time.sleep(2000L);
Acknowledgements acknowledgements1 = getAcknowledgements(3, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
// commitSync() for the next 4 acknowledgements.
shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements1)), calculateDeadlineMs(time, 1000L));
// We send the 2nd commitSync request, and fail the first one as timer has expired.
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
assertEquals(2, completedAcknowledgements.get(0).get(tip0).size());
assertEquals(Errors.REQUEST_TIMED_OUT.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
completedAcknowledgements.clear();
// We get a successful response for the 2nd commitSync request.
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertEquals(1, completedAcknowledgements.size());
assertEquals(4, completedAcknowledgements.get(0).get(tip0).size());
assertNull(completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
}
@Test
public void testPiggybackAcknowledgementsInFlight() {
buildRequestManager();
@ -798,9 +880,8 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
// Reading records from the share fetch buffer.
fetchRecords();
@ -833,6 +914,7 @@ public class ShareConsumeRequestManagerTest {
@Test
public void testCommitAsyncWithSubscriptionChange() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(singleton(tp0));
@ -843,10 +925,7 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
@ -855,21 +934,29 @@ public class ShareConsumeRequestManagerTest {
RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2, 1),
tp -> validLeaderEpoch, topicIds, false));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
assertNull(completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
// We should send a fetch to the newly subscribed partition.
assertEquals(1, sendFetches());
client.prepareResponse(fullFetchResponse(t2ip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
}
@Test
public void testCommitSyncWithSubscriptionChange() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(singleton(tp0));
@ -880,10 +967,7 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
@ -897,16 +981,24 @@ public class ShareConsumeRequestManagerTest {
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
assertNull(completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
// We should send a fetch to the newly subscribed partition.
assertEquals(1, sendFetches());
client.prepareResponse(fullFetchResponse(t2ip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
}
@Test
public void testCloseWithSubscriptionChange() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(singleton(tp0));
@ -917,10 +1009,7 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
@ -934,9 +1023,12 @@ public class ShareConsumeRequestManagerTest {
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
assertNull(completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
// As we are closing, we would not send any more fetches.
assertEquals(0, sendFetches());
}
@ -954,10 +1046,7 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(0L, AcknowledgeType.ACCEPT);
acknowledgements.add(1L, AcknowledgeType.RELEASE);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
// Send acknowledgements via ShareFetch
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
@ -981,9 +1070,6 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
subscriptions.assignFromSubscribed(Collections.singletonList(tp0));
client.updateMetadata(
@ -1004,10 +1090,7 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(0L, AcknowledgeType.ACCEPT);
acknowledgements.add(1L, AcknowledgeType.RELEASE);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements = getAcknowledgements(0, AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
// Send acknowledgements via ShareFetch
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
@ -1099,6 +1182,48 @@ public class ShareConsumeRequestManagerTest {
assertEquals(0, builder.data().forgottenTopicsData().size());
}
@Test
public void testShareFetchAndCloseMultipleNodes() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
subscriptions.assignFromSubscribed(List.of(tp0, tp1));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2),
tp -> validLeaderEpoch, topicIds, false));
assertEquals(2, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
client.prepareResponse(fullFetchResponse(tip1, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
Acknowledgements acknowledgements1 = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = new HashMap<>();
acknowledgementsMap.put(tip0, new NodeAcknowledgements(0, acknowledgements));
acknowledgementsMap.put(tip1, new NodeAcknowledgements(1, acknowledgements1));
shareConsumeRequestManager.acknowledgeOnClose(acknowledgementsMap, calculateDeadlineMs(time, 1000L));
assertEquals(2, shareConsumeRequestManager.sendAcknowledgements());
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
assertEquals(3, completedAcknowledgements.get(0).get(tip1).size());
assertEquals(0, shareConsumeRequestManager.sendAcknowledgements());
assertNull(shareConsumeRequestManager.requestStates(0));
assertNull(shareConsumeRequestManager.requestStates(1));
}
@Test
public void testRetryAcknowledgementsWithLeaderChange() {
buildRequestManager();
@ -1124,15 +1249,11 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
acknowledgements.add(4L, AcknowledgeType.ACCEPT);
acknowledgements.add(5L, AcknowledgeType.RELEASE);
acknowledgements.add(6L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT,
AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), 60000L);
shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(60000L)));
assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest());
assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
@ -1167,11 +1288,11 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -1189,7 +1310,8 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(3L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()));
@ -1223,21 +1345,15 @@ public class ShareConsumeRequestManagerTest {
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(0L, AcknowledgeType.ACCEPT);
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(0L, AcknowledgeType.ACCEPT);
acknowledgements2.add(1L, AcknowledgeType.ACCEPT);
acknowledgements2.add(2L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgements2 = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
Map<TopicIdPartition, NodeAcknowledgements> acks = new HashMap<>();
acks.put(tip0, new NodeAcknowledgements(0, acknowledgements));
acks.put(t2ip0, new NodeAcknowledgements(0, acknowledgements2));
shareConsumeRequestManager.commitAsync(acks);
shareConsumeRequestManager.commitAsync(acks, calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -1360,7 +1476,8 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@ -1380,9 +1497,8 @@ public class ShareConsumeRequestManagerTest {
completedAcknowledgements.clear();
// Send remaining acknowledgements through piggybacking on the next fetch.
Acknowledgements acknowledgements1 = Acknowledgements.empty();
acknowledgements1.add(2L, AcknowledgeType.ACCEPT);
acknowledgements1.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements1 = getAcknowledgements(2,
AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements1)));
@ -1432,10 +1548,7 @@ public class ShareConsumeRequestManagerTest {
assignFromSubscribed(singleton(tp0));
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
@ -2085,9 +2198,8 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgementsTp1 = Acknowledgements.empty();
acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT);
acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgementsTp1 = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
Map<TopicIdPartition, NodeAcknowledgements> commitAcks = new HashMap<>();
commitAcks.put(tip0, new NodeAcknowledgements(0, acknowledgementsTp0));
@ -2101,7 +2213,7 @@ public class ShareConsumeRequestManagerTest {
assertNotEquals(startingClusterMetadata, metadata.fetch());
// We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception.
shareConsumeRequestManager.commitAsync(commitAcks);
shareConsumeRequestManager.commitAsync(commitAcks, calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
assertEquals(1, completedAcknowledgements.get(0).size());
assertEquals(acknowledgementsTp0, completedAcknowledgements.get(0).get(tip0));
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
@ -2124,10 +2236,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
subscriptions.assignFromSubscribed(partitions);
subscriptions.assignFromSubscribed(List.of(tp0, tp1));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2),
@ -2175,9 +2284,8 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgementsTp1 = Acknowledgements.empty();
acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT);
acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgementsTp1 = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
Map<TopicIdPartition, NodeAcknowledgements> commitAcks = new HashMap<>();
commitAcks.put(tip0, new NodeAcknowledgements(0, acknowledgementsTp0));
@ -2266,9 +2374,8 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgementsTp1 = Acknowledgements.empty();
acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT);
acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT);
Acknowledgements acknowledgementsTp1 = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Map.of(tip1, new NodeAcknowledgements(1, acknowledgementsTp1)));
@ -2280,7 +2387,8 @@ public class ShareConsumeRequestManagerTest {
assertNotEquals(startingClusterMetadata, metadata.fetch());
// We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception.
shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)), calculateDeadlineMs(time.timer(100)));
shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)),
calculateDeadlineMs(time.timer(100)));
// Verify if the callback was invoked with the failed acknowledgements.
assertEquals(1, completedAcknowledgements.get(0).size());
@ -2420,7 +2528,6 @@ public class ShareConsumeRequestManagerTest {
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException());
completedAcknowledgements.clear();
partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
@ -2527,6 +2634,15 @@ public class ShareConsumeRequestManagerTest {
assertTrue(fetch.isEmpty(), reason);
}
private Acknowledgements getAcknowledgements(int startIndex, AcknowledgeType... acknowledgeTypes) {
Acknowledgements acknowledgements = Acknowledgements.empty();
int index = startIndex;
for (AcknowledgeType type : acknowledgeTypes) {
acknowledgements.add(index++, type);
}
return acknowledgements;
}
private <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchRecords() {
ShareFetch<K, V> fetch = collectFetch();
if (fetch.isEmpty()) {