KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)

Implements KIP-361: adds a consumer configuration, allow.auto.create.topics, that controls whether subscribing to or assigning a non-existent topic causes the broker to create it automatically. A short usage sketch follows the commit metadata below.

Reviewers: Jason Gustafson <jason@confluent.io>
Authored by Dhruvil Shah on 2019-05-08 09:31:05 -07:00; committed by Jason Gustafson
parent 83823aedf2
commit e6cff21fd8
18 changed files with 228 additions and 66 deletions
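
For illustration, a minimal sketch of the new configuration from the application side (the bootstrap address, group id, and topic name are placeholders, not part of this commit):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class DisableAutoCreateExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            // New in this commit: metadata requests sent for subscribe()/assign()
            // ask the broker not to create missing topics, even if the broker has
            // auto.create.topics.enable=true. Requires brokers 0.11.0 or newer.
            props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

            try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
                consumer.subscribe(Collections.singletonList("orders")); // not auto-created if missing
            }
        }
    }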

ManualMetadataUpdater.java

@ -16,8 +16,8 @@
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.slf4j.Logger;
@ -74,10 +74,10 @@ public class ManualMetadataUpdater implements MetadataUpdater {
}
@Override
public void handleAuthenticationFailure(AuthenticationException exception) {
// We don't fail the broker on authentication failures, but there is sufficient information in the broker logs
// to identify the failure.
log.debug("An authentication error occurred in broker-to-broker communication.", exception);
public void handleFatalException(KafkaException exception) {
// We don't fail the broker on failures, but there should be sufficient information in the logs indicating the reason
// for failure.
log.debug("An error occurred in broker-to-broker communication.", exception);
}
@Override

Metadata.java

@ -21,7 +21,6 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
@ -66,8 +65,8 @@ public class Metadata implements Closeable {
private int requestVersion; // bumped on every new topic addition
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;
private AuthenticationException authenticationException;
private KafkaException metadataException;
private KafkaException fatalException;
private KafkaException recoverableException;
private MetadataCache cache = MetadataCache.empty();
private boolean needUpdate;
private final ClusterResourceListeners clusterResourceListeners;
@ -202,25 +201,13 @@ public class Metadata implements Closeable {
}
/**
* If any non-retriable authentication exceptions were encountered during
* metadata update, clear and return the exception.
* If any non-retriable exceptions were encountered during metadata update, clear and return the exception.
*/
public synchronized AuthenticationException getAndClearAuthenticationException() {
if (authenticationException != null) {
AuthenticationException exception = authenticationException;
authenticationException = null;
return exception;
} else
return null;
}
synchronized KafkaException getAndClearMetadataException() {
if (this.metadataException != null) {
KafkaException metadataException = this.metadataException;
this.metadataException = null;
return metadataException;
} else
return null;
public synchronized KafkaException getAndClearMetadataException() {
KafkaException metadataException = Optional.ofNullable(fatalException).orElse(recoverableException);
fatalException = null;
recoverableException = null;
return metadataException;
}
public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) {
@ -281,7 +268,7 @@ public class Metadata implements Closeable {
private void maybeSetMetadataError(Cluster cluster) {
// if we encounter any invalid topics, cache the exception to later throw to the user
metadataException = null;
recoverableException = null;
checkInvalidTopics(cluster);
checkUnauthorizedTopics(cluster);
}
@ -289,14 +276,16 @@ public class Metadata implements Closeable {
private void checkInvalidTopics(Cluster cluster) {
if (!cluster.invalidTopics().isEmpty()) {
log.error("Metadata response reported invalid topics {}", cluster.invalidTopics());
metadataException = new InvalidTopicException(cluster.invalidTopics());
// We may be able to recover from this exception if metadata for this topic is no longer needed
recoverableException = new InvalidTopicException(cluster.invalidTopics());
}
}
private void checkUnauthorizedTopics(Cluster cluster) {
if (!cluster.unauthorizedTopics().isEmpty()) {
log.error("Topic authorization failed for topics {}", cluster.unauthorizedTopics());
metadataException = new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
// We may be able to recover from this exception if metadata for this topic is no longer needed
recoverableException = new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
}
}
@ -368,10 +357,6 @@ public class Metadata implements Closeable {
}
public synchronized void maybeThrowException() {
AuthenticationException authenticationException = getAndClearAuthenticationException();
if (authenticationException != null)
throw authenticationException;
KafkaException metadataException = getAndClearMetadataException();
if (metadataException != null)
throw metadataException;
@ -381,9 +366,9 @@ public class Metadata implements Closeable {
* Record an attempt to update the metadata that failed. We need to keep track of this
* to avoid retrying immediately.
*/
public synchronized void failedUpdate(long now, AuthenticationException authenticationException) {
public synchronized void failedUpdate(long now, KafkaException fatalException) {
this.lastRefreshMs = now;
this.authenticationException = authenticationException;
this.fatalException = fatalException;
}
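
The split between the two exception slots determines what the application observes: a fatal exception (for example, an authentication failure) takes precedence and is rethrown to the caller, while a recoverable one (invalid or unauthorized topics) is raised once from poll() and then cleared, so the application can adjust its subscription and keep going. A hedged caller-side fragment (the consumer and log variables are assumed, not part of this commit):

    try {
        consumer.poll(Duration.ofMillis(100));
    } catch (InvalidTopicException e) {
        // Surfaced once via maybeThrowException() and then cleared; dropping
        // the offending topics from the subscription lets polling resume.
        log.warn("Dropping invalid topics {}", e.invalidTopics());
    }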
/**

MetadataUpdater.java

@ -16,8 +16,8 @@
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
@ -64,11 +64,11 @@ public interface MetadataUpdater extends Closeable {
void handleDisconnection(String destination);
/**
* Handle authentication failure. Propagate the authentication exception if awaiting metadata.
* Handle failure. Propagate the exception if awaiting metadata.
*
* @param exception authentication exception from broker
* @param fatalException exception corresponding to the failure
*/
void handleAuthenticationFailure(AuthenticationException exception);
void handleFatalException(KafkaException fatalException);
/**
* Handle responses for metadata requests.

NetworkClient.java

@ -17,6 +17,7 @@
package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
@ -433,8 +434,8 @@ public class NetworkClient implements KafkaClient {
doSend(request, false, now);
}
private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
String nodeConnectionId, long now) {
// package-private for testing
void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
doSend(clientRequest, true, now);
}
@ -480,6 +481,9 @@ public class NetworkClient implements KafkaClient {
clientRequest.callback(), clientRequest.destination(), now, now,
false, unsupportedVersionException, null, null);
abortedSends.add(clientResponse);
if (isInternalRequest && clientRequest.apiKey() == ApiKeys.METADATA)
metadataUpdater.handleFatalException(unsupportedVersionException);
}
}
@ -715,7 +719,7 @@ public class NetworkClient implements KafkaClient {
case AUTHENTICATION_FAILED:
AuthenticationException exception = disconnectState.exception();
connectionStates.authenticationFailed(nodeId, now, exception);
metadataUpdater.handleAuthenticationFailure(exception);
metadataUpdater.handleFatalException(exception);
log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
disconnectState.remoteAddress(), exception.getMessage());
break;
@ -1005,9 +1009,9 @@ public class NetworkClient implements KafkaClient {
}
@Override
public void handleAuthenticationFailure(AuthenticationException exception) {
public void handleFatalException(KafkaException fatalException) {
if (metadata.updateRequested())
metadata.failedUpdate(time.milliseconds(), exception);
metadata.failedUpdate(time.milliseconds(), fatalException);
inProgressRequestVersion = null;
}
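
This abort path is what enforces the new broker-version requirement: MetadataRequest carries the allow_auto_topic_creation flag only from version 4 onward (brokers 0.11.0+). With auto-creation disabled, the consumer builds its metadata requests roughly as in the fragment below (topic list is a placeholder); build() then throws UnsupportedVersionException when the negotiated version is older than 4, and NetworkClient funnels that into handleFatalException() so the pending metadata update fails fast rather than hanging.

    // Fragment: passing allowAutoTopicCreation=false pins the request to
    // MetadataRequest v4 or newer; build() rejects older versions.
    MetadataRequest.Builder builder =
        new MetadataRequest.Builder(Collections.singletonList("orders"), false);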

AdminMetadataManager.java

@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.requests.MetadataResponse;
@ -104,7 +105,7 @@ public class AdminMetadataManager {
}
@Override
public void handleAuthenticationFailure(AuthenticationException e) {
public void handleFatalException(KafkaException e) {
updateFailed(e);
}

ConsumerConfig.java

@ -261,6 +261,14 @@ public class ConsumerConfig extends AbstractConfig {
public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);
/** <code>allow.auto.create.topics</code> */
public static final String ALLOW_AUTO_CREATE_TOPICS_CONFIG = "allow.auto.create.topics";
private static final String ALLOW_AUTO_CREATE_TOPICS_DOC = "Allow automatic topic creation on the broker when" +
" subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the" +
" broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" +
" be set to `true` when using brokers older than 0.11.0, which cannot disable topic creation per request.";
public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true;
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@ -464,6 +472,11 @@ public class ConsumerConfig extends AbstractConfig {
in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
Importance.MEDIUM,
ISOLATION_LEVEL_DOC)
.define(ALLOW_AUTO_CREATE_TOPICS_CONFIG,
Type.BOOLEAN,
DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
Importance.MEDIUM,
ALLOW_AUTO_CREATE_TOPICS_DOC)
// security support
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
@ -472,7 +485,6 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
.withClientSaslSupport();
}
@Override

KafkaConsumer.java

@ -727,6 +727,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.metadata = new ConsumerMetadata(retryBackoffMs,
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
!config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
subscriptions, logContext, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
@ -1830,7 +1831,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
Timer timer = time.timer(timeout);
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topic), true), timer);
new MetadataRequest.Builder(Collections.singletonList(topic), metadata.allowAutoTopicCreation()), timer);
return topicMetadata.get(topic);
} finally {
release();
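
The same flag now guards explicit lookups in partitionsFor(), so asking for a topic's partitions can no longer create the topic as a side effect. A hedged fragment (topic name is a placeholder):

    // With allow.auto.create.topics=false, this metadata lookup will not
    // create "maybe-missing"; previously it could whenever the broker
    // allowed auto-creation.
    List<PartitionInfo> partitions = consumer.partitionsFor("maybe-missing");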

ConsumerMetadata.java

@ -28,21 +28,28 @@ import java.util.Set;
public class ConsumerMetadata extends Metadata {
private final boolean includeInternalTopics;
private final boolean allowAutoTopicCreation;
private final SubscriptionState subscription;
private final Set<String> transientTopics;
public ConsumerMetadata(long refreshBackoffMs,
long metadataExpireMs,
boolean includeInternalTopics,
boolean allowAutoTopicCreation,
SubscriptionState subscription,
LogContext logContext,
ClusterResourceListeners clusterResourceListeners) {
super(refreshBackoffMs, metadataExpireMs, logContext, clusterResourceListeners);
this.includeInternalTopics = includeInternalTopics;
this.allowAutoTopicCreation = allowAutoTopicCreation;
this.subscription = subscription;
this.transientTopics = new HashSet<>();
}
public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
}
@Override
public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
if (subscription.hasPatternSubscription())
@ -50,7 +57,7 @@ public class ConsumerMetadata extends Metadata {
List<String> topics = new ArrayList<>();
topics.addAll(subscription.groupSubscription());
topics.addAll(transientTopics);
return new MetadataRequest.Builder(topics, true);
return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
}
synchronized void addTransientTopics(Set<String> topics) {
@ -73,5 +80,4 @@ public class ConsumerMetadata extends Metadata {
return subscription.matchesSubscribedPattern(topic);
}
}

ProducerMetadata.java

@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
@ -111,9 +110,9 @@ public class ProducerMetadata extends Metadata {
}
@Override
public synchronized void failedUpdate(long now, AuthenticationException authenticationException) {
super.failedUpdate(now, authenticationException);
if (authenticationException != null)
public synchronized void failedUpdate(long now, KafkaException fatalException) {
super.failedUpdate(now, fatalException);
if (fatalException != null)
notifyAll();
}

NetworkClientTest.java

@ -16,8 +16,10 @@
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
@ -58,25 +60,26 @@ public class NetworkClientTest {
protected final long reconnectBackoffMsTest = 10 * 1000;
protected final long reconnectBackoffMaxMsTest = 10 * 10000;
private final TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(Collections.singletonList(node));
private final NetworkClient client = createNetworkClient(reconnectBackoffMaxMsTest);
private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest);
private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery();
private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)), "mock", Integer.MAX_VALUE,
return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
}
private NetworkClient createNetworkClientWithStaticNodes() {
return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)),
return new NetworkClient(selector, metadataUpdater,
"mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
}
private NetworkClient createNetworkClientWithNoVersionDiscovery() {
return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)), "mock", Integer.MAX_VALUE,
return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext());
@ -140,6 +143,16 @@ public class NetworkClientTest {
assertFalse("Connection should not be ready after close", client.isReady(node, 0));
}
@Test
public void testUnsupportedVersionDuringInternalMetadataRequest() {
List<String> topics = Arrays.asList("topic_1");
// disabling auto topic creation for versions less than 4 is not supported
MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, false, (short) 3);
client.sendInternalMetadataRequest(builder, node.idString(), time.milliseconds());
assertEquals(UnsupportedVersionException.class, metadataUpdater.getAndClearFailure().getClass());
}
private void checkSimpleRequestResponse(NetworkClient networkClient) {
awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
@ -583,4 +596,25 @@ public class NetworkClientTest {
this.response = response;
}
}
// ManualMetadataUpdater with ability to keep track of failures
private static class TestMetadataUpdater extends ManualMetadataUpdater {
KafkaException failure;
public TestMetadataUpdater(List<Node> nodes) {
super(nodes);
}
@Override
public void handleFatalException(KafkaException exception) {
failure = exception;
super.handleFatalException(exception);
}
public KafkaException getAndClearFailure() {
KafkaException failure = this.failure;
this.failure = null;
return failure;
}
}
}

KafkaConsumerTest.java

@ -1656,8 +1656,8 @@ public class KafkaConsumerTest {
}
private ConsumerMetadata createMetadata(SubscriptionState subscription) {
return new ConsumerMetadata(0, Long.MAX_VALUE, false, subscription,
new LogContext(), new ClusterResourceListeners());
return new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
subscription, new LogContext(), new ClusterResourceListeners());
}
private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {

AbstractCoordinatorTest.java

@ -96,7 +96,7 @@ public class AbstractCoordinatorTest {
LogContext logContext = new LogContext();
this.mockTime = new MockTime();
ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, 60 * 60 * 1000L,
false, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST),
false, false, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST),
logContext, new ClusterResourceListeners());
this.mockClient = new MockClient(mockTime, metadata);

ConsumerCoordinatorTest.java

@ -139,7 +139,7 @@ public class ConsumerCoordinatorTest {
LogContext logContext = new LogContext();
this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
this.metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
subscriptions, logContext, new ClusterResourceListeners());
false, subscriptions, logContext, new ClusterResourceListeners());
this.client = new MockClient(time, metadata);
this.client.updateMetadata(metadataResponse);
this.consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100,
@ -1181,7 +1181,7 @@ public class ConsumerCoordinatorTest {
private void testInternalTopicInclusion(boolean includeInternalTopics) {
metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics,
subscriptions, new LogContext(), new ClusterResourceListeners());
false, subscriptions, new LogContext(), new ClusterResourceListeners());
client = new MockClient(time, metadata);
coordinator = buildCoordinator(new Metrics(), assignors, false, Optional.empty());

ConsumerMetadataTest.java

@ -159,8 +159,8 @@ public class ConsumerMetadataTest {
private ConsumerMetadata newConsumerMetadata(boolean includeInternalTopics) {
long refreshBackoffMs = 50;
long expireMs = 50000;
return new ConsumerMetadata(refreshBackoffMs, expireMs, includeInternalTopics, subscription, new LogContext(),
new ClusterResourceListeners());
return new ConsumerMetadata(refreshBackoffMs, expireMs, includeInternalTopics, false,
subscription, new LogContext(), new ClusterResourceListeners());
}
}

ConsumerNetworkClientTest.java

@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
@ -238,7 +239,7 @@ public class ConsumerNetworkClientTest {
fail("Expected authentication error thrown");
} catch (AuthenticationException e) {
// After the exception is raised, it should have been cleared
assertNull(metadata.getAndClearAuthenticationException());
assertNull(metadata.getAndClearMetadataException());
}
}
@ -258,6 +259,18 @@ public class ConsumerNetworkClientTest {
consumerClient.poll(time.timer(Duration.ZERO));
}
@Test
public void testMetadataFailurePropagated() {
KafkaException metadataException = new KafkaException();
metadata.failedUpdate(time.milliseconds(), metadataException);
try {
consumerClient.poll(time.timer(Duration.ZERO));
fail("Expected poll to throw exception");
} catch (Exception e) {
assertEquals(metadataException, e);
}
}
@Test
public void testFutureCompletionOutsidePoll() throws Exception {
// Tests the scenario in which the request that is being awaited in one thread

FetcherTest.java

@ -3395,7 +3395,7 @@ public class FetcherTest {
LogContext logContext = new LogContext();
time = new MockTime(1);
subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
subscriptions, logContext, new ClusterResourceListeners());
client = new MockClient(time, metadata);
metrics = new Metrics(metricConfig, time);

OffsetForLeaderEpochClientTest.java

@ -157,7 +157,7 @@ public class OffsetForLeaderEpochClientTest {
LogContext logContext = new LogContext();
time = new MockTime(1);
subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
subscriptions, logContext, new ClusterResourceListeners());
client = new MockClient(time, metadata);
consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time,

ConsumerTopicCreationTest.scala (new file)

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package integration.kafka.api
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import java.lang.{Boolean => JBoolean}
import java.time.Duration
import java.util
import scala.collection.JavaConverters._
import kafka.api.IntegrationTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.junit.{After, Test}
/**
* Tests behavior of specifying auto topic creation configuration for the consumer and broker
*/
@RunWith(value = classOf[Parameterized])
class ConsumerTopicCreationTest(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean) extends IntegrationTestHarness {
override protected def brokerCount: Int = 1
val topic = "topic"
val part = 0
val tp = new TopicPartition(topic, part)
val producerClientId = "ConsumerTestProducer"
val consumerClientId = "ConsumerTestConsumer"
var adminClient: AdminClient = null
// configure server properties
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, brokerAutoTopicCreationEnable.toString)
// configure client properties
this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
this.consumerConfig.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, consumerAllowAutoCreateTopics.toString)
@After
override def tearDown(): Unit = {
if (adminClient != null)
Utils.closeQuietly(adminClient, "AdminClient")
super.tearDown()
}
@Test
def testAutoTopicCreation(): Unit = {
val consumer = createConsumer()
adminClient = AdminClient.create(createConfig())
consumer.subscribe(util.Arrays.asList(topic))
consumer.poll(Duration.ofMillis(100))
val topicCreated = adminClient.listTopics.names.get.contains(topic)
if (brokerAutoTopicCreationEnable && consumerAllowAutoCreateTopics)
assert(topicCreated)
else
assert(!topicCreated)
}
def createConfig(): util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
config
}
}
object ConsumerTopicCreationTest {
@Parameters(name = "brokerTopicCreation={0}, consumerTopicCreation={1}")
def parameters: java.util.Collection[Array[Object]] = {
val data = new java.util.ArrayList[Array[Object]]()
for (brokerAutoTopicCreationEnable <- Array(JBoolean.TRUE, JBoolean.FALSE))
for (consumerAutoCreateTopicsPolicy <- Array(JBoolean.TRUE, JBoolean.FALSE))
data.add(Array(brokerAutoTopicCreationEnable, consumerAutoCreateTopicsPolicy))
data
}
}
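
For reference, the four parameterized combinations assert a single rule: the topic is created only when both the broker and the consumer allow it.

    auto.create.topics.enable (broker)   allow.auto.create.topics (consumer)   topic created
    true                                 true                                  yes
    true                                 false                                 no
    false                                true                                  no
    false                                false                                 no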