mirror of https://github.com/apache/kafka.git
KAFKA-18817:[1/N] ShareGroupHeartbeat and ShareGroupDescribe API must check topic describe (#19055)
1、Client support for TopicAuthException in DescribeShareGroup and HB path 2、ShareConsumerImpl#sendAcknowledgementsAndLeaveGroup swallow TopicAuthorizationException and GroupAuthorizationException Reviewers: ShivsundarR <shr@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
898dcd11ad
commit
a24fedfba0
|
@ -159,7 +159,9 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
|
||||||
Set<CoordinatorKey> groupsToUnmap) {
|
Set<CoordinatorKey> groupsToUnmap) {
|
||||||
switch (error) {
|
switch (error) {
|
||||||
case GROUP_AUTHORIZATION_FAILED:
|
case GROUP_AUTHORIZATION_FAILED:
|
||||||
|
case TOPIC_AUTHORIZATION_FAILED:
|
||||||
log.debug("`DescribeShareGroups` request for group id {} failed due to error {}", groupId.idValue, error);
|
log.debug("`DescribeShareGroups` request for group id {} failed due to error {}", groupId.idValue, error);
|
||||||
|
// The topic auth response received on DescribeShareGroup is a generic one not including topic names, so we just pass it on unchanged here.
|
||||||
failed.put(groupId, error.exception(errorMsg));
|
failed.put(groupId, error.exception(errorMsg));
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
|
|
@ -54,9 +54,11 @@ import org.apache.kafka.common.Metric;
|
||||||
import org.apache.kafka.common.MetricName;
|
import org.apache.kafka.common.MetricName;
|
||||||
import org.apache.kafka.common.TopicIdPartition;
|
import org.apache.kafka.common.TopicIdPartition;
|
||||||
import org.apache.kafka.common.Uuid;
|
import org.apache.kafka.common.Uuid;
|
||||||
|
import org.apache.kafka.common.errors.GroupAuthorizationException;
|
||||||
import org.apache.kafka.common.errors.InterruptException;
|
import org.apache.kafka.common.errors.InterruptException;
|
||||||
import org.apache.kafka.common.errors.InvalidGroupIdException;
|
import org.apache.kafka.common.errors.InvalidGroupIdException;
|
||||||
import org.apache.kafka.common.errors.TimeoutException;
|
import org.apache.kafka.common.errors.TimeoutException;
|
||||||
|
import org.apache.kafka.common.errors.TopicAuthorizationException;
|
||||||
import org.apache.kafka.common.internals.ClusterResourceListeners;
|
import org.apache.kafka.common.internals.ClusterResourceListeners;
|
||||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
|
@ -92,6 +94,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.Predicate;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
|
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
|
||||||
|
@ -914,7 +917,10 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
|
||||||
ShareUnsubscribeEvent unsubscribeEvent = new ShareUnsubscribeEvent(calculateDeadlineMs(timer));
|
ShareUnsubscribeEvent unsubscribeEvent = new ShareUnsubscribeEvent(calculateDeadlineMs(timer));
|
||||||
applicationEventHandler.add(unsubscribeEvent);
|
applicationEventHandler.add(unsubscribeEvent);
|
||||||
try {
|
try {
|
||||||
processBackgroundEvents(unsubscribeEvent.future(), timer);
|
// If users have fatal error, they will get some exceptions in the background queue.
|
||||||
|
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
|
||||||
|
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e instanceof GroupAuthorizationException
|
||||||
|
|| e instanceof TopicAuthorizationException));
|
||||||
log.info("Completed releasing assignment and leaving group to close consumer.");
|
log.info("Completed releasing assignment and leaving group to close consumer.");
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " +
|
log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " +
|
||||||
|
@ -1107,18 +1113,27 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
|
||||||
* Each iteration gives the application thread an opportunity to process background events, which may be
|
* Each iteration gives the application thread an opportunity to process background events, which may be
|
||||||
* necessary to complete the overall processing.
|
* necessary to complete the overall processing.
|
||||||
*
|
*
|
||||||
* @param future Event that contains a {@link CompletableFuture}; it is on this future that the
|
* @param future Event that contains a {@link CompletableFuture}; it is on this future that the
|
||||||
* application thread will wait for completion
|
* application thread will wait for completion
|
||||||
* @param timer Overall timer that bounds how long to wait for the event to complete
|
* @param timer Overall timer that bounds how long to wait for the event to complete
|
||||||
|
* @param ignoreErrorEventException Predicate to ignore background errors.
|
||||||
|
* Any exceptions found while processing background events that match the predicate won't be propagated.
|
||||||
* @return {@code true} if the event completed within the timeout, {@code false} otherwise
|
* @return {@code true} if the event completed within the timeout, {@code false} otherwise
|
||||||
*/
|
*/
|
||||||
// Visible for testing
|
// Visible for testing
|
||||||
<T> T processBackgroundEvents(final Future<T> future,
|
<T> T processBackgroundEvents(final Future<T> future,
|
||||||
final Timer timer) {
|
final Timer timer,
|
||||||
|
final Predicate<Exception> ignoreErrorEventException) {
|
||||||
log.trace("Will wait up to {} ms for future {} to complete", timer.remainingMs(), future);
|
log.trace("Will wait up to {} ms for future {} to complete", timer.remainingMs(), future);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
boolean hadEvents = processBackgroundEvents();
|
boolean hadEvents = false;
|
||||||
|
try {
|
||||||
|
hadEvents = processBackgroundEvents();
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!ignoreErrorEventException.test(e))
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (future.isDone()) {
|
if (future.isDone()) {
|
||||||
|
|
|
@ -35,6 +35,7 @@ import java.util.Map;
|
||||||
* - {@link Errors#INVALID_REQUEST}
|
* - {@link Errors#INVALID_REQUEST}
|
||||||
* - {@link Errors#INVALID_GROUP_ID}
|
* - {@link Errors#INVALID_GROUP_ID}
|
||||||
* - {@link Errors#GROUP_ID_NOT_FOUND}
|
* - {@link Errors#GROUP_ID_NOT_FOUND}
|
||||||
|
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
|
||||||
*/
|
*/
|
||||||
public class ShareGroupDescribeResponse extends AbstractResponse {
|
public class ShareGroupDescribeResponse extends AbstractResponse {
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ import java.util.Map;
|
||||||
* - {@link Errors#INVALID_REQUEST}
|
* - {@link Errors#INVALID_REQUEST}
|
||||||
* - {@link Errors#UNKNOWN_MEMBER_ID}
|
* - {@link Errors#UNKNOWN_MEMBER_ID}
|
||||||
* - {@link Errors#GROUP_MAX_SIZE_REACHED}
|
* - {@link Errors#GROUP_MAX_SIZE_REACHED}
|
||||||
|
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
|
||||||
*/
|
*/
|
||||||
public class ShareGroupHeartbeatResponse extends AbstractResponse {
|
public class ShareGroupHeartbeatResponse extends AbstractResponse {
|
||||||
private final ShareGroupHeartbeatResponseData data;
|
private final ShareGroupHeartbeatResponseData data;
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
// - INVALID_REQUEST (version 0+)
|
// - INVALID_REQUEST (version 0+)
|
||||||
// - INVALID_GROUP_ID (version 0+)
|
// - INVALID_GROUP_ID (version 0+)
|
||||||
// - GROUP_ID_NOT_FOUND (version 0+)
|
// - GROUP_ID_NOT_FOUND (version 0+)
|
||||||
|
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
|
||||||
"fields": [
|
"fields": [
|
||||||
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
|
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
|
||||||
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
|
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
|
||||||
|
|
|
@ -21,12 +21,13 @@
|
||||||
"flexibleVersions": "0+",
|
"flexibleVersions": "0+",
|
||||||
// Supported errors:
|
// Supported errors:
|
||||||
// - GROUP_AUTHORIZATION_FAILED (version 0+)
|
// - GROUP_AUTHORIZATION_FAILED (version 0+)
|
||||||
// - NOT_COORDINATOR (version 0+)
|
// - NOT_COORDINATOR (version 0+)
|
||||||
// - COORDINATOR_NOT_AVAILABLE (version 0+)
|
// - COORDINATOR_NOT_AVAILABLE (version 0+)
|
||||||
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
|
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
|
||||||
// - INVALID_REQUEST (version 0+)
|
// - INVALID_REQUEST (version 0+)
|
||||||
// - UNKNOWN_MEMBER_ID (version 0+)
|
// - UNKNOWN_MEMBER_ID (version 0+)
|
||||||
// - GROUP_MAX_SIZE_REACHED (version 0+)
|
// - GROUP_MAX_SIZE_REACHED (version 0+)
|
||||||
|
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
|
||||||
"fields": [
|
"fields": [
|
||||||
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
|
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
|
||||||
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
|
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.common.Uuid;
|
import org.apache.kafka.common.Uuid;
|
||||||
import org.apache.kafka.common.errors.InvalidGroupIdException;
|
import org.apache.kafka.common.errors.InvalidGroupIdException;
|
||||||
import org.apache.kafka.common.errors.TimeoutException;
|
import org.apache.kafka.common.errors.TimeoutException;
|
||||||
|
import org.apache.kafka.common.errors.TopicAuthorizationException;
|
||||||
import org.apache.kafka.common.errors.WakeupException;
|
import org.apache.kafka.common.errors.WakeupException;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
@ -53,12 +54,14 @@ import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
import static java.util.Collections.singleton;
|
import static java.util.Collections.singleton;
|
||||||
import static java.util.Collections.singletonList;
|
import static java.util.Collections.singletonList;
|
||||||
|
@ -271,6 +274,26 @@ public class ShareConsumerImplTest {
|
||||||
assertEquals("This consumer has already been closed.", res.getMessage());
|
assertEquals("This consumer has already been closed.", res.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnsubscribeWithTopicAuthorizationException() {
|
||||||
|
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
|
||||||
|
consumer = newConsumer(subscriptions);
|
||||||
|
|
||||||
|
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
|
||||||
|
completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
|
||||||
|
assertDoesNotThrow(() -> consumer.unsubscribe());
|
||||||
|
assertDoesNotThrow(() -> consumer.close());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCloseWithTopicAuthorizationException() {
|
||||||
|
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
|
||||||
|
consumer = newConsumer(subscriptions);
|
||||||
|
|
||||||
|
completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
|
||||||
|
assertDoesNotThrow(() -> consumer.close());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testVerifyApplicationEventOnShutdown() {
|
public void testVerifyApplicationEventOnShutdown() {
|
||||||
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
|
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
|
||||||
|
@ -502,7 +525,7 @@ public class ShareConsumerImplTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer) processBackgroundEvents}
|
* Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
|
||||||
* handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout.
|
* handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
|
@ -529,14 +552,14 @@ public class ShareConsumerImplTest {
|
||||||
return null;
|
return null;
|
||||||
}).when(future).get(any(Long.class), any(TimeUnit.class));
|
}).when(future).get(any(Long.class), any(TimeUnit.class));
|
||||||
|
|
||||||
consumer.processBackgroundEvents(future, timer);
|
consumer.processBackgroundEvents(future, timer, e -> false);
|
||||||
|
|
||||||
// 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
|
// 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
|
||||||
assertEquals(800, timer.remainingMs());
|
assertEquals(800, timer.remainingMs());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer) processBackgroundEvents}
|
* Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
|
||||||
* handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait.
|
* handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
|
@ -548,7 +571,7 @@ public class ShareConsumerImplTest {
|
||||||
// Create a future that is already completed.
|
// Create a future that is already completed.
|
||||||
CompletableFuture<?> future = CompletableFuture.completedFuture(null);
|
CompletableFuture<?> future = CompletableFuture.completedFuture(null);
|
||||||
|
|
||||||
consumer.processBackgroundEvents(future, timer);
|
consumer.processBackgroundEvents(future, timer, e -> false);
|
||||||
|
|
||||||
// Because we didn't need to perform a timed get, we should still have every last millisecond
|
// Because we didn't need to perform a timed get, we should still have every last millisecond
|
||||||
// of our initial timeout.
|
// of our initial timeout.
|
||||||
|
@ -556,7 +579,7 @@ public class ShareConsumerImplTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer) processBackgroundEvents}
|
* Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
|
||||||
* handles the case where the {@link Future} does not complete within the timeout.
|
* handles the case where the {@link Future} does not complete within the timeout.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
|
@ -572,7 +595,7 @@ public class ShareConsumerImplTest {
|
||||||
throw new java.util.concurrent.TimeoutException("Intentional timeout");
|
throw new java.util.concurrent.TimeoutException("Intentional timeout");
|
||||||
}).when(future).get(any(Long.class), any(TimeUnit.class));
|
}).when(future).get(any(Long.class), any(TimeUnit.class));
|
||||||
|
|
||||||
assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer));
|
assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false));
|
||||||
|
|
||||||
// Because we forced our mocked future to continuously time out, we should have no time remaining.
|
// Because we forced our mocked future to continuously time out, we should have no time remaining.
|
||||||
assertEquals(0, timer.remainingMs());
|
assertEquals(0, timer.remainingMs());
|
||||||
|
|
Loading…
Reference in New Issue