diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 6aa334d487b..a83e971600e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -1169,14 +1169,16 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi * signal the completion when all results are known. */ public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, boolean isCommitAsync) { - if (acknowledgements != null) { + if (!isCommitAsync && acknowledgements != null) { result.put(partition, acknowledgements); } // For commitAsync, we do not wait for other results to complete, we prepare a background event // for every ShareAcknowledgeResponse. // For commitAsync, we send out a background event for every TopicIdPartition, so we use a singletonMap each time. if (isCommitAsync) { - maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements)); + if (acknowledgements != null) { + maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements)); + } } else if (remainingResults != null && remainingResults.decrementAndGet() == 0) { maybeSendShareAcknowledgeCommitCallbackEvent(result); future.ifPresent(future -> future.complete(result)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 1b1ed587203..640eadc0e77 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -96,8 +96,10 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -548,6 +550,89 @@ public class ShareConsumeRequestManagerTest { completedAcknowledgements.clear(); } + @Test + public void testResultHandlerOnCommitAsync() { + buildRequestManager(); + // Enabling the config so that background event is sent when the acknowledgement response is received. + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + acknowledgements.add(2L, AcknowledgeType.ACCEPT); + acknowledgements.add(3L, AcknowledgeType.REJECT); + + ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(null, Optional.empty()); + + // Passing null acknowledgements should mean we do not send the background event at all. + resultHandler.complete(tip0, null, true); + assertEquals(0, completedAcknowledgements.size()); + + // Setting isCommitAsync to false should still not send any background event + // as we have initialized remainingResults to null. + resultHandler.complete(tip0, acknowledgements, false); + assertEquals(0, completedAcknowledgements.size()); + + // Sending non-null acknowledgements means we do send the background event + resultHandler.complete(tip0, acknowledgements, true); + assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); + } + + @Test + public void testResultHandlerOnCommitSync() { + buildRequestManager(); + // Enabling the config so that background event is sent when the acknowledgement response is received. + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + acknowledgements.add(2L, AcknowledgeType.ACCEPT); + acknowledgements.add(3L, AcknowledgeType.REJECT); + + final CompletableFuture> future = new CompletableFuture<>(); + + // Initializing resultCount to 3. + AtomicInteger resultCount = new AtomicInteger(3); + + ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future)); + + // We only send the background event after all results have been completed. + resultHandler.complete(tip0, acknowledgements, false); + assertEquals(0, completedAcknowledgements.size()); + assertFalse(future.isDone()); + + resultHandler.complete(t2ip0, null, false); + assertEquals(0, completedAcknowledgements.size()); + assertFalse(future.isDone()); + + // After third response is received, we send the background event. + resultHandler.complete(tip1, acknowledgements, false); + assertEquals(1, completedAcknowledgements.size()); + assertEquals(2, completedAcknowledgements.get(0).size()); + assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); + assertEquals(3, completedAcknowledgements.get(0).get(tip1).size()); + assertTrue(future.isDone()); + } + + @Test + public void testResultHandlerCompleteIfEmpty() { + buildRequestManager(); + + final CompletableFuture> future = new CompletableFuture<>(); + + // Initializing resultCount to 1. + AtomicInteger resultCount = new AtomicInteger(1); + + ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future)); + + resultHandler.completeIfEmpty(); + assertFalse(future.isDone()); + + resultCount.decrementAndGet(); + + resultHandler.completeIfEmpty(); + assertTrue(future.isDone()); + } + @Test public void testBatchingAcknowledgeRequestStates() { buildRequestManager(); @@ -1730,6 +1815,11 @@ public class ShareConsumeRequestManagerTest { return pollResult.unsentRequests.size(); } + public ResultHandler buildResultHandler(final AtomicInteger remainingResults, + final Optional>> future) { + return new ResultHandler(remainingResults, future); + } + public Tuple requestStates(int nodeId) { return super.requestStates(nodeId); }