KAFKA-19229: Ignore background errors while closing share consumers. (Fix flaky test) (#19647)
CI / build (push) Waiting to run Details

- A couple of newly added tests were found to be flaky in
`AuthorizerIntegrationTest.scala`.
- `testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl` and
`testShareGroupDescribeWithoutGroupDescribeAcl`. These tests pass
locally, so could not replicate the failure.
- But logs from develocity indicated that the test fails when the
following condition happens :
   When the background error event arrives after the consumer had
unsubscribed, then these events are processed in the
`handleCompletedAcknowledgements` method and the exception from the
event is thrown, preventing `close()` to complete.

- We need to handle this race condition where we might get the
background event after unsubscribe and before processing the callbacks.
- PR fixes this by ignoring the exceptions in the background queue when
the `handleCompletedAcknowledgements` method is called during `close()`.
This ensures `close()` completes successfully.
- Have added a unit test which mimics the race condition as well.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Shivsundar R 2025-05-09 06:20:09 -04:00 committed by GitHub
parent 70c0aca4b7
commit 58c08441d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 48 additions and 6 deletions

View File

@ -567,7 +567,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
acquireAndEnsureOpen(); acquireAndEnsureOpen();
try { try {
// Handle any completed acknowledgements for which we already have the responses // Handle any completed acknowledgements for which we already have the responses
handleCompletedAcknowledgements(); handleCompletedAcknowledgements(false);
// If using implicit acknowledgement, acknowledge the previously fetched records // If using implicit acknowledgement, acknowledge the previously fetched records
acknowledgeBatchIfImplicitAcknowledgement(); acknowledgeBatchIfImplicitAcknowledgement();
@ -708,7 +708,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
acquireAndEnsureOpen(); acquireAndEnsureOpen();
try { try {
// Handle any completed acknowledgements for which we already have the responses // Handle any completed acknowledgements for which we already have the responses
handleCompletedAcknowledgements(); handleCompletedAcknowledgements(false);
// If using implicit acknowledgement, acknowledge the previously fetched records // If using implicit acknowledgement, acknowledge the previously fetched records
acknowledgeBatchIfImplicitAcknowledgement(); acknowledgeBatchIfImplicitAcknowledgement();
@ -752,7 +752,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
acquireAndEnsureOpen(); acquireAndEnsureOpen();
try { try {
// Handle any completed acknowledgements for which we already have the responses // Handle any completed acknowledgements for which we already have the responses
handleCompletedAcknowledgements(); handleCompletedAcknowledgements(false);
// If using implicit acknowledgement, acknowledge the previously fetched records // If using implicit acknowledgement, acknowledge the previously fetched records
acknowledgeBatchIfImplicitAcknowledgement(); acknowledgeBatchIfImplicitAcknowledgement();
@ -883,7 +883,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
swallow(log, Level.ERROR, "Failed to stop finding coordinator", swallow(log, Level.ERROR, "Failed to stop finding coordinator",
this::stopFindCoordinatorOnClose, firstException); this::stopFindCoordinatorOnClose, firstException);
swallow(log, Level.ERROR, "Failed invoking acknowledgement commit callback", swallow(log, Level.ERROR, "Failed invoking acknowledgement commit callback",
this::handleCompletedAcknowledgements, firstException); () -> handleCompletedAcknowledgements(true), firstException);
if (applicationEventHandler != null) if (applicationEventHandler != null)
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
closeTimer.update(); closeTimer.update();
@ -1017,8 +1017,12 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
* <p> * <p>
* If the acknowledgement commit callback throws an exception, this method will throw an exception. * If the acknowledgement commit callback throws an exception, this method will throw an exception.
*/ */
private void handleCompletedAcknowledgements() { private void handleCompletedAcknowledgements(boolean onClose) {
processBackgroundEvents(); // If the user gets any fatal errors, they will get these exceptions in the background queue.
// While closing, we ignore these exceptions so that the consumers close successfully.
processBackgroundEvents(onClose ? e -> (e instanceof GroupAuthorizationException
|| e instanceof TopicAuthorizationException
|| e instanceof InvalidTopicException) : e -> false);
if (!completedAcknowledgements.isEmpty()) { if (!completedAcknowledgements.isEmpty()) {
try { try {
@ -1065,6 +1069,15 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
return ShareAcknowledgementMode.fromString(s); return ShareAcknowledgementMode.fromString(s);
} }
private void processBackgroundEvents(final Predicate<Exception> ignoreErrorEventException) {
try {
processBackgroundEvents();
} catch (Exception e) {
if (!ignoreErrorEventException.test(e))
throw e;
}
}
/** /**
* Process the eventsif anythat were produced by the {@link ConsumerNetworkThread network thread}. * Process the eventsif anythat were produced by the {@link ConsumerNetworkThread network thread}.
* It is possible that {@link ErrorEvent an error} * It is possible that {@link ErrorEvent an error}

View File

@ -41,6 +41,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
@ -50,6 +51,8 @@ import org.apache.kafka.test.MockConsumerInterceptor;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatchers; import org.mockito.ArgumentMatchers;
import org.mockito.InOrder; import org.mockito.InOrder;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -672,6 +675,32 @@ public class ShareConsumerImplTest {
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class)); verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
} }
@ParameterizedTest
@EnumSource(value = Errors.class, names = {"TOPIC_AUTHORIZATION_FAILED", "GROUP_AUTHORIZATION_FAILED", "INVALID_TOPIC_EXCEPTION"})
public void testCloseWithBackgroundQueueErrorsAfterUnsubscribe(Errors error) {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(subscriptions);
// Complete the acknowledge on close event successfully
completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
// Complete the unsubscribe event successfully
completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
// Mock the applicationEventHandler to add errors to the queue after unsubscribe
doAnswer(invocation -> {
// Add errors to the queue after unsubscribe event is processed
backgroundEventQueue.add(new ErrorEvent(error.exception()));
return null;
}).when(applicationEventHandler).add(any(StopFindCoordinatorOnCloseEvent.class));
// Close should complete successfully despite the errors in the background queue
assertDoesNotThrow(() -> consumer.close());
// Verify that the background queue was processed
assertTrue(backgroundEventQueue.isEmpty(), "Background queue should be empty after close");
}
private Properties requiredConsumerPropertiesAndGroupId(final String groupId) { private Properties requiredConsumerPropertiesAndGroupId(final String groupId) {
final Properties props = requiredConsumerProperties(); final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);