KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic (#12947)

Reviewers: Christo Lolov  <christo_lolov@yahoo.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
Yash Mayya 2022-12-21 23:43:56 +05:30 committed by GitHub
parent 2dcf306ef8
commit a286891566
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 239 additions and 157 deletions

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.cli;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
/**
 * Common initialization logic for Kafka Connect, intended for use by command line utilities
 *
 * @param <T> the type of {@link WorkerConfig} to be used
 */
public abstract class AbstractConnectCli<T extends WorkerConfig> {

    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);

    private final String[] args;
    private final Time time = Time.SYSTEM;

    /**
     * @param args the CLI arguments to be processed. Note that if one or more arguments are passed, the first argument is
     *             assumed to be the Connect worker properties file and is processed in {@link #run()}. The remaining arguments
     *             can be handled in {@link #processExtraArgs(Herder, Connect, String[])}
     */
    protected AbstractConnectCli(String... args) {
        this.args = args;
    }

    /**
     * @return the usage text for this command line utility; {@link #run()} logs it (prefixed with {@code "Usage: "})
     *         when no arguments are passed or when {@code --help} is present among the arguments
     */
    protected abstract String usage();

    /**
     * The first CLI argument is assumed to be the Connect worker properties file and is processed by default. This method
     * can be overridden if there are more arguments that need to be processed. The default implementation does nothing.
     *
     * @param herder    the {@link Herder} instance that can be used to perform operations on the Connect cluster
     * @param connect   the {@link Connect} instance that can be stopped (via {@link Connect#stop()}) if there's an error
     *                  encountered while processing the additional CLI arguments.
     * @param extraArgs the extra CLI arguments that need to be processed
     */
    protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
    }

    /**
     * Create the {@link Herder} that the {@link Connect} instance built by
     * {@link #startConnect(Map, String...)} will wrap.
     *
     * @param config                              the worker config returned by {@link #createConfig(Map)}
     * @param workerId                            the worker ID, built as {@code host:port} from the REST server's advertised URL
     * @param plugins                             the {@link Plugins} instance created from the worker properties
     * @param connectorClientConfigOverridePolicy the policy instantiated from
     *                                            {@link WorkerConfig#CONNECTOR_CLIENT_POLICY_CLASS_CONFIG}
     * @param restServer                          the REST server, on which {@link RestServer#initializeServer()} has
     *                                            already been called
     * @param restClient                          the REST client created from {@code config}
     * @return the herder to run; it is handed to {@link Connect} together with {@code restServer}
     */
    protected abstract Herder createHerder(T config, String workerId, Plugins plugins,
                                           ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
                                           RestServer restServer, RestClient restClient);

    /**
     * Create the mode-specific worker config.
     *
     * @param workerProps the worker properties loaded from the first CLI argument (empty if that argument is an
     *                    empty string)
     * @return the {@link WorkerConfig} used for the rest of the initialization
     */
    protected abstract T createConfig(Map<String, String> workerProps);

    /**
     * Validate {@link #args}, process worker properties from the first CLI argument, and start {@link Connect}.
     * <p>
     * Exit status codes requested via {@link Exit#exit(int)}: 1 when the arguments are missing or {@code --help} is
     * passed, 2 when any {@link Throwable} escapes initialization or the wait for shutdown.
     */
    public void run() {
        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
            log.info("Usage: {}", usage());
            Exit.exit(1);
            // Exit.exit delegates to a replaceable exit procedure (e.g. in tests) which may return instead of
            // halting the JVM. Without this return we would go on to read args[0] / start a worker and then
            // report a second, misleading failure with status 2.
            return;
        }

        try {
            String workerPropsFile = args[0];
            // An empty first argument means "no properties file": start from an empty property map.
            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
            String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
            Connect connect = startConnect(workerProps, extraArgs);

            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
            connect.awaitStop();
        } catch (Throwable t) {
            log.error("Stopping due to error", t);
            Exit.exit(2);
        }
    }

    /**
     * Initialize and start an instance of {@link Connect}
     *
     * @param workerProps the worker properties map used to initialize the {@link WorkerConfig}
     * @param extraArgs   any additional CLI arguments that may need to be processed via
     *                    {@link #processExtraArgs(Herder, Connect, String[])}
     * @return a started instance of {@link Connect}. If startup fails, the instance is stopped and exit status 3 is
     *         requested; should the exit procedure return, the stopped instance is returned without processing
     *         {@code extraArgs}.
     */
    public Connect startConnect(Map<String, String> workerProps, String... extraArgs) {
        log.info("Kafka Connect worker initializing ...");
        long initStart = time.hiResClockMs();

        WorkerInfo initInfo = new WorkerInfo();
        initInfo.logAll();

        log.info("Scanning for plugin classes. This might take a moment ...");
        Plugins plugins = new Plugins(workerProps);
        plugins.compareAndSwapWithDelegatingLoader();
        T config = createConfig(workerProps);
        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());

        RestClient restClient = new RestClient(config);

        RestServer restServer = new RestServer(config, restClient);
        restServer.initializeServer();

        // The worker ID is derived from the address the REST server advertises.
        URI advertisedUrl = restServer.advertisedUrl();
        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();

        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                config, ConnectorClientConfigOverridePolicy.class);

        Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);

        final Connect connect = new Connect(herder, restServer);
        log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);
        try {
            connect.start();
        } catch (Exception e) {
            log.error("Failed to start Connect", e);
            connect.stop();
            Exit.exit(3);
            // If the exit procedure returns (e.g. it was replaced in tests), do not hand a stopped worker to
            // processExtraArgs; subclasses may issue herder requests there that can never complete.
            return connect;
        }

        processExtraArgs(herder, connect, extraArgs);

        return connect;
    }
}

View File

@ -16,15 +16,11 @@
*/ */
package org.apache.kafka.connect.cli; package org.apache.kafka.connect.cli;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect; import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder; import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins;
@ -41,9 +37,6 @@ import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -51,65 +44,32 @@ import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
/** /**
* <p> * <p>
* Command line utility that runs Kafka Connect in distributed mode. In this mode, the process joints a group of other workers * Command line utility that runs Kafka Connect in distributed mode. In this mode, the process joins a group of other
* and work is distributed among them. This is useful for running Connect as a service, where connectors can be * workers and work (connectors and tasks) is distributed among them. This is useful for running Connect as a service,
* submitted to the cluster to be automatically executed in a scalable, distributed fashion. This also allows you to * where connectors can be submitted to the cluster to be automatically executed in a scalable, distributed fashion.
* easily scale out horizontally, elastically adding or removing capacity simply by starting or stopping worker * This also allows you to easily scale out horizontally, elastically adding or removing capacity simply by starting or
* instances. * stopping worker instances.
* </p> * </p>
*/ */
public class ConnectDistributed { public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {
private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class); private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);
private final Time time = Time.SYSTEM; public ConnectDistributed(String... args) {
private final long initStart = time.hiResClockMs(); super(args);
public static void main(String[] args) {
if (args.length < 1 || Arrays.asList(args).contains("--help")) {
log.info("Usage: ConnectDistributed worker.properties");
Exit.exit(1);
}
try {
WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll();
String workerPropsFile = args[0];
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
ConnectDistributed connectDistributed = new ConnectDistributed();
Connect connect = connectDistributed.startConnect(workerProps);
// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();
} catch (Throwable t) {
log.error("Stopping due to error", t);
Exit.exit(2);
}
} }
public Connect startConnect(Map<String, String> workerProps) { @Override
log.info("Scanning for plugin classes. This might take a moment ..."); protected String usage() {
Plugins plugins = new Plugins(workerProps); return "ConnectDistributed worker.properties";
plugins.compareAndSwapWithDelegatingLoader(); }
DistributedConfig config = new DistributedConfig(workerProps);
@Override
protected Herder createHerder(DistributedConfig config, String workerId, Plugins plugins,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
RestServer restServer, RestClient restClient) {
String kafkaClusterId = config.kafkaClusterId(); String kafkaClusterId = config.kafkaClusterId();
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestClient restClient = new RestClient(config);
RestServer rest = new RestServer(config, restClient);
rest.initializeServer();
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
String clientIdBase = ConnectUtils.clientIdBase(config); String clientIdBase = ConnectUtils.clientIdBase(config);
// Create the admin client to be shared by all backing stores. // Create the admin client to be shared by all backing stores.
Map<String, Object> adminProps = new HashMap<>(config.originals()); Map<String, Object> adminProps = new HashMap<>(config.originals());
ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId); ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
@ -119,15 +79,11 @@ public class ConnectDistributed {
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase); KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);
offsetBackingStore.configure(config); offsetBackingStore.configure(config);
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin( Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
config, ConnectorClientConfigOverridePolicy.class);
Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
WorkerConfigTransformer configTransformer = worker.configTransformer(); WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter(); Converter internalValueConverter = worker.getInternalValueConverter();
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin, clientIdBase); StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(Time.SYSTEM, internalValueConverter, sharedAdmin, clientIdBase);
statusBackingStore.configure(config); statusBackingStore.configure(config);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
@ -139,21 +95,18 @@ public class ConnectDistributed {
// Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
// herder is stopped. This is easier than having to track and own the lifecycle ourselves. // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
DistributedHerder herder = new DistributedHerder(config, time, worker, return new DistributedHerder(config, Time.SYSTEM, worker,
kafkaClusterId, statusBackingStore, configBackingStore, kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin); restServer.advertisedUrl().toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin);
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
connect.start();
} catch (Exception e) {
log.error("Failed to start Connect", e);
connect.stop();
Exit.exit(3);
}
return connect;
} }
@Override
protected DistributedConfig createConfig(Map<String, String> workerProps) {
return new DistributedConfig(workerProps);
}
public static void main(String[] args) {
ConnectDistributed connectDistributed = new ConnectDistributed(args);
connectDistributed.run();
}
} }

View File

@ -24,9 +24,8 @@ import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@ -37,98 +36,75 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map; import java.util.Map;
/** /**
* <p> * <p>
* Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
* distributed. Instead, all the normal Connect machinery works within a single process. This is * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for ad hoc,
* useful for ad hoc, small, or experimental jobs. * small, or experimental jobs.
* </p> * </p>
* <p> * <p>
* By default, no job configs or offset data is persistent. You can make jobs persistent and * Connector and task configs are stored in memory and are not persistent. However, connector offset data is persistent
* fault tolerant by overriding the settings to use file storage for both. * since it uses file storage (configurable via {@link StandaloneConfig#OFFSET_STORAGE_FILE_FILENAME_CONFIG})
* </p> * </p>
*/ */
public class ConnectStandalone { public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class); private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
public static void main(String[] args) { protected ConnectStandalone(String... args) {
super(args);
}
if (args.length < 1 || Arrays.asList(args).contains("--help")) { @Override
log.info("Usage: ConnectStandalone worker.properties [connector1.properties connector2.properties ...]"); protected String usage() {
Exit.exit(1); return "ConnectStandalone worker.properties [connector1.properties connector2.properties ...]";
} }
@Override
protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
try { try {
Time time = Time.SYSTEM; for (final String connectorPropsFile : extraArgs) {
log.info("Kafka Connect standalone worker initializing ..."); Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
long initStart = time.hiResClockMs(); FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
WorkerInfo initInfo = new WorkerInfo(); if (error != null)
initInfo.logAll(); log.error("Failed to create connector for {}", connectorPropsFile);
else
String workerPropsFile = args[0]; log.info("Created connector {}", info.result().name());
Map<String, String> workerProps = !workerPropsFile.isEmpty() ? });
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap(); herder.putConnectorConfig(
connectorProps.get(ConnectorConfig.NAME_CONFIG),
log.info("Scanning for plugin classes. This might take a moment ..."); connectorProps, false, cb);
Plugins plugins = new Plugins(workerProps); cb.get();
plugins.compareAndSwapWithDelegatingLoader();
StandaloneConfig config = new StandaloneConfig(workerProps);
String kafkaClusterId = config.kafkaClusterId();
log.debug("Kafka cluster ID: {}", kafkaClusterId);
// Do not initialize a RestClient because the ConnectorsResource will not use it in standalone mode.
RestServer rest = new RestServer(config, null);
rest.initializeServer();
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
offsetBackingStore.configure(config);
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
config, ConnectorClientConfigOverridePolicy.class);
Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore,
connectorClientConfigOverridePolicy);
Herder herder = new StandaloneHerder(worker, kafkaClusterId, connectorClientConfigOverridePolicy);
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
connect.start();
for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
if (error != null)
log.error("Failed to create job for {}", connectorPropsFile);
else
log.info("Created connector {}", info.result().name());
});
herder.putConnectorConfig(
connectorProps.get(ConnectorConfig.NAME_CONFIG),
connectorProps, false, cb);
cb.get();
}
} catch (Throwable t) {
log.error("Stopping after connector error", t);
connect.stop();
Exit.exit(3);
} }
// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();
} catch (Throwable t) { } catch (Throwable t) {
log.error("Stopping due to error", t); log.error("Stopping after connector error", t);
Exit.exit(2); connect.stop();
Exit.exit(3);
} }
} }
@Override
protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
RestServer restServer, RestClient restClient) {
OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
offsetBackingStore.configure(config);
Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
connectorClientConfigOverridePolicy);
return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
}
@Override
protected StandaloneConfig createConfig(Map<String, String> workerProps) {
return new StandaloneConfig(workerProps);
}
public static void main(String[] args) {
ConnectStandalone connectStandalone = new ConnectStandalone(args);
connectStandalone.run();
}
} }