From 6aac009a2dc6912160ec02c0eac86597d68e8e50 Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Sat, 18 May 2024 01:39:30 +0530 Subject: [PATCH] MINOR: Remove deprecated constructors from Connect's Kafka*BackingStore classes (#15865) - These constructors were deprecated over 3 years ago in KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics #9780. - While these classes are not a part of Connect's public API, deprecation was still introduced instead of outright removal because they are useful utility classes that might've been used outside of Connect. - The KafkaOffsetBackingStore's deprecated constructor was removed in KAFKA-14785: Connect offset read REST API #13434. - This patch removes the deprecated constructors for KafkaConfigBackingStore and KafkaStatusBackingStore. Reviewers: Chia-Ping Tsai --- .../storage/KafkaConfigBackingStore.java | 19 ++------------ .../storage/KafkaOffsetBackingStore.java | 20 ++------------- .../storage/KafkaStatusBackingStore.java | 25 +++---------------- .../KafkaConfigBackingStoreMockitoTest.java | 2 +- .../storage/KafkaConfigBackingStoreTest.java | 3 ++- .../KafkaStatusBackingStoreFormatTest.java | 7 +++--- .../storage/KafkaStatusBackingStoreTest.java | 2 +- 7 files changed, 15 insertions(+), 63 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index e26f7d88f19..c24dd6c7907 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -53,7 +53,6 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; -import org.apache.kafka.connect.util.SharedTopicAdmin; import org.apache.kafka.connect.util.TopicAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -300,7 +299,6 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme final Map> connectorConfigs = new HashMap<>(); final Map> taskConfigs = new HashMap<>(); private final Supplier topicAdminSupplier; - private SharedTopicAdmin ownTopicAdmin; private final String clientId; // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data @@ -334,11 +332,6 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme this.configLog = configLog; } - @Deprecated - public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) { - this(converter, config, configTransformer, null, "connect-distributed-"); - } - public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier adminSupplier, String clientIdBase) { this(converter, config, configTransformer, adminSupplier, clientIdBase, Time.SYSTEM); } @@ -411,7 +404,6 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme log.info("Closing KafkaConfigBackingStore"); relinquishWritePrivileges(); - Utils.closeQuietly(ownTopicAdmin, "admin for config topic"); Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic"); log.info("Closed KafkaConfigBackingStore"); @@ -794,14 +786,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme Map adminProps = new HashMap<>(originals); ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId); adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); - Supplier adminSupplier; - if (topicAdminSupplier != null) { - adminSupplier = topicAdminSupplier; - } else { - // Create our own topic admin supplier that we'll close when we're stopped - ownTopicAdmin = new SharedTopicAdmin(adminProps); - adminSupplier = ownTopicAdmin; - } + Map topicSettings = config instanceof DistributedConfig ? ((DistributedConfig) config).configStorageTopicSettings() : Collections.emptyMap(); @@ -812,7 +797,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)) .build(); - return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier, config, time); + return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, topicAdminSupplier, config, time); } /** diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 4d447759edd..e3960748b9c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -37,7 +37,6 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConvertingFutureCallback; import org.apache.kafka.connect.util.KafkaBasedLog; -import org.apache.kafka.connect.util.SharedTopicAdmin; import org.apache.kafka.connect.util.TopicAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,7 +148,6 @@ public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore impleme private Converter keyConverter; private final Supplier topicAdminSupplier; private final Supplier clientIdBase; - private SharedTopicAdmin ownTopicAdmin; protected boolean exactlyOnce; /** @@ -211,17 +209,9 @@ public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore impleme Map adminProps = new HashMap<>(originals); adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId); - Supplier adminSupplier; - if (topicAdminSupplier != null) { - adminSupplier = topicAdminSupplier; - } else { - // Create our own topic admin supplier that we'll close when we're stopped - this.ownTopicAdmin = new SharedTopicAdmin(adminProps); - adminSupplier = ownTopicAdmin; - } NewTopic topicDescription = newTopicDescription(topic, config); - this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier, config, Time.SYSTEM); + this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, topicAdminSupplier, config, Time.SYSTEM); } protected NewTopic newTopicDescription(final String topic, final WorkerConfig config) { @@ -268,13 +258,7 @@ public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore impleme @Override public void stop() { log.info("Stopping KafkaOffsetBackingStore"); - try { - offsetLog.stop(); - } finally { - if (ownTopicAdmin != null) { - ownTopicAdmin.close(); - } - } + offsetLog.stop(); log.info("Stopped KafkaOffsetBackingStore"); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index 0ffc3eed4f3..4f2d832fc83 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -44,7 +44,6 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; -import org.apache.kafka.connect.util.SharedTopicAdmin; import org.apache.kafka.connect.util.Table; import org.apache.kafka.connect.util.TopicAdmin; import org.slf4j.Logger; @@ -139,14 +138,8 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme private String statusTopic; private KafkaBasedLog kafkaLog; private int generation; - private SharedTopicAdmin ownTopicAdmin; private ExecutorService sendRetryExecutor; - @Deprecated - public KafkaStatusBackingStore(Time time, Converter converter) { - this(time, converter, null, "connect-distributed-"); - } - public KafkaStatusBackingStore(Time time, Converter converter, Supplier topicAdminSupplier, String clientIdBase) { this.time = time; this.converter = converter; @@ -158,8 +151,9 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme } // visible for testing - KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, KafkaBasedLog kafkaLog) { - this(time, converter); + KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, Supplier topicAdminSupplier, + KafkaBasedLog kafkaLog) { + this(time, converter, null, "connect-distributed-"); this.kafkaLog = kafkaLog; this.statusTopic = statusTopic; sendRetryExecutor = Executors.newSingleThreadExecutor( @@ -199,14 +193,6 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme Map adminProps = new HashMap<>(originals); adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId); - Supplier adminSupplier; - if (topicAdminSupplier != null) { - adminSupplier = topicAdminSupplier; - } else { - // Create our own topic admin supplier that we'll close when we're stopped - ownTopicAdmin = new SharedTopicAdmin(adminProps); - adminSupplier = ownTopicAdmin; - } Map topicSettings = config instanceof DistributedConfig ? ((DistributedConfig) config).statusStorageTopicSettings() @@ -219,7 +205,7 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme .build(); Callback> readCallback = (error, record) -> read(record); - this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier, config, time); + this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, topicAdminSupplier, config, time); } @Override @@ -236,9 +222,6 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme kafkaLog.stop(); } finally { ThreadUtils.shutdownExecutorServiceQuietly(sendRetryExecutor, 10, TimeUnit.SECONDS); - if (ownTopicAdmin != null) { - ownTopicAdmin.close(); - } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java index 34827ba7dae..6c9057a3517 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java @@ -221,7 +221,7 @@ public class KafkaConfigBackingStoreMockitoTest { doReturn("test-cluster").when(config).kafkaClusterId(); configStorage = Mockito.spy( new KafkaConfigBackingStore( - converter, config, null, null, CLIENT_ID_BASE, time) + converter, config, null, () -> null, CLIENT_ID_BASE, time) ); configStorage.setConfigLog(configLog); configStorage.setUpdateListener(configUpdateListener); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 61e73df2706..fc2caf75d9b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -178,10 +178,11 @@ public class KafkaConfigBackingStoreTest { // The kafkaClusterId is used in the constructor for KafkaConfigBackingStore // So temporarily enter replay mode in order to mock that call EasyMock.replay(config); + Supplier topicAdminSupplier = () -> null; configStorage = PowerMock.createPartialMock( KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog", "createFencableProducer"}, - converter, config, null, null, CLIENT_ID_BASE, time); + converter, config, null, topicAdminSupplier, CLIENT_ID_BASE, time); Whitebox.setInternalState(configStorage, "configLog", storeLog); configStorage.setUpdateListener(configUpdateListener); // The mock must be reset and re-mocked for the remainder of the test. diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java index a13b76e400f..ee53a494204 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java @@ -64,16 +64,15 @@ public class KafkaStatusBackingStoreFormatTest { private Time time; private KafkaStatusBackingStore store; - private JsonConverter converter; - private KafkaBasedLog kafkaBasedLog = mock(KafkaBasedLog.class); + private final KafkaBasedLog kafkaBasedLog = mock(KafkaBasedLog.class); @Before public void setup() { time = new MockTime(); - converter = new JsonConverter(); + JsonConverter converter = new JsonConverter(); converter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false); - store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog); + store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, () -> null, kafkaBasedLog); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java index 11a4b378c96..76002b76371 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java @@ -80,7 +80,7 @@ public class KafkaStatusBackingStoreTest { @Before public void setup() { - store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog); + store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, () -> null, kafkaBasedLog); } @Test