mirror of https://github.com/apache/kafka.git
MINOR: Remove deprecated constructors from Connect's Kafka*BackingStore classes (#15865)
- These constructors were deprecated over 3 years ago in KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics #9780. - While these classes are not a part of Connect's public API, deprecation was still introduced instead of outright removal because they are useful utility classes that might've been used outside of Connect. - The KafkaOffsetBackingStore's deprecated constructor was removed in KAFKA-14785: Connect offset read REST API #13434. - This patch removes the deprecated constructors for KafkaConfigBackingStore and KafkaStatusBackingStore. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
93a5efc4b7
commit
6aac009a2d
|
@ -53,7 +53,6 @@ import org.apache.kafka.connect.util.Callback;
|
|||
import org.apache.kafka.connect.util.ConnectUtils;
|
||||
import org.apache.kafka.connect.util.ConnectorTaskId;
|
||||
import org.apache.kafka.connect.util.KafkaBasedLog;
|
||||
import org.apache.kafka.connect.util.SharedTopicAdmin;
|
||||
import org.apache.kafka.connect.util.TopicAdmin;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -300,7 +299,6 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
|
||||
final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
|
||||
private final Supplier<TopicAdmin> topicAdminSupplier;
|
||||
private SharedTopicAdmin ownTopicAdmin;
|
||||
private final String clientId;
|
||||
|
||||
// Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
|
||||
|
@ -334,11 +332,6 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
this.configLog = configLog;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) {
|
||||
this(converter, config, configTransformer, null, "connect-distributed-");
|
||||
}
|
||||
|
||||
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase) {
|
||||
this(converter, config, configTransformer, adminSupplier, clientIdBase, Time.SYSTEM);
|
||||
}
|
||||
|
@ -411,7 +404,6 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
log.info("Closing KafkaConfigBackingStore");
|
||||
|
||||
relinquishWritePrivileges();
|
||||
Utils.closeQuietly(ownTopicAdmin, "admin for config topic");
|
||||
Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic");
|
||||
|
||||
log.info("Closed KafkaConfigBackingStore");
|
||||
|
@ -794,14 +786,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
Map<String, Object> adminProps = new HashMap<>(originals);
|
||||
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
|
||||
adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
|
||||
Supplier<TopicAdmin> adminSupplier;
|
||||
if (topicAdminSupplier != null) {
|
||||
adminSupplier = topicAdminSupplier;
|
||||
} else {
|
||||
// Create our own topic admin supplier that we'll close when we're stopped
|
||||
ownTopicAdmin = new SharedTopicAdmin(adminProps);
|
||||
adminSupplier = ownTopicAdmin;
|
||||
}
|
||||
|
||||
Map<String, Object> topicSettings = config instanceof DistributedConfig
|
||||
? ((DistributedConfig) config).configStorageTopicSettings()
|
||||
: Collections.emptyMap();
|
||||
|
@ -812,7 +797,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
.replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
|
||||
.build();
|
||||
|
||||
return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier, config, time);
|
||||
return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, topicAdminSupplier, config, time);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.kafka.connect.util.Callback;
|
|||
import org.apache.kafka.connect.util.ConnectUtils;
|
||||
import org.apache.kafka.connect.util.ConvertingFutureCallback;
|
||||
import org.apache.kafka.connect.util.KafkaBasedLog;
|
||||
import org.apache.kafka.connect.util.SharedTopicAdmin;
|
||||
import org.apache.kafka.connect.util.TopicAdmin;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -149,7 +148,6 @@ public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
private Converter keyConverter;
|
||||
private final Supplier<TopicAdmin> topicAdminSupplier;
|
||||
private final Supplier<String> clientIdBase;
|
||||
private SharedTopicAdmin ownTopicAdmin;
|
||||
protected boolean exactlyOnce;
|
||||
|
||||
/**
|
||||
|
@ -211,17 +209,9 @@ public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
Map<String, Object> adminProps = new HashMap<>(originals);
|
||||
adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
|
||||
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
|
||||
Supplier<TopicAdmin> adminSupplier;
|
||||
if (topicAdminSupplier != null) {
|
||||
adminSupplier = topicAdminSupplier;
|
||||
} else {
|
||||
// Create our own topic admin supplier that we'll close when we're stopped
|
||||
this.ownTopicAdmin = new SharedTopicAdmin(adminProps);
|
||||
adminSupplier = ownTopicAdmin;
|
||||
}
|
||||
NewTopic topicDescription = newTopicDescription(topic, config);
|
||||
|
||||
this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier, config, Time.SYSTEM);
|
||||
this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, topicAdminSupplier, config, Time.SYSTEM);
|
||||
}
|
||||
|
||||
protected NewTopic newTopicDescription(final String topic, final WorkerConfig config) {
|
||||
|
@ -268,13 +258,7 @@ public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
@Override
|
||||
public void stop() {
|
||||
log.info("Stopping KafkaOffsetBackingStore");
|
||||
try {
|
||||
offsetLog.stop();
|
||||
} finally {
|
||||
if (ownTopicAdmin != null) {
|
||||
ownTopicAdmin.close();
|
||||
}
|
||||
}
|
||||
offsetLog.stop();
|
||||
log.info("Stopped KafkaOffsetBackingStore");
|
||||
}
|
||||
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.kafka.connect.util.Callback;
|
|||
import org.apache.kafka.connect.util.ConnectUtils;
|
||||
import org.apache.kafka.connect.util.ConnectorTaskId;
|
||||
import org.apache.kafka.connect.util.KafkaBasedLog;
|
||||
import org.apache.kafka.connect.util.SharedTopicAdmin;
|
||||
import org.apache.kafka.connect.util.Table;
|
||||
import org.apache.kafka.connect.util.TopicAdmin;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -139,14 +138,8 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
private String statusTopic;
|
||||
private KafkaBasedLog<String, byte[]> kafkaLog;
|
||||
private int generation;
|
||||
private SharedTopicAdmin ownTopicAdmin;
|
||||
private ExecutorService sendRetryExecutor;
|
||||
|
||||
@Deprecated
|
||||
public KafkaStatusBackingStore(Time time, Converter converter) {
|
||||
this(time, converter, null, "connect-distributed-");
|
||||
}
|
||||
|
||||
public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier, String clientIdBase) {
|
||||
this.time = time;
|
||||
this.converter = converter;
|
||||
|
@ -158,8 +151,9 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
}
|
||||
|
||||
// visible for testing
|
||||
KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, KafkaBasedLog<String, byte[]> kafkaLog) {
|
||||
this(time, converter);
|
||||
KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, Supplier<TopicAdmin> topicAdminSupplier,
|
||||
KafkaBasedLog<String, byte[]> kafkaLog) {
|
||||
this(time, converter, null, "connect-distributed-");
|
||||
this.kafkaLog = kafkaLog;
|
||||
this.statusTopic = statusTopic;
|
||||
sendRetryExecutor = Executors.newSingleThreadExecutor(
|
||||
|
@ -199,14 +193,6 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
Map<String, Object> adminProps = new HashMap<>(originals);
|
||||
adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
|
||||
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
|
||||
Supplier<TopicAdmin> adminSupplier;
|
||||
if (topicAdminSupplier != null) {
|
||||
adminSupplier = topicAdminSupplier;
|
||||
} else {
|
||||
// Create our own topic admin supplier that we'll close when we're stopped
|
||||
ownTopicAdmin = new SharedTopicAdmin(adminProps);
|
||||
adminSupplier = ownTopicAdmin;
|
||||
}
|
||||
|
||||
Map<String, Object> topicSettings = config instanceof DistributedConfig
|
||||
? ((DistributedConfig) config).statusStorageTopicSettings()
|
||||
|
@ -219,7 +205,7 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
.build();
|
||||
|
||||
Callback<ConsumerRecord<String, byte[]>> readCallback = (error, record) -> read(record);
|
||||
this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier, config, time);
|
||||
this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, topicAdminSupplier, config, time);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -236,9 +222,6 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme
|
|||
kafkaLog.stop();
|
||||
} finally {
|
||||
ThreadUtils.shutdownExecutorServiceQuietly(sendRetryExecutor, 10, TimeUnit.SECONDS);
|
||||
if (ownTopicAdmin != null) {
|
||||
ownTopicAdmin.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -221,7 +221,7 @@ public class KafkaConfigBackingStoreMockitoTest {
|
|||
doReturn("test-cluster").when(config).kafkaClusterId();
|
||||
configStorage = Mockito.spy(
|
||||
new KafkaConfigBackingStore(
|
||||
converter, config, null, null, CLIENT_ID_BASE, time)
|
||||
converter, config, null, () -> null, CLIENT_ID_BASE, time)
|
||||
);
|
||||
configStorage.setConfigLog(configLog);
|
||||
configStorage.setUpdateListener(configUpdateListener);
|
||||
|
|
|
@ -178,10 +178,11 @@ public class KafkaConfigBackingStoreTest {
|
|||
// The kafkaClusterId is used in the constructor for KafkaConfigBackingStore
|
||||
// So temporarily enter replay mode in order to mock that call
|
||||
EasyMock.replay(config);
|
||||
Supplier<TopicAdmin> topicAdminSupplier = () -> null;
|
||||
configStorage = PowerMock.createPartialMock(
|
||||
KafkaConfigBackingStore.class,
|
||||
new String[]{"createKafkaBasedLog", "createFencableProducer"},
|
||||
converter, config, null, null, CLIENT_ID_BASE, time);
|
||||
converter, config, null, topicAdminSupplier, CLIENT_ID_BASE, time);
|
||||
Whitebox.setInternalState(configStorage, "configLog", storeLog);
|
||||
configStorage.setUpdateListener(configUpdateListener);
|
||||
// The mock must be reset and re-mocked for the remainder of the test.
|
||||
|
|
|
@ -64,16 +64,15 @@ public class KafkaStatusBackingStoreFormatTest {
|
|||
|
||||
private Time time;
|
||||
private KafkaStatusBackingStore store;
|
||||
private JsonConverter converter;
|
||||
|
||||
private KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
|
||||
private final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
time = new MockTime();
|
||||
converter = new JsonConverter();
|
||||
JsonConverter converter = new JsonConverter();
|
||||
converter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false);
|
||||
store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
|
||||
store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, () -> null, kafkaBasedLog);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -80,7 +80,7 @@ public class KafkaStatusBackingStoreTest {
|
|||
|
||||
@Before
|
||||
public void setup() {
|
||||
store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
|
||||
store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, () -> null, kafkaBasedLog);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue