KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException (#11565)

When the Kafka broker cluster and the Kafka Connect cluster are started together, Connect may try to create its internal topics before enough brokers are available, and topic creation then fails with an InvalidReplicationFactorException. This change makes Connect retry internal topic creation, sleeping the admin client's retry backoff between attempts, until the default admin API timeout expires, instead of failing on the first attempt.
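As a hedged sketch of the behavior this change introduces (TopicAdmin, defineTopic, and createTopicsWithRetry are taken from this commit; the topic name, adminProps map, and timing values are illustrative):

    // Describe the internal topic the way Connect does
    NewTopic configTopic = TopicAdmin.defineTopic("connect-configs")
            .partitions(1)
            .replicationFactor((short) 3)
            .compacted()
            .build();

    // adminProps: a Map<String, Object> with bootstrap.servers etc. (assumed)
    try (TopicAdmin admin = new TopicAdmin(adminProps)) {
        // Retries on InvalidReplicationFactorException or TimeoutException,
        // sleeping 100 ms between attempts, for up to 60 s before giving up.
        admin.createTopicsWithRetry(configTopic, 60_000L, 100L, Time.SYSTEM);
    }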

---------

Co-authored-by: Daniel Urban <durban@cloudera.com>

Reviewers: Daniel Urban <durban@cloudera.com>, Mickael Maison <mickael.maison@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Chris Egerton <chrise@aiven.io>, Laszlo Hunyadi <laszlo.istvan.hunyady@gmail.com>
Andras Katona 2023-05-24 11:05:02 +02:00 committed by GitHub
parent 1c5cf4e170
commit 5b3b385881
9 changed files with 221 additions and 94 deletions

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java

@@ -27,7 +27,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -193,7 +192,7 @@ import static org.apache.kafka.connect.util.ConnectUtils.className;
* rebalance must be deferred.
* </p>
*/
public class KafkaConfigBackingStore implements ConfigBackingStore {
public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore implements ConfigBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaConfigBackingStore.class);
public static final String TARGET_STATE_PREFIX = "target-state-";
@@ -339,6 +338,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
this.offset = -1;
this.topicAdminSupplier = adminSupplier;
this.clientId = Objects.requireNonNull(clientIdBase) + "configs";
this.time = time;
this.baseProducerProps = baseProducerProps(config);
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
@@ -357,7 +357,6 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
configLog = setupAndCreateKafkaBasedLog(this.topic, config);
this.configTransformer = configTransformer;
this.time = time;
}
@Override
@@ -774,7 +773,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
.replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
.build();
return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier, config, time);
}
/**
@@ -845,22 +844,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
}
}
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
log.debug("Creating admin client to manage Connect internal config topic");
// Create the topic if it doesn't exist
Set<String> newTopics = admin.createTopics(topicDescription);
if (!newTopics.contains(topic)) {
// It already existed, so check that the topic cleanup policy is compact only and not delete
log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
admin.verifyTopicCleanupPolicyOnlyCompact(topic,
DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
}
};
return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
@Override
protected String getTopicConfig() {
return DistributedConfig.CONFIG_TOPIC_CONFIG;
}
@Override
protected String getTopicPurpose() {
return "connector configurations";
}
private class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java

@@ -26,7 +26,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -68,7 +67,7 @@ import java.util.function.Supplier;
* to ensure correct behavior (e.g. acks, auto.offset.reset).
* </p>
*/
public class KafkaOffsetBackingStore implements OffsetBackingStore {
public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore implements OffsetBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
/**
@@ -100,7 +99,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
topicAdmin,
consumedCallback,
Time.SYSTEM,
initialize(topic, newTopicDescription(topic, config))
topicInitializer(topic, newTopicDescription(topic, config), config, Time.SYSTEM)
);
}
};
@@ -132,7 +131,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
topicAdmin,
consumedCallback,
Time.SYSTEM,
initialize(topic, newTopicDescription(topic, config))
topicInitializer(topic, newTopicDescription(topic, config), config, Time.SYSTEM)
);
}
};
@@ -221,16 +220,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
}
NewTopic topicDescription = newTopicDescription(topic, config);
this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier);
}
// Visible for testing
KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
java.util.function.Consumer<TopicAdmin> createTopics = initialize(topic, topicDescription);
return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier, config, Time.SYSTEM);
}
protected NewTopic newTopicDescription(final String topic, final WorkerConfig config) {
@@ -245,20 +235,6 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
.build();
}
protected java.util.function.Consumer<TopicAdmin> initialize(final String topic, final NewTopic topicDescription) {
return admin -> {
log.debug("Creating admin client to manage Connect internal offset topic");
// Create the topic if it doesn't exist
Set<String> newTopics = admin.createTopics(topicDescription);
if (!newTopics.contains(topic)) {
// It already existed, so check that the topic cleanup policy is compact only and not delete
log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
admin.verifyTopicCleanupPolicyOnlyCompact(topic,
DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
}
};
}
@Override
public void start() {
log.info("Starting KafkaOffsetBackingStore");
@@ -355,6 +331,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
}
};
@Override
protected String getTopicConfig() {
return DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
}
@Override
protected String getTopicPurpose() {
return "source connector offsets";
}
private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> {
private int numLeft;
private boolean completed = false;

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java

@@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -91,7 +90,7 @@ import java.util.function.Supplier;
* obviously cannot take into account in-flight requests.
*
*/
public class KafkaStatusBackingStore implements StatusBackingStore {
public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore implements StatusBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
public static final String TASK_STATUS_PREFIX = "status-task-";
@@ -221,26 +220,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
.build();
Callback<ConsumerRecord<String, byte[]>> readCallback = (error, record) -> read(record);
this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier);
}
// Visible for testing
protected KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
log.debug("Creating admin client to manage Connect internal status topic");
// Create the topic if it doesn't exist
Set<String> newTopics = admin.createTopics(topicDescription);
if (!newTopics.contains(topic)) {
// It already existed, so check that the topic cleanup policy is compact only and not delete
log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
admin.verifyTopicCleanupPolicyOnlyCompact(topic,
DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses");
}
};
return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, time, createTopics);
this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier, config, time);
}
@Override
@@ -670,6 +650,16 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
}
}
@Override
protected String getTopicConfig() {
return DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
}
@Override
protected String getTopicPurpose() {
return "connector and task statuses";
}
private static class CacheEntry<T extends AbstractStatus<?>> {
private T value = null;
private int sequence = 0;

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java (new file)

@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
public abstract class KafkaTopicBasedBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaTopicBasedBackingStore.class);
Consumer<TopicAdmin> topicInitializer(String topic, NewTopic topicDescription, WorkerConfig config, Time time) {
return admin -> {
log.debug("Creating Connect internal topic for {}", getTopicPurpose());
// Create the topic if it doesn't exist
Set<String> newTopics = createTopics(topicDescription, admin, config, time);
if (!newTopics.contains(topic)) {
// It already existed, so check that the topic cleanup policy is compact only and not delete
log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
admin.verifyTopicCleanupPolicyOnlyCompact(topic, getTopicConfig(), getTopicPurpose());
}
};
}
private Set<String> createTopics(NewTopic topicDescription, TopicAdmin admin, WorkerConfig config, Time time) {
// get the prefixless default api timeout and retry backoff for topic creation retry configs
AdminClientConfig adminClientConfig = new AdminClientConfig(config.originals());
long timeoutMs = adminClientConfig.getInt(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
long backOffMs = adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
return admin.createTopicsWithRetry(topicDescription, timeoutMs, backOffMs, time);
}
// visible for testing
<K> KafkaBasedLog<K, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<K, byte[]>> consumedCallback,
final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier,
WorkerConfig config, Time time) {
Consumer<TopicAdmin> createTopics = topicInitializer(topic, topicDescription, config, time);
return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, time, createTopics);
}
protected abstract String getTopicConfig();
protected abstract String getTopicPurpose();
}
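The retry window used above comes from the worker's prefixless admin client settings: default.api.timeout.ms bounds the overall attempt and retry.backoff.ms sets the sleep between attempts. A minimal hypothetical subclass illustrating the two template methods (MyBackingStore and its config key are made up; the pattern matches the three stores in this commit):

    // Illustrative only: a concrete store names its topic config key and purpose
    public class MyBackingStore extends KafkaTopicBasedBackingStore {
        @Override
        protected String getTopicConfig() {
            return "my.storage.topic";  // worker config key that names the topic (hypothetical)
        }

        @Override
        protected String getTopicPurpose() {
            return "my records";        // human-readable label used in log messages
        }
    }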

connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java

@@ -38,12 +38,15 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
@@ -56,6 +59,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -70,6 +74,9 @@ import java.util.stream.Collectors;
public class TopicAdmin implements AutoCloseable {
public static final TopicCreationResponse EMPTY_CREATION = new TopicCreationResponse(Collections.emptySet(), Collections.emptySet());
private static final List<Class<? extends Exception>> CAUSES_TO_RETRY_TOPIC_CREATION = Arrays.asList(
InvalidReplicationFactorException.class,
TimeoutException.class);
public static class TopicCreationResponse {
@@ -328,6 +335,50 @@ public class TopicAdmin implements AutoCloseable {
return createOrFindTopics(topics).createdTopics();
}
/**
* Implements a retry logic around creating topic(s) in case it'd fail due to
* specific type of exceptions, see {@link TopicAdmin#retryableTopicCreationException(ConnectException)}
*
* @param topicDescription the specifications of the topic
* @param timeoutMs Timeout in milliseconds
* @param backOffMs Time for delay after initial failed attempt in milliseconds
* @param time {@link Time} instance
* @return the names of the topics that were created by this operation; never null but possibly empty,
* the same as {@link TopicAdmin#createTopics(NewTopic...)}
*/
public Set<String> createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) {
Timer timer = time.timer(timeoutMs);
do {
try {
return createTopics(topicDescription);
} catch (ConnectException e) {
if (timer.notExpired() && retryableTopicCreationException(e)) {
log.info("'{}' topic creation failed due to '{}', retrying, {}ms remaining",
topicDescription.name(), e.getMessage(), timer.remainingMs());
} else {
throw e;
}
}
timer.sleep(backOffMs);
} while (timer.notExpired());
throw new TimeoutException("Timeout expired while trying to create topic(s)");
}
private boolean retryableTopicCreationException(ConnectException e) {
// createTopics wraps the exception into ConnectException
// to retry the creation, it should be an ExecutionException from future get which was caused by InvalidReplicationFactorException
// or can be a TimeoutException
Throwable cause = e.getCause();
while (cause != null) {
final Throwable finalCause = cause;
if (CAUSES_TO_RETRY_TOPIC_CREATION.stream().anyMatch(exceptionClass -> exceptionClass.isInstance(finalCause))) {
return true;
}
cause = cause.getCause();
}
return false;
}
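For context, the cause chain this check walks typically looks like the following (a sketch; the exact wrapping depends on how createTopics surfaces the admin client's ExecutionException):

    // ConnectException thrown by createTopics
    //   └─ ExecutionException (from KafkaFuture.get)
    //        └─ InvalidReplicationFactorException  -> retried
    // A root cause not in CAUSES_TO_RETRY_TOPIC_CREATION
    // (e.g. TopicAuthorizationException) is rethrown immediately.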
/**
* Attempt to find or create the topic described by the given definition, returning true if the topic was created or had
* already existed, or false if the topic did not exist and could not be created.

connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java

@ -30,6 +30,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -1609,7 +1610,8 @@ public class KafkaConfigBackingStoreTest {
PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier))
EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier),
EasyMock.anyObject(WorkerConfig.class), EasyMock.anyObject(Time.class))
.andReturn(storeLog);
}

connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java

@@ -147,7 +147,7 @@ public class KafkaOffsetBackingStoreTest {
doReturn(storeLog).when(store).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(),
capturedConsumerProps.capture(), capturedConsumedCallback.capture(),
capturedNewTopic.capture(), capturedAdminSupplier.capture());
capturedNewTopic.capture(), capturedAdminSupplier.capture(), any(), any());
}
@After
@@ -482,7 +482,7 @@ public class KafkaOffsetBackingStoreTest {
doReturn(storeLog).when(store).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(),
capturedConsumerProps.capture(), capturedConsumedCallback.capture(),
capturedNewTopic.capture(), capturedAdminSupplier.capture());
capturedNewTopic.capture(), capturedAdminSupplier.capture(), any(), any());
store.configure(mockConfig(props));
store.start();

connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.config.ConfigException;
@@ -47,18 +46,20 @@ import java.util.Optional;
import java.util.function.Supplier;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings("unchecked")
public class KafkaStatusBackingStoreTest {
@@ -363,24 +364,22 @@ public class KafkaStatusBackingStoreTest {
String clientIdBase = "test-client-id-";
Supplier<TopicAdmin> topicAdminSupplier = () -> mock(TopicAdmin.class);
Map<String, Object> capturedProducerProps = new HashMap<>();
Map<String, Object> capturedConsumerProps = new HashMap<>();
ArgumentCaptor<Map<String, Object>> capturedProducerProps = ArgumentCaptor.forClass(Map.class);
ArgumentCaptor<Map<String, Object>> capturedConsumerProps = ArgumentCaptor.forClass(Map.class);
store = spy(new KafkaStatusBackingStore(new MockTime(), converter, topicAdminSupplier, clientIdBase));
KafkaBasedLog<String, byte[]> kafkaLog = mock(KafkaBasedLog.class);
doReturn(kafkaLog).when(store).createKafkaBasedLog(any(), capturedProducerProps.capture(),
capturedConsumerProps.capture(), any(),
any(), any(), any(), any());
store = new KafkaStatusBackingStore(new MockTime(), converter, topicAdminSupplier, clientIdBase) {
@Override
protected KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, org.apache.kafka.connect.util.Callback<ConsumerRecord<String, byte[]>> consumedCallback, NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
capturedProducerProps.putAll(producerProps);
capturedConsumerProps.putAll(consumerProps);
return kafkaBasedLog;
}
};
when(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).thenReturn("connect-statuses");
store.configure(workerConfig);
final String expectedClientId = clientIdBase + "statuses";
assertEquals(expectedClientId, capturedProducerProps.get(CLIENT_ID_CONFIG));
assertEquals(expectedClientId, capturedConsumerProps.get(CLIENT_ID_CONFIG));
assertEquals(expectedClientId, capturedProducerProps.getValue().get(CLIENT_ID_CONFIG));
assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
}
private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {

connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java

@@ -53,6 +53,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.Test;
import org.mockito.Mockito;
import java.time.Duration;
import java.util.ArrayList;
@@ -258,6 +259,40 @@ public class TopicAdminTest {
}
}
@Test
public void shouldRetryCreateTopicWhenAvailableBrokersAreNotEnoughForReplicationFactor() {
Cluster cluster = createCluster(1);
NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).replicationFactor((short) 2).compacted().build();
try (TopicAdmin admin = Mockito.spy(new TopicAdmin(null, new MockAdminClient(cluster.nodes(), cluster.nodeById(0))))) {
try {
admin.createTopicsWithRetry(newTopic, 2, 1, new MockTime());
} catch (Exception e) {
// not relevant
e.printStackTrace();
}
Mockito.verify(admin, Mockito.times(2)).createTopics(newTopic);
}
}
@Test
public void shouldRetryWhenTopicCreateThrowsWrappedTimeoutException() {
Cluster cluster = createCluster(1);
NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).replicationFactor((short) 1).compacted().build();
try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0));
TopicAdmin admin = Mockito.spy(new TopicAdmin(null, mockAdminClient))) {
mockAdminClient.timeoutNextRequest(1);
try {
admin.createTopicsWithRetry(newTopic, 2, 1, new MockTime());
} catch (Exception e) {
// not relevant
e.printStackTrace();
}
Mockito.verify(admin, Mockito.times(2)).createTopics(newTopic);
}
}
@Test
public void createShouldReturnFalseWhenSuppliedNullTopicDescription() {
Cluster cluster = createCluster(1);