From 5b3b385881d5518ef1b2c63cb55244cf80a80da2 Mon Sep 17 00:00:00 2001 From: Andras Katona <41361962+akatona84@users.noreply.github.com> Date: Wed, 24 May 2023 11:05:02 +0200 Subject: [PATCH] KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException (#11565) In case the Kafka Broker cluster and the Kafka Connect cluster is started together and Connect would want to create its topics, there's a high chance to fail the creation with InvalidReplicationFactorException. --------- Co-authored-by: Daniel Urban Reviewers: Daniel Urban , Mickael Maison , Viktor Somogyi-Vass , Chris Egerton , Laszlo Hunyadi --- .../storage/KafkaConfigBackingStore.java | 31 +++----- .../storage/KafkaOffsetBackingStore.java | 42 ++++------- .../storage/KafkaStatusBackingStore.java | 34 +++------ .../storage/KafkaTopicBasedBackingStore.java | 73 +++++++++++++++++++ .../apache/kafka/connect/util/TopicAdmin.java | 51 +++++++++++++ .../storage/KafkaConfigBackingStoreTest.java | 4 +- .../storage/KafkaOffsetBackingStoreTest.java | 4 +- .../storage/KafkaStatusBackingStoreTest.java | 41 +++++------ .../kafka/connect/util/TopicAdminTest.java | 35 +++++++++ 9 files changed, 221 insertions(+), 94 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 6c211d84e0f..cf07e6c870c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -27,7 +27,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -193,7 +192,7 @@ import static org.apache.kafka.connect.util.ConnectUtils.className; * rebalance must be deferred. *

*/ -public class KafkaConfigBackingStore implements ConfigBackingStore { +public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore implements ConfigBackingStore { private static final Logger log = LoggerFactory.getLogger(KafkaConfigBackingStore.class); public static final String TARGET_STATE_PREFIX = "target-state-"; @@ -339,6 +338,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { this.offset = -1; this.topicAdminSupplier = adminSupplier; this.clientId = Objects.requireNonNull(clientIdBase) + "configs"; + this.time = time; this.baseProducerProps = baseProducerProps(config); // By default, Connect disables idempotent behavior for all producers, even though idempotence became @@ -357,7 +357,6 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { configLog = setupAndCreateKafkaBasedLog(this.topic, config); this.configTransformer = configTransformer; - this.time = time; } @Override @@ -774,7 +773,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)) .build(); - return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier); + return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier, config, time); } /** @@ -845,22 +844,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { } } - private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, - Map consumerProps, - Callback> consumedCallback, - final NewTopic topicDescription, Supplier adminSupplier) { - java.util.function.Consumer createTopics = admin -> { - log.debug("Creating admin client to manage Connect internal config topic"); - // Create the topic if it doesn't exist - Set newTopics = admin.createTopics(topicDescription); - if (!newTopics.contains(topic)) { - // It already existed, so check that the topic cleanup policy is compact only and not delete - log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT); - admin.verifyTopicCleanupPolicyOnlyCompact(topic, - DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations"); - } - }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); + @Override + protected String getTopicConfig() { + return DistributedConfig.CONFIG_TOPIC_CONFIG; + } + + @Override + protected String getTopicPurpose() { + return "connector configurations"; } private class ConsumeCallback implements Callback> { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 6b35598d63d..290b91406ed 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -26,7 +26,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -68,7 +67,7 @@ import java.util.function.Supplier; * to ensure correct behavior (e.g. acks, auto.offset.reset). *

*/ -public class KafkaOffsetBackingStore implements OffsetBackingStore { +public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore implements OffsetBackingStore { private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class); /** @@ -100,7 +99,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { topicAdmin, consumedCallback, Time.SYSTEM, - initialize(topic, newTopicDescription(topic, config)) + topicInitializer(topic, newTopicDescription(topic, config), config, Time.SYSTEM) ); } }; @@ -132,7 +131,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { topicAdmin, consumedCallback, Time.SYSTEM, - initialize(topic, newTopicDescription(topic, config)) + topicInitializer(topic, newTopicDescription(topic, config), config, Time.SYSTEM) ); } }; @@ -221,16 +220,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { } NewTopic topicDescription = newTopicDescription(topic, config); - this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier); - } - - // Visible for testing - KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, - Map consumerProps, - Callback> consumedCallback, - final NewTopic topicDescription, Supplier adminSupplier) { - java.util.function.Consumer createTopics = initialize(topic, topicDescription); - return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); + this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier, config, Time.SYSTEM); } protected NewTopic newTopicDescription(final String topic, final WorkerConfig config) { @@ -245,20 +235,6 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { .build(); } - protected java.util.function.Consumer initialize(final String topic, final NewTopic topicDescription) { - return admin -> { - log.debug("Creating admin client to manage Connect internal offset topic"); - // Create the topic if it doesn't exist - Set newTopics = admin.createTopics(topicDescription); - if (!newTopics.contains(topic)) { - // It already existed, so check that the topic cleanup policy is compact only and not delete - log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT); - admin.verifyTopicCleanupPolicyOnlyCompact(topic, - DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets"); - } - }; - } - @Override public void start() { log.info("Starting KafkaOffsetBackingStore"); @@ -355,6 +331,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { } }; + @Override + protected String getTopicConfig() { + return DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG; + } + + @Override + protected String getTopicPurpose() { + return "source connector offsets"; + } + private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future { private int numLeft; private boolean completed = false; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index f233534056e..20f47db2262 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -91,7 +90,7 @@ import java.util.function.Supplier; * obviously cannot take into account in-flight requests. * */ -public class KafkaStatusBackingStore implements StatusBackingStore { +public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore implements StatusBackingStore { private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class); public static final String TASK_STATUS_PREFIX = "status-task-"; @@ -221,26 +220,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore { .build(); Callback> readCallback = (error, record) -> read(record); - this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier); - } - - // Visible for testing - protected KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, - Map consumerProps, - Callback> consumedCallback, - final NewTopic topicDescription, Supplier adminSupplier) { - java.util.function.Consumer createTopics = admin -> { - log.debug("Creating admin client to manage Connect internal status topic"); - // Create the topic if it doesn't exist - Set newTopics = admin.createTopics(topicDescription); - if (!newTopics.contains(topic)) { - // It already existed, so check that the topic cleanup policy is compact only and not delete - log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT); - admin.verifyTopicCleanupPolicyOnlyCompact(topic, - DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses"); - } - }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, time, createTopics); + this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier, config, time); } @Override @@ -670,6 +650,16 @@ public class KafkaStatusBackingStore implements StatusBackingStore { } } + @Override + protected String getTopicConfig() { + return DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG; + } + + @Override + protected String getTopicPurpose() { + return "connector and task statuses"; + } + private static class CacheEntry> { private T value = null; private int sequence = 0; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java new file mode 100644 index 00000000000..34b42ef4ed2 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public abstract class KafkaTopicBasedBackingStore { + private static final Logger log = LoggerFactory.getLogger(KafkaTopicBasedBackingStore.class); + + Consumer topicInitializer(String topic, NewTopic topicDescription, WorkerConfig config, Time time) { + return admin -> { + log.debug("Creating Connect internal topic for {}", getTopicPurpose()); + // Create the topic if it doesn't exist + Set newTopics = createTopics(topicDescription, admin, config, time); + if (!newTopics.contains(topic)) { + // It already existed, so check that the topic cleanup policy is compact only and not delete + log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT); + admin.verifyTopicCleanupPolicyOnlyCompact(topic, getTopicConfig(), getTopicPurpose()); + } + }; + } + + private Set createTopics(NewTopic topicDescription, TopicAdmin admin, WorkerConfig config, Time time) { + // get the prefixless default api timeout and retry backoff for topic creation retry configs + AdminClientConfig adminClientConfig = new AdminClientConfig(config.originals()); + long timeoutMs = adminClientConfig.getInt(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); + long backOffMs = adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG); + return admin.createTopicsWithRetry(topicDescription, timeoutMs, backOffMs, time); + } + + // visible for testing + KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, + Map consumerProps, + Callback> consumedCallback, + final NewTopic topicDescription, Supplier adminSupplier, + WorkerConfig config, Time time) { + Consumer createTopics = topicInitializer(topic, topicDescription, config, time); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, time, createTopics); + } + + protected abstract String getTopicConfig(); + + protected abstract String getTopicPurpose(); +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 691852430a0..f8c30dc13b3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -38,12 +38,15 @@ import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; @@ -56,6 +59,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -70,6 +74,9 @@ import java.util.stream.Collectors; public class TopicAdmin implements AutoCloseable { public static final TopicCreationResponse EMPTY_CREATION = new TopicCreationResponse(Collections.emptySet(), Collections.emptySet()); + private static final List> CAUSES_TO_RETRY_TOPIC_CREATION = Arrays.asList( + InvalidReplicationFactorException.class, + TimeoutException.class); public static class TopicCreationResponse { @@ -328,6 +335,50 @@ public class TopicAdmin implements AutoCloseable { return createOrFindTopics(topics).createdTopics(); } + /** + * Implements a retry logic around creating topic(s) in case it'd fail due to + * specific type of exceptions, see {@link TopicAdmin#retryableTopicCreationException(ConnectException)} + * + * @param topicDescription the specifications of the topic + * @param timeoutMs Timeout in milliseconds + * @param backOffMs Time for delay after initial failed attempt in milliseconds + * @param time {@link Time} instance + * @return the names of the topics that were created by this operation; never null but possibly empty, + * the same as {@link TopicAdmin#createTopics(NewTopic...)} + */ + public Set createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) { + Timer timer = time.timer(timeoutMs); + do { + try { + return createTopics(topicDescription); + } catch (ConnectException e) { + if (timer.notExpired() && retryableTopicCreationException(e)) { + log.info("'{}' topic creation failed due to '{}', retrying, {}ms remaining", + topicDescription.name(), e.getMessage(), timer.remainingMs()); + } else { + throw e; + } + } + timer.sleep(backOffMs); + } while (timer.notExpired()); + throw new TimeoutException("Timeout expired while trying to create topic(s)"); + } + + private boolean retryableTopicCreationException(ConnectException e) { + // createTopics wraps the exception into ConnectException + // to retry the creation, it should be an ExecutionException from future get which was caused by InvalidReplicationFactorException + // or can be a TimeoutException + Throwable cause = e.getCause(); + while (cause != null) { + final Throwable finalCause = cause; + if (CAUSES_TO_RETRY_TOPIC_CREATION.stream().anyMatch(exceptionClass -> exceptionClass.isInstance(finalCause))) { + return true; + } + cause = cause.getCause(); + } + return false; + } + /** * Attempt to find or create the topic described by the given definition, returning true if the topic was created or had * already existed, or false if the topic did not exist and could not be created. diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 1f347825307..f72379d27fc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; @@ -1609,7 +1610,8 @@ public class KafkaConfigBackingStoreTest { PowerMock.expectPrivate(configStorage, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps), EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback), - EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier)) + EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier), + EasyMock.anyObject(WorkerConfig.class), EasyMock.anyObject(Time.class)) .andReturn(storeLog); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index 1d5d4ce4f71..2aaf97d3263 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -147,7 +147,7 @@ public class KafkaOffsetBackingStoreTest { doReturn(storeLog).when(store).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(), capturedConsumerProps.capture(), capturedConsumedCallback.capture(), - capturedNewTopic.capture(), capturedAdminSupplier.capture()); + capturedNewTopic.capture(), capturedAdminSupplier.capture(), any(), any()); } @After @@ -482,7 +482,7 @@ public class KafkaOffsetBackingStoreTest { doReturn(storeLog).when(store).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(), capturedConsumerProps.capture(), capturedConsumedCallback.capture(), - capturedNewTopic.capture(), capturedAdminSupplier.capture()); + capturedNewTopic.capture(), capturedAdminSupplier.capture(), any(), any()); store.configure(mockConfig(props)); store.start(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java index 0b4293f31a5..71fe342ee28 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.storage; -import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.config.ConfigException; @@ -47,18 +46,20 @@ import java.util.Optional; import java.util.function.Supplier; import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @SuppressWarnings("unchecked") public class KafkaStatusBackingStoreTest { @@ -363,24 +364,22 @@ public class KafkaStatusBackingStoreTest { String clientIdBase = "test-client-id-"; Supplier topicAdminSupplier = () -> mock(TopicAdmin.class); - Map capturedProducerProps = new HashMap<>(); - Map capturedConsumerProps = new HashMap<>(); + ArgumentCaptor> capturedProducerProps = ArgumentCaptor.forClass(Map.class); + ArgumentCaptor> capturedConsumerProps = ArgumentCaptor.forClass(Map.class); + + store = spy(new KafkaStatusBackingStore(new MockTime(), converter, topicAdminSupplier, clientIdBase)); + KafkaBasedLog kafkaLog = mock(KafkaBasedLog.class); + doReturn(kafkaLog).when(store).createKafkaBasedLog(any(), capturedProducerProps.capture(), + capturedConsumerProps.capture(), any(), + any(), any(), any(), any()); - store = new KafkaStatusBackingStore(new MockTime(), converter, topicAdminSupplier, clientIdBase) { - @Override - protected KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, Map consumerProps, org.apache.kafka.connect.util.Callback> consumedCallback, NewTopic topicDescription, Supplier adminSupplier) { - capturedProducerProps.putAll(producerProps); - capturedConsumerProps.putAll(consumerProps); - return kafkaBasedLog; - } - }; when(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).thenReturn("connect-statuses"); store.configure(workerConfig); final String expectedClientId = clientIdBase + "statuses"; - assertEquals(expectedClientId, capturedProducerProps.get(CLIENT_ID_CONFIG)); - assertEquals(expectedClientId, capturedConsumerProps.get(CLIENT_ID_CONFIG)); + assertEquals(expectedClientId, capturedProducerProps.getValue().get(CLIENT_ID_CONFIG)); + assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG)); } private static ConsumerRecord consumerRecord(long offset, String key, byte[] value) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index 1104f394938..8f5df8f66cc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -53,6 +53,7 @@ import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.connect.errors.ConnectException; import org.junit.Test; +import org.mockito.Mockito; import java.time.Duration; import java.util.ArrayList; @@ -258,6 +259,40 @@ public class TopicAdminTest { } } + @Test + public void shouldRetryCreateTopicWhenAvailableBrokersAreNotEnoughForReplicationFactor() { + Cluster cluster = createCluster(1); + NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).replicationFactor((short) 2).compacted().build(); + + try (TopicAdmin admin = Mockito.spy(new TopicAdmin(null, new MockAdminClient(cluster.nodes(), cluster.nodeById(0))))) { + try { + admin.createTopicsWithRetry(newTopic, 2, 1, new MockTime()); + } catch (Exception e) { + // not relevant + e.printStackTrace(); + } + Mockito.verify(admin, Mockito.times(2)).createTopics(newTopic); + } + } + + @Test + public void shouldRetryWhenTopicCreateThrowsWrappedTimeoutException() { + Cluster cluster = createCluster(1); + NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).replicationFactor((short) 1).compacted().build(); + + try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0)); + TopicAdmin admin = Mockito.spy(new TopicAdmin(null, mockAdminClient))) { + mockAdminClient.timeoutNextRequest(1); + try { + admin.createTopicsWithRetry(newTopic, 2, 1, new MockTime()); + } catch (Exception e) { + // not relevant + e.printStackTrace(); + } + Mockito.verify(admin, Mockito.times(2)).createTopics(newTopic); + } + } + @Test public void createShouldReturnFalseWhenSuppliedNullTopicDescription() { Cluster cluster = createCluster(1);