From a24fedfba00d249bbacb1f1e139771ed196614e8 Mon Sep 17 00:00:00 2001 From: DL1231 <53332773+DL1231@users.noreply.github.com> Date: Mon, 3 Mar 2025 17:49:37 +0800 Subject: [PATCH] KAFKA-18817:[1/N] ShareGroupHeartbeat and ShareGroupDescribe API must check topic describe (#19055) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Client support for TopicAuthorizationException in the DescribeShareGroup and heartbeat (HB) paths. 2. ShareConsumerImpl#sendAcknowledgementsAndLeaveGroup swallows TopicAuthorizationException and GroupAuthorizationException. Reviewers: ShivsundarR, Andrew Schofield --- .../internals/DescribeShareGroupsHandler.java | 2 ++ .../consumer/internals/ShareConsumerImpl.java | 27 ++++++++++---- .../requests/ShareGroupDescribeResponse.java | 1 + .../requests/ShareGroupHeartbeatResponse.java | 1 + .../message/ShareGroupDescribeResponse.json | 1 + .../message/ShareGroupHeartbeatResponse.json | 3 +- .../internals/ShareConsumerImplTest.java | 35 +++++++++++++++---- 7 files changed, 57 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java index 1c79225b083..1aa37bbf776 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java @@ -159,7 +159,9 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched groupsToUnmap) { switch (error) { case GROUP_AUTHORIZATION_FAILED: + case TOPIC_AUTHORIZATION_FAILED: log.debug("`DescribeShareGroups` request for group id {} failed due to error {}", groupId.idValue, error); + // The topic auth response received on DescribeShareGroup is a generic one not including topic names, so we just pass it on unchanged here. 
failed.put(groupId, error.exception(errorMsg)); break; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index e808ecf5896..4dc8cf4c697 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -54,9 +54,11 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; @@ -92,6 +94,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.function.Supplier; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX; @@ -914,7 +917,10 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { ShareUnsubscribeEvent unsubscribeEvent = new ShareUnsubscribeEvent(calculateDeadlineMs(timer)); applicationEventHandler.add(unsubscribeEvent); try { - processBackgroundEvents(unsubscribeEvent.future(), timer); + // If users hit a fatal error, they will get some exceptions in the background queue. 
+ // When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully. + processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e instanceof GroupAuthorizationException + || e instanceof TopicAuthorizationException)); log.info("Completed releasing assignment and leaving group to close consumer."); } catch (TimeoutException e) { log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " + @@ -1107,18 +1113,27 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { * Each iteration gives the application thread an opportunity to process background events, which may be * necessary to complete the overall processing. * - * @param future Event that contains a {@link CompletableFuture}; it is on this future that the - * application thread will wait for completion - * @param timer Overall timer that bounds how long to wait for the event to complete + * @param future Event that contains a {@link CompletableFuture}; it is on this future that the + * application thread will wait for completion + * @param timer Overall timer that bounds how long to wait for the event to complete + * @param ignoreErrorEventException Predicate to ignore background errors. + * Any exceptions found while processing background events that match the predicate won't be propagated. 
* @return {@code true} if the event completed within the timeout, {@code false} otherwise */ // Visible for testing T processBackgroundEvents(final Future future, - final Timer timer) { + final Timer timer, + final Predicate ignoreErrorEventException) { log.trace("Will wait up to {} ms for future {} to complete", timer.remainingMs(), future); do { - boolean hadEvents = processBackgroundEvents(); + boolean hadEvents = false; + try { + hadEvents = processBackgroundEvents(); + } catch (Exception e) { + if (!ignoreErrorEventException.test(e)) + throw e; + } try { if (future.isDone()) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java index 95dd371eedf..fc25658d703 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java @@ -35,6 +35,7 @@ import java.util.Map; * - {@link Errors#INVALID_REQUEST} * - {@link Errors#INVALID_GROUP_ID} * - {@link Errors#GROUP_ID_NOT_FOUND} + * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} */ public class ShareGroupDescribeResponse extends AbstractResponse { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java index de05d44aebe..08ce09e1f4b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java @@ -35,6 +35,7 @@ import java.util.Map; * - {@link Errors#INVALID_REQUEST} * - {@link Errors#UNKNOWN_MEMBER_ID} * - {@link Errors#GROUP_MAX_SIZE_REACHED} + * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} */ public class ShareGroupHeartbeatResponse extends AbstractResponse { private final 
ShareGroupHeartbeatResponseData data; diff --git a/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json b/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json index c093b788bfc..e90e431f64e 100644 --- a/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/ShareGroupDescribeResponse.json @@ -27,6 +27,7 @@ // - INVALID_REQUEST (version 0+) // - INVALID_GROUP_ID (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) + // - TOPIC_AUTHORIZATION_FAILED (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, diff --git a/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json index e0ff5a93d54..75aa62b76f4 100644 --- a/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json @@ -21,12 +21,13 @@ "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) - // - NOT_COORDINATOR (version 0+) + // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_MEMBER_ID (version 0+) // - GROUP_MAX_SIZE_REACHED (version 0+) + // - TOPIC_AUTHORIZATION_FAILED (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index 6f5e78451b1..688547a3975 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.StringDeserializer; @@ -53,12 +54,14 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; @@ -271,6 +274,26 @@ public class ShareConsumerImplTest { assertEquals("This consumer has already been closed.", res.getMessage()); } + @Test + public void testUnsubscribeWithTopicAuthorizationException() { + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); + consumer = newConsumer(subscriptions); + + backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic")))); + completeShareUnsubscribeApplicationEventSuccessfully(subscriptions); + 
assertDoesNotThrow(() -> consumer.unsubscribe()); + assertDoesNotThrow(() -> consumer.close()); + } + + @Test + public void testCloseWithTopicAuthorizationException() { + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); + consumer = newConsumer(subscriptions); + + completeShareUnsubscribeApplicationEventSuccessfully(subscriptions); + assertDoesNotThrow(() -> consumer.close()); + } + @Test public void testVerifyApplicationEventOnShutdown() { SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); @@ -502,7 +525,7 @@ public class ShareConsumerImplTest { } /** - * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer) processBackgroundEvents} + * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents} * handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout. */ @Test @@ -529,14 +552,14 @@ public class ShareConsumerImplTest { return null; }).when(future).get(any(Long.class), any(TimeUnit.class)); - consumer.processBackgroundEvents(future, timer); + consumer.processBackgroundEvents(future, timer, e -> false); // 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries. assertEquals(800, timer.remainingMs()); } /** - * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer) processBackgroundEvents} + * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents} * handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait. */ @Test @@ -548,7 +571,7 @@ public class ShareConsumerImplTest { // Create a future that is already completed. 
CompletableFuture future = CompletableFuture.completedFuture(null); - consumer.processBackgroundEvents(future, timer); + consumer.processBackgroundEvents(future, timer, e -> false); // Because we didn't need to perform a timed get, we should still have every last millisecond // of our initial timeout. @@ -556,7 +579,7 @@ public class ShareConsumerImplTest { } /** - * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer) processBackgroundEvents} + * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents} * handles the case where the {@link Future} does not complete within the timeout. */ @Test @@ -572,7 +595,7 @@ public class ShareConsumerImplTest { throw new java.util.concurrent.TimeoutException("Intentional timeout"); }).when(future).get(any(Long.class), any(TimeUnit.class)); - assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer)); + assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false)); // Because we forced our mocked future to continuously time out, we should have no time remaining. assertEquals(0, timer.remainingMs());