mirror of https://github.com/apache/kafka.git
KAFKA-17696 New consumer background operations unaware of metadata errors (#17440)
Reviewers: Kirk True <ktrue@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
parent
9ae1b0f017
commit
d76238a18f
|
@ -83,9 +83,7 @@ import org.apache.kafka.common.errors.AuthenticationException;
|
|||
import org.apache.kafka.common.errors.GroupAuthorizationException;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.errors.InvalidGroupIdException;
|
||||
import org.apache.kafka.common.errors.InvalidTopicException;
|
||||
import org.apache.kafka.common.errors.TimeoutException;
|
||||
import org.apache.kafka.common.errors.TopicAuthorizationException;
|
||||
import org.apache.kafka.common.internals.ClusterResourceListeners;
|
||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
|
@ -353,7 +351,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
metrics,
|
||||
fetchMetricsManager.throttleTimeSensor(),
|
||||
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
|
||||
backgroundEventHandler);
|
||||
backgroundEventHandler,
|
||||
false);
|
||||
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
|
||||
this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig));
|
||||
final Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time,
|
||||
|
@ -524,7 +523,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
logContext,
|
||||
client,
|
||||
metadata,
|
||||
backgroundEventHandler
|
||||
backgroundEventHandler,
|
||||
false
|
||||
);
|
||||
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
|
||||
Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(
|
||||
|
@ -1574,12 +1574,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
subscriptions.assignedPartitions());
|
||||
|
||||
try {
|
||||
// If users subscribe to a topic with invalid name or without permission, they will get some exceptions.
|
||||
// Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background,
|
||||
// there will be some error events in the background queue.
|
||||
// If users have fatal error, they will get some exceptions in the background queue.
|
||||
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
|
||||
processBackgroundEvents(unsubscribeEvent.future(), timer,
|
||||
e -> e instanceof InvalidTopicException || e instanceof TopicAuthorizationException || e instanceof GroupAuthorizationException);
|
||||
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof GroupAuthorizationException);
|
||||
log.info("Unsubscribed all topics or patterns and assigned partitions");
|
||||
} catch (TimeoutException e) {
|
||||
log.error("Failed while waiting for the unsubscribe event to complete");
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.kafka.clients.KafkaClient;
|
|||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
|
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
|
||||
import org.apache.kafka.common.internals.IdempotentCloser;
|
||||
|
@ -40,6 +41,7 @@ import java.util.Objects;
|
|||
import java.util.Optional;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
|
||||
import static org.apache.kafka.common.utils.Utils.closeQuietly;
|
||||
|
@ -154,6 +156,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
|
|||
.reduce(Long.MAX_VALUE, Math::min);
|
||||
|
||||
reapExpiredApplicationEvents(currentTimeMs);
|
||||
List<CompletableEvent<?>> uncompletedEvents = applicationEventReaper.uncompletedEvents();
|
||||
maybeFailOnMetadataError(uncompletedEvents);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -165,9 +169,13 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
|
|||
|
||||
for (ApplicationEvent event : events) {
|
||||
try {
|
||||
if (event instanceof CompletableEvent)
|
||||
if (event instanceof CompletableEvent) {
|
||||
applicationEventReaper.add((CompletableEvent<?>) event);
|
||||
|
||||
// Check if there are any metadata errors and fail the CompletableEvent if an error is present.
|
||||
// This call is meant to handle "immediately completed events" which may not enter the awaiting state,
|
||||
// so metadata errors need to be checked and handled right away.
|
||||
maybeFailOnMetadataError(List.of((CompletableEvent<?>) event));
|
||||
}
|
||||
applicationEventProcessor.process(event);
|
||||
} catch (Throwable t) {
|
||||
log.warn("Error processing event {}", t.getMessage(), t);
|
||||
|
@ -325,4 +333,21 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
|
|||
log.debug("Closed the consumer network thread");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If there is a metadata error, complete all uncompleted events that require subscription metadata.
|
||||
*/
|
||||
private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
|
||||
List<? extends CompletableApplicationEvent<?>> subscriptionMetadataEvent = events.stream()
|
||||
.filter(e -> e instanceof CompletableApplicationEvent<?>)
|
||||
.map(e -> (CompletableApplicationEvent<?>) e)
|
||||
.filter(CompletableApplicationEvent::requireSubscriptionMetadata)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (subscriptionMetadataEvent.isEmpty())
|
||||
return;
|
||||
networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
|
||||
subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,6 +69,8 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
private final int requestTimeoutMs;
|
||||
private final Queue<UnsentRequest> unsentRequests;
|
||||
private final long retryBackoffMs;
|
||||
private Optional<Exception> metadataError;
|
||||
private final boolean notifyMetadataErrorsViaErrorQueue;
|
||||
|
||||
public NetworkClientDelegate(
|
||||
final Time time,
|
||||
|
@ -76,7 +78,8 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
final LogContext logContext,
|
||||
final KafkaClient client,
|
||||
final Metadata metadata,
|
||||
final BackgroundEventHandler backgroundEventHandler) {
|
||||
final BackgroundEventHandler backgroundEventHandler,
|
||||
final boolean notifyMetadataErrorsViaErrorQueue) {
|
||||
this.time = time;
|
||||
this.client = client;
|
||||
this.metadata = metadata;
|
||||
|
@ -85,6 +88,8 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
this.unsentRequests = new ArrayDeque<>();
|
||||
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
|
||||
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
|
||||
this.metadataError = Optional.empty();
|
||||
this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue;
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
|
@ -150,7 +155,11 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
try {
|
||||
metadata.maybeThrowAnyException();
|
||||
} catch (Exception e) {
|
||||
if (notifyMetadataErrorsViaErrorQueue) {
|
||||
backgroundEventHandler.add(new ErrorEvent(e));
|
||||
} else {
|
||||
metadataError = Optional.of(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -231,6 +240,12 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
);
|
||||
}
|
||||
|
||||
public Optional<Exception> getAndClearMetadataError() {
|
||||
Optional<Exception> metadataError = this.metadataError;
|
||||
this.metadataError = Optional.empty();
|
||||
return metadataError;
|
||||
}
|
||||
|
||||
public Node leastLoadedNode() {
|
||||
return this.client.leastLoadedNode(time.milliseconds()).node();
|
||||
}
|
||||
|
@ -412,7 +427,8 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
final Metrics metrics,
|
||||
final Sensor throttleTimeSensor,
|
||||
final ClientTelemetrySender clientTelemetrySender,
|
||||
final BackgroundEventHandler backgroundEventHandler) {
|
||||
final BackgroundEventHandler backgroundEventHandler,
|
||||
final boolean notifyMetadataErrorsViaErrorQueue) {
|
||||
return new CachedSupplier<>() {
|
||||
@Override
|
||||
protected NetworkClientDelegate create() {
|
||||
|
@ -426,7 +442,7 @@ public class NetworkClientDelegate implements AutoCloseable {
|
|||
metadata,
|
||||
throttleTimeSensor,
|
||||
clientTelemetrySender);
|
||||
return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler);
|
||||
return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -279,7 +279,8 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
|
|||
metrics,
|
||||
shareFetchMetricsManager.throttleTimeSensor(),
|
||||
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
|
||||
backgroundEventHandler
|
||||
backgroundEventHandler,
|
||||
true
|
||||
);
|
||||
this.completedAcknowledgements = new LinkedList<>();
|
||||
|
||||
|
@ -378,7 +379,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
|
|||
final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
|
||||
|
||||
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
|
||||
() -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler);
|
||||
() -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true);
|
||||
|
||||
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
|
||||
config,
|
||||
|
|
|
@ -18,6 +18,9 @@
|
|||
package org.apache.kafka.clients.consumer.internals.events;
|
||||
|
||||
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* Event to check if all assigned partitions have fetch positions. If there are positions missing, it will fetch
|
||||
|
@ -32,4 +35,15 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Bo
|
|||
public CheckAndUpdatePositionsEvent(long deadlineMs) {
|
||||
super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates that this event requires subscription metadata to be present
|
||||
* for its execution. This is used to ensure that metadata errors are
|
||||
* handled correctly during the {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#poll(Duration) poll}
|
||||
* or {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#position(TopicPartition) position} process.
|
||||
*/
|
||||
@Override
|
||||
public boolean requireSubscriptionMetadata() {
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -52,4 +52,8 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent im
|
|||
protected String toStringBase() {
|
||||
return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs;
|
||||
}
|
||||
|
||||
public boolean requireSubscriptionMetadata() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.List;
|
|||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* {@code CompletableEventReaper} is responsible for tracking {@link CompletableEvent time-bound events} and removing
|
||||
|
@ -155,4 +156,11 @@ public class CompletableEventReaper {
|
|||
public boolean contains(CompletableEvent<?> event) {
|
||||
return event != null && tracked.contains(event);
|
||||
}
|
||||
|
||||
public List<CompletableEvent<?>> uncompletedEvents() {
|
||||
return tracked.stream()
|
||||
.filter(e -> !e.future().isDone())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -64,6 +64,11 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti
|
|||
return requireTimestamps;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean requireSubscriptionMetadata() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toStringBase() {
|
||||
return super.toStringBase() +
|
||||
|
|
|
@ -66,10 +66,8 @@ import org.apache.kafka.common.config.ConfigException;
|
|||
import org.apache.kafka.common.errors.GroupAuthorizationException;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.errors.InvalidGroupIdException;
|
||||
import org.apache.kafka.common.errors.InvalidTopicException;
|
||||
import org.apache.kafka.common.errors.RetriableException;
|
||||
import org.apache.kafka.common.errors.TimeoutException;
|
||||
import org.apache.kafka.common.errors.TopicAuthorizationException;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.errors.WakeupException;
|
||||
import org.apache.kafka.common.internals.ClusterResourceListeners;
|
||||
|
@ -282,40 +280,6 @@ public class AsyncKafkaConsumerTest {
|
|||
assertEquals("This consumer has already been closed.", res.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnsubscribeWithInvalidTopicException() {
|
||||
consumer = newConsumer();
|
||||
backgroundEventQueue.add(new ErrorEvent(new InvalidTopicException("Invalid topic name")));
|
||||
completeUnsubscribeApplicationEventSuccessfully();
|
||||
assertDoesNotThrow(() -> consumer.unsubscribe());
|
||||
assertDoesNotThrow(() -> consumer.close());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseWithInvalidTopicException() {
|
||||
consumer = newConsumer();
|
||||
backgroundEventQueue.add(new ErrorEvent(new InvalidTopicException("Invalid topic name")));
|
||||
completeUnsubscribeApplicationEventSuccessfully();
|
||||
assertDoesNotThrow(() -> consumer.close());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnsubscribeWithTopicAuthorizationException() {
|
||||
consumer = newConsumer();
|
||||
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
|
||||
completeUnsubscribeApplicationEventSuccessfully();
|
||||
assertDoesNotThrow(() -> consumer.unsubscribe());
|
||||
assertDoesNotThrow(() -> consumer.close());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseWithTopicAuthorizationException() {
|
||||
consumer = newConsumer();
|
||||
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
|
||||
completeUnsubscribeApplicationEventSuccessfully();
|
||||
assertDoesNotThrow(() -> consumer.close());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitAsyncWithNullCallback() {
|
||||
consumer = newConsumer();
|
||||
|
|
|
@ -3700,7 +3700,7 @@ public class FetchRequestManagerTest {
|
|||
properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs));
|
||||
properties.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs));
|
||||
ConsumerConfig config = new ConsumerConfig(properties);
|
||||
networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler));
|
||||
networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true));
|
||||
}
|
||||
|
||||
private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> records) {
|
||||
|
@ -3777,8 +3777,9 @@ public class FetchRequestManagerTest {
|
|||
LogContext logContext,
|
||||
KafkaClient client,
|
||||
Metadata metadata,
|
||||
BackgroundEventHandler backgroundEventHandler) {
|
||||
super(time, config, logContext, client, metadata, backgroundEventHandler);
|
||||
BackgroundEventHandler backgroundEventHandler,
|
||||
boolean notifyMetadataErrorsViaErrorQueue) {
|
||||
super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -52,6 +52,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_M
|
|||
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
@ -78,7 +79,7 @@ public class NetworkClientDelegateTest {
|
|||
|
||||
@Test
|
||||
void testPollResultTimer() throws Exception {
|
||||
try (NetworkClientDelegate ncd = newNetworkClientDelegate()) {
|
||||
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
|
||||
NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest(
|
||||
new FindCoordinatorRequest.Builder(
|
||||
new FindCoordinatorRequestData()
|
||||
|
@ -102,7 +103,7 @@ public class NetworkClientDelegateTest {
|
|||
|
||||
@Test
|
||||
public void testSuccessfulResponse() throws Exception {
|
||||
try (NetworkClientDelegate ncd = newNetworkClientDelegate()) {
|
||||
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
|
||||
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
|
||||
prepareFindCoordinatorResponse(Errors.NONE);
|
||||
|
||||
|
@ -116,7 +117,7 @@ public class NetworkClientDelegateTest {
|
|||
|
||||
@Test
|
||||
public void testTimeoutBeforeSend() throws Exception {
|
||||
try (NetworkClientDelegate ncd = newNetworkClientDelegate()) {
|
||||
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
|
||||
client.setUnreachable(mockNode(), REQUEST_TIMEOUT_MS);
|
||||
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
|
||||
ncd.add(unsentRequest);
|
||||
|
@ -130,7 +131,7 @@ public class NetworkClientDelegateTest {
|
|||
|
||||
@Test
|
||||
public void testTimeoutAfterSend() throws Exception {
|
||||
try (NetworkClientDelegate ncd = newNetworkClientDelegate()) {
|
||||
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
|
||||
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
|
||||
ncd.add(unsentRequest);
|
||||
ncd.poll(0, time.milliseconds());
|
||||
|
@ -164,7 +165,7 @@ public class NetworkClientDelegateTest {
|
|||
|
||||
@Test
|
||||
public void testEnsureTimerSetOnAdd() {
|
||||
NetworkClientDelegate ncd = newNetworkClientDelegate();
|
||||
NetworkClientDelegate ncd = newNetworkClientDelegate(false);
|
||||
NetworkClientDelegate.UnsentRequest findCoordRequest = newUnsentFindCoordinatorRequest();
|
||||
assertNull(findCoordRequest.timer());
|
||||
|
||||
|
@ -181,7 +182,7 @@ public class NetworkClientDelegateTest {
|
|||
|
||||
@Test
|
||||
public void testHasAnyPendingRequests() throws Exception {
|
||||
try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate()) {
|
||||
try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false)) {
|
||||
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
|
||||
networkClientDelegate.add(unsentRequest);
|
||||
|
||||
|
@ -212,9 +213,24 @@ public class NetworkClientDelegateTest {
|
|||
AuthenticationException authException = new AuthenticationException("Test Auth Exception");
|
||||
doThrow(authException).when(metadata).maybeThrowAnyException();
|
||||
|
||||
NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false);
|
||||
assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty());
|
||||
networkClientDelegate.poll(0, time.milliseconds());
|
||||
|
||||
Optional<Exception> metadataError = networkClientDelegate.getAndClearMetadataError();
|
||||
assertTrue(metadataError.isPresent());
|
||||
assertInstanceOf(AuthenticationException.class, metadataError.get());
|
||||
assertEquals(authException.getMessage(), metadataError.get().getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPropagateMetadataErrorWithErrorEvent() {
|
||||
AuthenticationException authException = new AuthenticationException("Test Auth Exception");
|
||||
doThrow(authException).when(metadata).maybeThrowAnyException();
|
||||
|
||||
LinkedList<BackgroundEvent> backgroundEventQueue = new LinkedList<>();
|
||||
this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
|
||||
NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
|
||||
NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(true);
|
||||
|
||||
assertEquals(0, backgroundEventQueue.size());
|
||||
networkClientDelegate.poll(0, time.milliseconds());
|
||||
|
@ -226,7 +242,7 @@ public class NetworkClientDelegateTest {
|
|||
assertEquals(authException, ((ErrorEvent) event).error());
|
||||
}
|
||||
|
||||
public NetworkClientDelegate newNetworkClientDelegate() {
|
||||
public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) {
|
||||
LogContext logContext = new LogContext();
|
||||
Properties properties = new Properties();
|
||||
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
|
@ -238,7 +254,8 @@ public class NetworkClientDelegateTest {
|
|||
logContext,
|
||||
this.client,
|
||||
this.metadata,
|
||||
this.backgroundEventHandler);
|
||||
this.backgroundEventHandler,
|
||||
notifyMetadataErrorsViaErrorQueue);
|
||||
}
|
||||
|
||||
public NetworkClientDelegate.UnsentRequest newUnsentFindCoordinatorRequest() {
|
||||
|
|
|
@ -1663,7 +1663,8 @@ public class ShareConsumeRequestManagerTest {
|
|||
properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs));
|
||||
properties.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs));
|
||||
ConsumerConfig config = new ConsumerConfig(properties);
|
||||
networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata, new BackgroundEventHandler(new LinkedBlockingQueue<>())));
|
||||
networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata,
|
||||
new BackgroundEventHandler(new LinkedBlockingQueue<>()), false));
|
||||
}
|
||||
|
||||
private class TestableShareConsumeRequestManager<K, V> extends ShareConsumeRequestManager {
|
||||
|
@ -1715,8 +1716,9 @@ public class ShareConsumeRequestManagerTest {
|
|||
LogContext logContext,
|
||||
KafkaClient client,
|
||||
Metadata metadata,
|
||||
BackgroundEventHandler backgroundEventHandler) {
|
||||
super(time, config, logContext, client, metadata, backgroundEventHandler);
|
||||
BackgroundEventHandler backgroundEventHandler,
|
||||
boolean notifyMetadataErrorsViaErrorQueue) {
|
||||
super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1273,7 +1273,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
|
||||
def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = {
|
||||
val consumer = createConsumer()
|
||||
assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava))
|
||||
|
@ -1310,7 +1310,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
|
||||
def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
|
||||
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
|
||||
val consumer = createConsumer()
|
||||
|
@ -1328,7 +1328,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testOffsetFetchWithNoAccess(quorum: String, groupProtocol: String): Unit = {
|
||||
val consumer = createConsumer()
|
||||
consumer.assign(List(tp).asJava)
|
||||
|
@ -1336,7 +1336,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
|
||||
def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
|
||||
createTopicWithBrokerPrincipal(topic)
|
||||
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
|
||||
|
@ -1581,7 +1581,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testListOffsetsWithNoTopicAccess(quorum: String, groupProtocol: String): Unit = {
|
||||
val consumer = createConsumer()
|
||||
assertThrows(classOf[TopicAuthorizationException], () => consumer.endOffsets(Set(tp).asJava))
|
||||
|
|
|
@ -171,7 +171,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
* Tests the ability of producing and consuming with the appropriate ACLs set.
|
||||
*/
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testProduceConsumeViaAssign(quorum: String, groupProtocol: String): Unit = {
|
||||
setAclsAndProduce(tp)
|
||||
val consumer = createConsumer()
|
||||
|
@ -200,7 +200,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testProduceConsumeViaSubscribe(quorum: String, groupProtocol: String): Unit = {
|
||||
setAclsAndProduce(tp)
|
||||
val consumer = createConsumer()
|
||||
|
@ -210,7 +210,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testProduceConsumeWithWildcardAcls(quorum: String, groupProtocol: String): Unit = {
|
||||
setWildcardResourceAcls()
|
||||
val producer = createProducer()
|
||||
|
@ -222,7 +222,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testProduceConsumeWithPrefixedAcls(quorum: String, groupProtocol: String): Unit = {
|
||||
setPrefixedResourceAcls()
|
||||
val producer = createProducer()
|
||||
|
@ -234,7 +234,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testProduceConsumeTopicAutoCreateTopicCreateAcl(quorum: String, groupProtocol: String): Unit = {
|
||||
// topic2 is not created on setup()
|
||||
val tp2 = new TopicPartition("topic2", 0)
|
||||
|
@ -404,7 +404,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
* ACL set.
|
||||
*/
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testNoConsumeWithoutDescribeAclViaAssign(quorum: String, groupProtocol: String): Unit = {
|
||||
noConsumeWithoutDescribeAclSetup()
|
||||
val consumer = createConsumer()
|
||||
|
@ -415,7 +415,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testNoConsumeWithoutDescribeAclViaSubscribe(quorum: String, groupProtocol: String): Unit = {
|
||||
noConsumeWithoutDescribeAclSetup()
|
||||
val consumer = createConsumer()
|
||||
|
@ -456,7 +456,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testNoConsumeWithDescribeAclViaAssign(quorum: String, groupProtocol: String): Unit = {
|
||||
noConsumeWithDescribeAclSetup()
|
||||
val consumer = createConsumer()
|
||||
|
@ -468,7 +468,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testNoConsumeWithDescribeAclViaSubscribe(quorum: String, groupProtocol: String): Unit = {
|
||||
noConsumeWithDescribeAclSetup()
|
||||
val consumer = createConsumer()
|
||||
|
@ -497,7 +497,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
|
|||
* ACL set.
|
||||
*/
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testNoGroupAcl(quorum: String, groupProtocol: String): Unit = {
|
||||
val superuserAdminClient = createSuperuserAdminClient()
|
||||
superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values
|
||||
|
|
|
@ -59,7 +59,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
|
|||
*/
|
||||
@Timeout(15)
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||
def testTwoConsumersWithDifferentSaslCredentials(quorum: String, groupProtocol: String): Unit = {
|
||||
setAclsAndProduce(tp)
|
||||
consumerConfig.putIfAbsent(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
|
||||
|
|
|
@ -576,8 +576,8 @@ object QuorumTestHarness {
|
|||
// The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
|
||||
// implementation that would otherwise cause tests to fail.
|
||||
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_16176: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
|
||||
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
|
||||
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17960: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
|
||||
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17961: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
|
||||
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17964: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
|
||||
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue