MINOR: Start Connect REST server in standalone mode to match distributed mode (KAFKA-7503 follow-up)

Start the Rest server in the standalone mode similar to how it's done for distributed mode.

Author: Magesh Nandakumar <magesh.n.kumar@gmail.com>

Reviewers: Arjun Satish <arjun@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #6148 from mageshn/KAFKA-7826
This commit is contained in:
Magesh Nandakumar 2019-01-16 22:58:30 -08:00 committed by Ewen Cheslack-Postava
parent 9a9310d074
commit dec68c9350
1 changed files with 6 additions and 0 deletions

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@ -82,6 +83,9 @@ public class ConnectStandalone {
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
HerderProvider provider = new HerderProvider();
rest.start(provider, plugins);
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
@ -93,6 +97,8 @@ public class ConnectStandalone {
try {
connect.start();
// herder has initialized now, and ready to be used by the RestServer.
provider.setHerder(herder);
for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {