mirror of https://github.com/apache/kafka.git
KAFKA-17664: Added check before sending background event after ShareAcknowledge (#17332)
What Currently, we prepare a ShareAcknowledgeCommitCallback event for every ShareAcknowledgeResponse and send it over to the application thread. In cases where the acknowledgement commit callback handler is not configured by the user, this event is not used in the application thread. So we can generate this event based on whether the callback was configured. In this PR, we have a new event which the application thread sends whenever the user enables or disables the commit callback handler, thereby letting the ShareConsumeRequestManager know if it has to send the background event or not. Test We also have a unit test verifying if the ShareConsumeRequestManager sends back the event based on the boolean configured. Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
parent
2beadc8bd0
commit
7c34cf6e2c
|
|
@ -92,6 +92,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
|
|||
private final long retryBackoffMaxMs;
|
||||
private boolean closing = false;
|
||||
private final CompletableFuture<Void> closeFuture;
|
||||
private boolean isAcknowledgementCommitCallbackRegistered = false;
|
||||
|
||||
ShareConsumeRequestManager(final Time time,
|
||||
final LogContext logContext,
|
||||
|
|
@ -206,6 +207,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
|
|||
log.debug("Fetch more data");
|
||||
fetchMoreRecords = true;
|
||||
}
|
||||
|
||||
acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge));
|
||||
}
|
||||
|
||||
|
|
@ -271,6 +273,17 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
|
|||
return !nodesWithPendingRequests.contains(nodeId);
|
||||
}
|
||||
|
||||
public void setAcknowledgementCommitCallbackRegistered(boolean isAcknowledgementCommitCallbackRegistered) {
|
||||
this.isAcknowledgementCommitCallbackRegistered = isAcknowledgementCommitCallbackRegistered;
|
||||
}
|
||||
|
||||
private void maybeSendShareAcknowledgeCommitCallbackEvent(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
|
||||
if (isAcknowledgementCommitCallbackRegistered) {
|
||||
ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(acknowledgementsMap);
|
||||
backgroundEventHandler.add(event);
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknowledgeRequestState,
|
||||
long currentTimeMs,
|
||||
boolean onCommitAsync,
|
||||
|
|
@ -588,8 +601,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
|
|||
}
|
||||
acks.setAcknowledgeErrorCode(Errors.forCode(partitionData.acknowledgeErrorCode()));
|
||||
Map<TopicIdPartition, Acknowledgements> acksMap = Collections.singletonMap(tip, acks);
|
||||
ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(acksMap);
|
||||
backgroundEventHandler.add(event);
|
||||
maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
|
||||
}
|
||||
|
||||
Errors partitionError = Errors.forCode(partitionData.errorCode());
|
||||
|
|
@ -650,8 +662,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
|
|||
metricsManager.recordFailedAcknowledgements(acks.size());
|
||||
acks.setAcknowledgeErrorCode(Errors.forException(error));
|
||||
Map<TopicIdPartition, Acknowledgements> acksMap = Collections.singletonMap(tip, acks);
|
||||
ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(acksMap);
|
||||
backgroundEventHandler.add(event);
|
||||
maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
|
||||
}
|
||||
}));
|
||||
} finally {
|
||||
|
|
@ -745,8 +756,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
|
|||
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
|
||||
} else {
|
||||
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
|
||||
acknowledgeRequestState.processingComplete();
|
||||
}
|
||||
acknowledgeRequestState.processingComplete();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1085,8 +1096,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
|
|||
// For commitAsync, we do not wait for other results to complete, we prepare a background event
|
||||
// for every ShareAcknowledgeResponse.
|
||||
if (isCommitAsync || (remainingResults != null && remainingResults.decrementAndGet() == 0)) {
|
||||
ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(result);
|
||||
backgroundEventHandler.add(event);
|
||||
maybeSendShareAcknowledgeCommitCallbackEvent(result);
|
||||
future.ifPresent(future -> future.complete(result));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncE
|
|||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
|
||||
|
|
@ -765,8 +766,16 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
|
|||
acquireAndEnsureOpen();
|
||||
try {
|
||||
if (callback != null) {
|
||||
if (acknowledgementCommitCallbackHandler == null) {
|
||||
ShareAcknowledgementCommitCallbackRegistrationEvent event = new ShareAcknowledgementCommitCallbackRegistrationEvent(true);
|
||||
applicationEventHandler.add(event);
|
||||
}
|
||||
acknowledgementCommitCallbackHandler = new AcknowledgementCommitCallbackHandler(callback);
|
||||
} else {
|
||||
if (acknowledgementCommitCallbackHandler != null) {
|
||||
ShareAcknowledgementCommitCallbackRegistrationEvent event = new ShareAcknowledgementCommitCallbackRegistrationEvent(false);
|
||||
applicationEventHandler.add(event);
|
||||
}
|
||||
completedAcknowledgements.clear();
|
||||
acknowledgementCommitCallbackHandler = null;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ public abstract class ApplicationEvent {
|
|||
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
|
||||
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
|
||||
SHARE_ACKNOWLEDGE_ON_CLOSE,
|
||||
SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION,
|
||||
SEEK_UNVALIDATED,
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -143,6 +143,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
process((ShareAcknowledgeOnCloseEvent) event);
|
||||
return;
|
||||
|
||||
case SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION:
|
||||
process((ShareAcknowledgementCommitCallbackRegistrationEvent) event);
|
||||
return;
|
||||
|
||||
case SEEK_UNVALIDATED:
|
||||
process((SeekUnvalidatedEvent) event);
|
||||
return;
|
||||
|
|
@ -384,6 +388,20 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
future.whenComplete(complete(event.future()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Process event indicating whether the AcknowledgeCommitCallbackHandler is configured by the user.
|
||||
*
|
||||
* @param event Event containing a boolean to indicate if the callback handler is configured or not.
|
||||
*/
|
||||
private void process(final ShareAcknowledgementCommitCallbackRegistrationEvent event) {
|
||||
if (!requestManagers.shareConsumeRequestManager.isPresent()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ShareConsumeRequestManager manager = requestManagers.shareConsumeRequestManager.get();
|
||||
manager.setAcknowledgementCommitCallbackRegistered(event.isCallbackRegistered());
|
||||
}
|
||||
|
||||
private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) {
|
||||
return (value, exception) -> {
|
||||
if (exception != null)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.consumer.internals.events;
|
||||
|
||||
public class ShareAcknowledgementCommitCallbackRegistrationEvent extends ApplicationEvent {
|
||||
|
||||
boolean isCallbackRegistered;
|
||||
|
||||
public ShareAcknowledgementCommitCallbackRegistrationEvent(boolean isCallbackRegistered) {
|
||||
super(Type.SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION);
|
||||
this.isCallbackRegistered = isCallbackRegistered;
|
||||
}
|
||||
|
||||
public boolean isCallbackRegistered() {
|
||||
return isCallbackRegistered;
|
||||
}
|
||||
}
|
||||
|
|
@ -243,6 +243,8 @@ public class ShareConsumeRequestManagerTest {
|
|||
@Test
|
||||
public void testMultipleFetches() {
|
||||
buildRequestManager();
|
||||
// Enabling the config so that background event is sent when the acknowledgement response is received.
|
||||
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
|
||||
|
||||
assignFromSubscribed(Collections.singleton(tp0));
|
||||
|
||||
|
|
@ -305,6 +307,8 @@ public class ShareConsumeRequestManagerTest {
|
|||
@Test
|
||||
public void testCommitSync() {
|
||||
buildRequestManager();
|
||||
// Enabling the config so that background event is sent when the acknowledgement response is received.
|
||||
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
|
||||
|
||||
assignFromSubscribed(Collections.singleton(tp0));
|
||||
|
||||
|
|
@ -336,6 +340,8 @@ public class ShareConsumeRequestManagerTest {
|
|||
@Test
|
||||
public void testCommitAsync() {
|
||||
buildRequestManager();
|
||||
// Enabling the config so that background event is sent when the acknowledgement response is received.
|
||||
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
|
||||
|
||||
assignFromSubscribed(Collections.singleton(tp0));
|
||||
|
||||
|
|
@ -367,6 +373,8 @@ public class ShareConsumeRequestManagerTest {
|
|||
@Test
|
||||
public void testAcknowledgeOnClose() {
|
||||
buildRequestManager();
|
||||
// Enabling the config so that background event is sent when the acknowledgement response is received.
|
||||
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
|
||||
|
||||
assignFromSubscribed(Collections.singleton(tp0));
|
||||
|
||||
|
|
@ -408,6 +416,8 @@ public class ShareConsumeRequestManagerTest {
|
|||
@Test
|
||||
public void testAcknowledgeOnCloseWithPendingCommitAsync() {
|
||||
buildRequestManager();
|
||||
// Enabling the config so that background event is sent when the acknowledgement response is received.
|
||||
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
|
||||
|
||||
assignFromSubscribed(Collections.singleton(tp0));
|
||||
|
||||
|
|
@ -641,6 +651,55 @@ public class ShareConsumeRequestManagerTest {
|
|||
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCallbackHandlerConfig() throws InterruptedException {
|
||||
buildRequestManager();
|
||||
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
|
||||
|
||||
assignFromSubscribed(Collections.singleton(tp0));
|
||||
|
||||
// normal fetch
|
||||
assertEquals(1, sendFetches());
|
||||
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
|
||||
|
||||
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
|
||||
networkClientDelegate.poll(time.timer(0));
|
||||
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
|
||||
|
||||
Acknowledgements acknowledgements = Acknowledgements.empty();
|
||||
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
|
||||
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
|
||||
|
||||
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
|
||||
|
||||
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
|
||||
|
||||
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
|
||||
networkClientDelegate.poll(time.timer(0));
|
||||
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
|
||||
|
||||
assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0));
|
||||
|
||||
completedAcknowledgements.clear();
|
||||
|
||||
// Setting the boolean to false, indicating there is no callback handler registered.
|
||||
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(false);
|
||||
|
||||
Acknowledgements acknowledgements2 = Acknowledgements.empty();
|
||||
acknowledgements2.add(3L, AcknowledgeType.ACCEPT);
|
||||
|
||||
shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements2));
|
||||
|
||||
TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()));
|
||||
|
||||
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
|
||||
networkClientDelegate.poll(time.timer(0));
|
||||
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
|
||||
|
||||
// We expect no acknowledgements to be added as the callback handler is not configured.
|
||||
assertEquals(0, completedAcknowledgements.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleTopicsFetch() {
|
||||
buildRequestManager();
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.kafka.clients.consumer.internals;
|
||||
|
||||
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
|
||||
|
|
@ -25,6 +26,7 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper
|
|||
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
|
|
@ -66,9 +68,11 @@ import static org.junit.jupiter.api.Assertions.assertNull;
|
|||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
@ -250,6 +254,41 @@ public class ShareConsumerImplTest {
|
|||
verify(applicationEventHandler).add(any(ShareUnsubscribeEvent.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAcknowledgementCommitCallbackRegistrationEvent() {
|
||||
consumer = newConsumer();
|
||||
AcknowledgementCommitCallback callback = mock(AcknowledgementCommitCallback.class);
|
||||
|
||||
consumer.setAcknowledgementCommitCallback(callback);
|
||||
verify(applicationEventHandler).add(argThat(event ->
|
||||
event instanceof ShareAcknowledgementCommitCallbackRegistrationEvent &&
|
||||
((ShareAcknowledgementCommitCallbackRegistrationEvent) event).isCallbackRegistered()
|
||||
));
|
||||
|
||||
consumer.setAcknowledgementCommitCallback(callback);
|
||||
// As we have already set the callback, we should not add another event. We only add when we initially register.
|
||||
verify(applicationEventHandler, times(1)).add(any(ShareAcknowledgementCommitCallbackRegistrationEvent.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAcknowledgementCommitCallbackRegistrationEvent_Null() {
|
||||
consumer = newConsumer();
|
||||
AcknowledgementCommitCallback callback = mock(AcknowledgementCommitCallback.class);
|
||||
|
||||
consumer.setAcknowledgementCommitCallback(null);
|
||||
// Initially callback is set to null, setting again to null should not add an event.
|
||||
verify(applicationEventHandler, times(0)).add(any(ShareAcknowledgementCommitCallbackRegistrationEvent.class));
|
||||
|
||||
consumer.setAcknowledgementCommitCallback(callback);
|
||||
verify(applicationEventHandler, times(1)).add(any(ShareAcknowledgementCommitCallbackRegistrationEvent.class));
|
||||
|
||||
// Now we are changing from a non-null callback to null, this should add an event.
|
||||
consumer.setAcknowledgementCommitCallback(null);
|
||||
verify(applicationEventHandler).add(argThat(event ->
|
||||
event instanceof ShareAcknowledgementCommitCallbackRegistrationEvent &&
|
||||
!((ShareAcknowledgementCommitCallbackRegistrationEvent) event).isCallbackRegistered()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompleteQuietly() {
|
||||
AtomicReference<Throwable> exception = new AtomicReference<>();
|
||||
|
|
|
|||
Loading…
Reference in New Issue