mirror of https://github.com/apache/kafka.git
Make Copycat CLI speific to standalone mode, clean up some config and get rid of config storage in standalone mode.
This commit is contained in:
parent
656a003894
commit
7bf807596a
|
@ -20,4 +20,4 @@ if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
|
|||
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/copycat-log4j.properties"
|
||||
fi
|
||||
|
||||
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.Copycat "$@"
|
||||
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.CopycatStandalone "$@"
|
|
@ -21,4 +21,8 @@ value.converter=org.apache.kafka.copycat.json.JsonConverter
|
|||
key.serializer=org.apache.kafka.copycat.json.JsonSerializer
|
||||
value.serializer=org.apache.kafka.copycat.json.JsonSerializer
|
||||
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
|
||||
value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
|
||||
value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
|
||||
|
||||
offset.storage.file.filename=/tmp/copycat.offsets
|
||||
# Flush much faster than normal, which is useful for testing/debugging
|
||||
offset.flush.interval.ms=10000
|
|
@ -1,31 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
bootstrap.servers=localhost:9092
|
||||
|
||||
key.converter=org.apache.kafka.copycat.json.JsonConverter
|
||||
value.converter=org.apache.kafka.copycat.json.JsonConverter
|
||||
key.serializer=org.apache.kafka.copycat.json.JsonSerializer
|
||||
value.serializer=org.apache.kafka.copycat.json.JsonSerializer
|
||||
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
|
||||
value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
|
||||
|
||||
offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore
|
||||
offset.storage.file.filename=/tmp/copycat.offsets
|
||||
# Flush much faster than normal, which is useful for testing/debugging
|
||||
offset.flush.interval.ms=10000
|
||||
|
||||
coordinator.standalone.storage=org.apache.kafka.copycat.runtime.standalone.FileConfigStorage
|
||||
config.storage.file=/tmp/copycat.configs
|
|
@ -1,94 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.cli;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
import org.apache.kafka.common.config.AbstractConfig;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
@InterfaceStability.Unstable
|
||||
public class CopycatConfig extends AbstractConfig {
|
||||
|
||||
public static final String WORKER_PROPERTIES_CONFIG = "worker-config";
|
||||
public static final String WORKER_PROPERTIES_CONFIG_DEFAULT = "";
|
||||
private static final String WORKER_PROPERTIES_CONFIG_DOC =
|
||||
"Path to a properties file with worker configuration.";
|
||||
|
||||
public static final String CREATE_CONNECTORS_CONFIG = "create-connectors";
|
||||
public static final String CREATE_CONNECTORS_CONFIG_DEFAULT = "";
|
||||
private static final String CREATE_CONNECTORS_CONFIG_DOC =
|
||||
"List of paths to properties files with connector properties to use to create new connectors";
|
||||
|
||||
public static final String DELETE_CONNECTORS_CONFIG = "delete-connectors";
|
||||
public static final String DELETE_CONNECTORS_CONFIG_DEFAULT = "";
|
||||
private static final String DELETE_CONNECTORS_CONFIG_DOC = "List of names of a connectors to "
|
||||
+ "stop and delete.";
|
||||
|
||||
private static ConfigDef config;
|
||||
|
||||
static {
|
||||
config = new ConfigDef()
|
||||
.define(WORKER_PROPERTIES_CONFIG, Type.STRING, WORKER_PROPERTIES_CONFIG_DEFAULT,
|
||||
Importance.HIGH, WORKER_PROPERTIES_CONFIG_DOC)
|
||||
.define(CREATE_CONNECTORS_CONFIG, Type.LIST, CREATE_CONNECTORS_CONFIG_DEFAULT,
|
||||
Importance.HIGH, CREATE_CONNECTORS_CONFIG_DOC)
|
||||
.define(DELETE_CONNECTORS_CONFIG, Type.LIST, DELETE_CONNECTORS_CONFIG_DEFAULT,
|
||||
Importance.HIGH, DELETE_CONNECTORS_CONFIG_DOC);
|
||||
}
|
||||
|
||||
public CopycatConfig(Properties props) {
|
||||
super(config, props);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses command line arguments into a Properties object and instantiate a
|
||||
* CopycatConfig with it.
|
||||
* @param args
|
||||
* @return
|
||||
*/
|
||||
public static CopycatConfig parseCommandLineArgs(String[] args) {
|
||||
Properties props = new Properties();
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
String arg = args[i];
|
||||
String key, value;
|
||||
|
||||
// Check for foo=bar or --foo=bar syntax
|
||||
if (arg.contains("=")) {
|
||||
String[] parts = arg.split("=", 1);
|
||||
key = parts[0];
|
||||
value = parts[1];
|
||||
} else {
|
||||
key = args[i];
|
||||
i += 1;
|
||||
value = args[i];
|
||||
}
|
||||
|
||||
// Check for -- prefix on key
|
||||
if (key.startsWith("--"))
|
||||
key = key.substring(2);
|
||||
|
||||
props.setProperty(key, value);
|
||||
}
|
||||
|
||||
return new CopycatConfig(props);
|
||||
}
|
||||
}
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.kafka.copycat.cli;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.copycat.runtime.Coordinator;
|
||||
import org.apache.kafka.copycat.runtime.Worker;
|
||||
|
@ -28,6 +27,7 @@ import org.apache.kafka.copycat.util.FutureCallback;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
|
@ -42,50 +42,29 @@ import java.util.Properties;
|
|||
* </p>
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public class Copycat {
|
||||
private static final Logger log = LoggerFactory.getLogger(Copycat.class);
|
||||
public class CopycatStandalone {
|
||||
private static final Logger log = LoggerFactory.getLogger(CopycatStandalone.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
CopycatConfig config;
|
||||
Properties workerProps;
|
||||
Properties connectorProps;
|
||||
|
||||
try {
|
||||
config = CopycatConfig.parseCommandLineArgs(args);
|
||||
} catch (ConfigException e) {
|
||||
log.error(e.getMessage());
|
||||
log.info("Usage: copycat [--worker-config worker.properties]"
|
||||
+ " [--create-connectors connector1.properties,connector2.properties,...]"
|
||||
+ " [--delete-connectors connector1-name,connector2-name,...]");
|
||||
if (args.length < 2) {
|
||||
log.info("Usage: CopycatStandalone worker.properties connector1.properties [connector2.properties ...]");
|
||||
System.exit(1);
|
||||
return;
|
||||
}
|
||||
|
||||
String workerPropsFile = config.getString(CopycatConfig.WORKER_PROPERTIES_CONFIG);
|
||||
String workerPropsFile = args[0];
|
||||
workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
|
||||
|
||||
WorkerConfig workerConfig = new WorkerConfig(workerProps);
|
||||
Worker worker = new Worker(workerConfig);
|
||||
Coordinator coordinator = new StandaloneCoordinator(worker, workerConfig.getUnusedProperties());
|
||||
Coordinator coordinator = new StandaloneCoordinator(worker);
|
||||
final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, coordinator);
|
||||
copycat.start();
|
||||
|
||||
try {
|
||||
// Destroy any requested connectors
|
||||
for (final String connName : config.getList(CopycatConfig.DELETE_CONNECTORS_CONFIG)) {
|
||||
FutureCallback<Void> cb = new FutureCallback<>(new Callback<Void>() {
|
||||
@Override
|
||||
public void onCompletion(Throwable error, Void result) {
|
||||
if (error != null)
|
||||
log.error("Failed to stop job {}", connName);
|
||||
}
|
||||
});
|
||||
coordinator.deleteConnector(connName, cb);
|
||||
cb.get();
|
||||
}
|
||||
|
||||
// Create any new connectors
|
||||
for (final String connectorPropsFile : config.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
|
||||
for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
|
||||
connectorProps = Utils.loadProps(connectorPropsFile);
|
||||
FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
|
||||
@Override
|
|
@ -82,12 +82,6 @@ public class WorkerConfig extends AbstractConfig {
|
|||
+ " not per task. All task have shutdown triggered, then they are waited on sequentially.";
|
||||
private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = "5000";
|
||||
|
||||
public static final String OFFSET_STORAGE_CLASS_CONFIG = "offset.storage.class";
|
||||
private static final String OFFSET_STORAGE_CLASS_DOC =
|
||||
"OffsetBackingStore implementation to use for storing partition offset data";
|
||||
public static final String OFFSET_STORAGE_CLASS_DEFAULT
|
||||
= "org.apache.kafka.copycat.storage.MemoryOffsetBackingStore";
|
||||
|
||||
public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = "offset.flush.interval.ms";
|
||||
private static final String OFFSET_COMMIT_INTERVAL_MS_DOC
|
||||
= "Interval at which to try committing offsets for tasks.";
|
||||
|
@ -122,8 +116,6 @@ public class WorkerConfig extends AbstractConfig {
|
|||
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
|
||||
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
|
||||
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
|
||||
.define(OFFSET_STORAGE_CLASS_CONFIG, Type.CLASS, OFFSET_STORAGE_CLASS_DEFAULT,
|
||||
Importance.LOW, OFFSET_STORAGE_CLASS_DOC)
|
||||
.define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, OFFSET_COMMIT_INTERVAL_MS_DEFAULT,
|
||||
Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
|
||||
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
|
||||
|
|
|
@ -65,9 +65,7 @@ public class Worker<K, V> {
|
|||
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
|
||||
|
||||
public Worker(WorkerConfig config) {
|
||||
this(new SystemTime(), config,
|
||||
config.getConfiguredInstance(WorkerConfig.OFFSET_STORAGE_CLASS_CONFIG, OffsetBackingStore.class),
|
||||
null, null, null, null);
|
||||
this(new SystemTime(), config, null, null, null, null, null);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -78,7 +76,13 @@ public class Worker<K, V> {
|
|||
this.config = config;
|
||||
this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
|
||||
this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
|
||||
this.offsetBackingStore = offsetBackingStore;
|
||||
|
||||
if (offsetBackingStore != null) {
|
||||
this.offsetBackingStore = offsetBackingStore;
|
||||
} else {
|
||||
this.offsetBackingStore = new FileOffsetBackingStore();
|
||||
this.offsetBackingStore.configure(config.originals());
|
||||
}
|
||||
|
||||
if (offsetKeySerializer != null) {
|
||||
this.offsetKeySerializer = offsetKeySerializer;
|
||||
|
|
|
@ -1,60 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.runtime.standalone;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Interface used by StandaloneController to store configuration data for jobs. To be fault
|
||||
* tolerant, all data required to resume jobs is stored here.
|
||||
*/
|
||||
interface ConfigStorage {
|
||||
|
||||
/**
|
||||
* Configure this storage engine.
|
||||
* @param props configuration properties
|
||||
*/
|
||||
void configure(Properties props);
|
||||
|
||||
/**
|
||||
* Close this storage engine.
|
||||
*/
|
||||
void close();
|
||||
|
||||
/**
|
||||
* Commit the new connector config.
|
||||
* @param connector
|
||||
* @param properties
|
||||
*/
|
||||
void putConnectorConfig(String connector, Properties properties);
|
||||
|
||||
/**
|
||||
* Read back the config for the given connector.
|
||||
* @param connector
|
||||
* @return
|
||||
*/
|
||||
Properties getConnectorConfig(String connector);
|
||||
|
||||
/**
|
||||
* Get a list of connector names that have associated state in the store.
|
||||
* @return
|
||||
*/
|
||||
Collection<String> getConnectors();
|
||||
|
||||
}
|
|
@ -1,111 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.runtime.standalone;
|
||||
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Implementation of ConfigStorage that saves state to a local file. This allows a standalone
|
||||
* node to tolerate faults/restarts.
|
||||
* </p>
|
||||
* <p>
|
||||
* Currently the implementation is naive, inefficient, and only meant for testing or a small
|
||||
* number of jobs.
|
||||
* </p>
|
||||
*/
|
||||
public class FileConfigStorage implements ConfigStorage {
|
||||
public static final String FILE_CONFIG = "config.storage.file";
|
||||
public static final String FILE_DEFAULT = "configs.db";
|
||||
|
||||
private String filename;
|
||||
private Map<String, Properties> connectorConfig = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void configure(Properties props) {
|
||||
filename = props.getProperty(FILE_CONFIG);
|
||||
if (filename == null)
|
||||
filename = FILE_DEFAULT;
|
||||
load();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putConnectorConfig(String connector, Properties properties) {
|
||||
if (properties == null)
|
||||
connectorConfig.remove(connector);
|
||||
else
|
||||
connectorConfig.put(connector, properties);
|
||||
save();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Properties getConnectorConfig(String connector) {
|
||||
return connectorConfig.get(connector);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> getConnectors() {
|
||||
return connectorConfig.keySet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves the current state to disk, overwriting previous data. This action is performed
|
||||
* atomically.
|
||||
*/
|
||||
private void save() {
|
||||
try {
|
||||
String tempFilename = filename + ".temp";
|
||||
ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(tempFilename));
|
||||
os.writeObject(connectorConfig);
|
||||
os.close();
|
||||
|
||||
// Overwrite the original. Since the nio file package is JDK7+ only, this is the best we
|
||||
// can do.
|
||||
File tempFile = new File(tempFilename);
|
||||
tempFile.renameTo(new File(filename));
|
||||
} catch (IOException e) {
|
||||
throw new CopycatRuntimeException("Failed to save config data to file", e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void load() {
|
||||
try {
|
||||
ObjectInputStream is = new ObjectInputStream(new FileInputStream(filename));
|
||||
Object data = is.readObject();
|
||||
if (!(data instanceof Map))
|
||||
throw new CopycatRuntimeException("Expected Map but found " + data.getClass());
|
||||
connectorConfig = (Map<String, Properties>) data;
|
||||
} catch (FileNotFoundException e) {
|
||||
// Expected on first run
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
throw new CopycatRuntimeException("Failed to load config data", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -38,35 +38,15 @@ import java.util.*;
|
|||
public class StandaloneCoordinator implements Coordinator {
|
||||
private static final Logger log = LoggerFactory.getLogger(StandaloneCoordinator.class);
|
||||
|
||||
public static final String STORAGE_CONFIG = "coordinator.standalone.storage";
|
||||
|
||||
private Worker worker;
|
||||
private Properties configs;
|
||||
private ConfigStorage configStorage;
|
||||
private HashMap<String, ConnectorState> connectors = new HashMap<>();
|
||||
|
||||
public StandaloneCoordinator(Worker worker, Properties props) {
|
||||
public StandaloneCoordinator(Worker worker) {
|
||||
this.worker = worker;
|
||||
this.configs = props;
|
||||
}
|
||||
|
||||
public synchronized void start() {
|
||||
log.info("Coordinator starting");
|
||||
|
||||
String storage = configs.getProperty(STORAGE_CONFIG);
|
||||
if (storage != null && !storage.isEmpty()) {
|
||||
try {
|
||||
configStorage = Utils.newInstance(storage, ConfigStorage.class);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new CopycatRuntimeException("Couldn't configure storage", e);
|
||||
}
|
||||
configStorage.configure(configs);
|
||||
} else {
|
||||
configStorage = null;
|
||||
}
|
||||
|
||||
restoreConnectors();
|
||||
|
||||
log.info("Coordinator started");
|
||||
}
|
||||
|
||||
|
@ -82,11 +62,6 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
}
|
||||
connectors.clear();
|
||||
|
||||
if (configStorage != null) {
|
||||
configStorage.close();
|
||||
configStorage = null;
|
||||
}
|
||||
|
||||
log.info("Coordinator stopped");
|
||||
}
|
||||
|
||||
|
@ -148,8 +123,6 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
}
|
||||
ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
|
||||
connectors.put(connName, state);
|
||||
if (configStorage != null)
|
||||
configStorage.putConnectorConfig(connName, connectorProps);
|
||||
|
||||
log.info("Finished creating connector {}", connName);
|
||||
|
||||
|
@ -174,8 +147,6 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
|
||||
stopConnector(state);
|
||||
connectors.remove(state.name);
|
||||
if (configStorage != null)
|
||||
configStorage.putConnectorConfig(state.name, null);
|
||||
|
||||
log.info("Finished destroying connector {}", connName);
|
||||
}
|
||||
|
@ -252,23 +223,6 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
createConnectorTasks(state);
|
||||
}
|
||||
|
||||
private void restoreConnectors() {
|
||||
if (configStorage == null)
|
||||
return;
|
||||
|
||||
Collection<String> connNames = configStorage.getConnectors();
|
||||
for (String connName : connNames) {
|
||||
log.info("Restoring connector {}", connName);
|
||||
Properties connProps = configStorage.getConnectorConfig(connName);
|
||||
ConnectorState connState = createConnector(connProps);
|
||||
// Because this coordinator is standalone, connectors are only restored when this process
|
||||
// starts and we know there can't be any existing tasks. So in this special case we're able
|
||||
// to just create the tasks rather than having to check for existing tasks and sort out
|
||||
// whether they need to be reconfigured.
|
||||
createConnectorTasks(connState);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests reconfiguration of the task. This should only be triggered by
|
||||
* {@link StandaloneConnectorContext}.
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.io.*;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Implementation of OffsetBackingStore that saves data locally to a file. To ensure this behaves
|
||||
|
@ -42,9 +41,9 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void configure(Properties props) {
|
||||
public void configure(Map<String, ?> props) {
|
||||
super.configure(props);
|
||||
String filename = props.getProperty(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
|
||||
String filename = (String) props.get(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
|
||||
file = new File(filename);
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
|
|||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
@ -47,7 +46,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void configure(Properties props) {
|
||||
public void configure(Map<String, ?> props) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,8 +17,8 @@
|
|||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import org.apache.kafka.common.Configurable;
|
||||
import org.apache.kafka.copycat.util.Callback;
|
||||
import org.apache.kafka.copycat.util.Configurable;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collection;
|
||||
|
|
|
@ -1,27 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.util;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Class that can be configured immediately after instantiation with a set of properties.
|
||||
*/
|
||||
public interface Configurable {
|
||||
public void configure(Properties props);
|
||||
}
|
|
@ -1,88 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.runtime.standalone;
|
||||
|
||||
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.powermock.api.easymock.PowerMock;
|
||||
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
||||
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Properties;
|
||||
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PrepareForTest({StandaloneCoordinator.class})
|
||||
@PowerMockIgnore("javax.management.*")
|
||||
public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestBase {
|
||||
private File coordinatorConfigFile;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
Properties coordinatorProps = new Properties();
|
||||
coordinatorProps.setProperty(StandaloneCoordinator.STORAGE_CONFIG,
|
||||
FileConfigStorage.class.getName());
|
||||
coordinatorConfigFile = File.createTempFile("test-coordinator-config", null);
|
||||
coordinatorConfigFile.delete(); // Delete since we just needed a random file path
|
||||
coordinatorProps.setProperty(FileConfigStorage.FILE_CONFIG,
|
||||
coordinatorConfigFile.getAbsolutePath());
|
||||
coordinator = new StandaloneCoordinator(worker, coordinatorProps);
|
||||
|
||||
connectorProps = new Properties();
|
||||
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
||||
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
|
||||
PowerMock.mockStatic(StandaloneCoordinator.class);
|
||||
|
||||
// These can be anything since connectors can pass along whatever they want.
|
||||
taskProps = new Properties();
|
||||
taskProps.setProperty("foo", "bar");
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
coordinatorConfigFile.delete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestoreConnectors() throws Exception {
|
||||
connector = PowerMock.createMock(BogusSourceClass.class);
|
||||
expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
|
||||
expectStop();
|
||||
// Restarting should recreate the same connector
|
||||
expectRestore(BogusSourceClass.class, BogusSourceTask.class);
|
||||
expectStop();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
// One run to seed the config storage with the job
|
||||
coordinator.start();
|
||||
coordinator.addConnector(connectorProps, createCallback);
|
||||
coordinator.stop();
|
||||
|
||||
// Second run should restore the connector
|
||||
coordinator.start();
|
||||
coordinator.stop();
|
||||
|
||||
PowerMock.verifyAll();
|
||||
}
|
||||
}
|
|
@ -17,31 +17,51 @@
|
|||
|
||||
package org.apache.kafka.copycat.runtime.standalone;
|
||||
|
||||
import org.apache.kafka.copycat.connector.Connector;
|
||||
import org.apache.kafka.copycat.connector.Task;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||
import org.apache.kafka.copycat.runtime.Worker;
|
||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||
import org.apache.kafka.copycat.sink.SinkTask;
|
||||
import org.apache.kafka.copycat.source.SourceConnector;
|
||||
import org.apache.kafka.copycat.source.SourceTask;
|
||||
import org.apache.kafka.copycat.util.Callback;
|
||||
import org.apache.kafka.copycat.util.ConnectorTaskId;
|
||||
import org.apache.kafka.copycat.util.FutureCallback;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.powermock.api.easymock.PowerMock;
|
||||
import org.powermock.api.easymock.annotation.Mock;
|
||||
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
||||
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PrepareForTest({StandaloneCoordinator.class})
|
||||
@PowerMockIgnore("javax.management.*")
|
||||
public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
|
||||
public class StandaloneCoordinatorTest {
|
||||
private static final String CONNECTOR_NAME = "test";
|
||||
private static final String TOPICS_LIST_STR = "topic1,topic2";
|
||||
|
||||
private StandaloneCoordinator coordinator;
|
||||
@Mock protected Worker worker;
|
||||
private Connector connector;
|
||||
@Mock protected Callback<String> createCallback;
|
||||
|
||||
private Properties connectorProps;
|
||||
private Properties taskProps;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
worker = PowerMock.createMock(Worker.class);
|
||||
coordinator = new StandaloneCoordinator(worker, new Properties());
|
||||
coordinator = new StandaloneCoordinator(worker);
|
||||
|
||||
connectorProps = new Properties();
|
||||
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
||||
|
@ -94,4 +114,74 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
|
|||
futureCb.get(1000L, TimeUnit.MILLISECONDS);
|
||||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
|
||||
private void expectAdd(Class<? extends Connector> connClass,
|
||||
Class<? extends Task> taskClass,
|
||||
boolean sink) throws Exception {
|
||||
expectCreate(connClass, taskClass, sink, true);
|
||||
}
|
||||
|
||||
private void expectRestore(Class<? extends Connector> connClass,
|
||||
Class<? extends Task> taskClass) throws Exception {
|
||||
// Restore never uses a callback. These tests always use sources
|
||||
expectCreate(connClass, taskClass, false, false);
|
||||
}
|
||||
|
||||
private void expectCreate(Class<? extends Connector> connClass,
|
||||
Class<? extends Task> taskClass,
|
||||
boolean sink, boolean expectCallback) throws Exception {
|
||||
connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
|
||||
|
||||
PowerMock.expectPrivate(StandaloneCoordinator.class, "instantiateConnector", connClass.getName())
|
||||
.andReturn(connector);
|
||||
if (expectCallback) {
|
||||
createCallback.onCompletion(null, CONNECTOR_NAME);
|
||||
PowerMock.expectLastCall();
|
||||
}
|
||||
|
||||
connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
|
||||
PowerMock.expectLastCall();
|
||||
connector.start(new Properties());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
// Just return the connector properties for the individual task we generate by default
|
||||
EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);
|
||||
|
||||
EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
|
||||
.andReturn(Arrays.asList(taskProps));
|
||||
// And we should instantiate the tasks. For a sink task, we should see added properties for
|
||||
// the input topic partitions
|
||||
Properties generatedTaskProps = new Properties();
|
||||
generatedTaskProps.putAll(taskProps);
|
||||
if (sink)
|
||||
generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
|
||||
worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
|
||||
PowerMock.expectLastCall();
|
||||
}
|
||||
|
||||
private void expectStop() throws CopycatException {
|
||||
worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
|
||||
EasyMock.expectLastCall();
|
||||
connector.stop();
|
||||
EasyMock.expectLastCall();
|
||||
}
|
||||
|
||||
private void expectDestroy() throws CopycatException {
|
||||
expectStop();
|
||||
}
|
||||
|
||||
// We need to use a real class here due to some issue with mocking java.lang.Class
|
||||
private abstract class BogusSourceClass extends SourceConnector {
|
||||
}
|
||||
|
||||
private abstract class BogusSourceTask extends SourceTask {
|
||||
}
|
||||
|
||||
private abstract class BogusSinkClass extends SinkConnector {
|
||||
}
|
||||
|
||||
private abstract class BogusSinkTask extends SourceTask {
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,118 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.runtime.standalone;
|
||||
|
||||
import org.apache.kafka.copycat.connector.Connector;
|
||||
import org.apache.kafka.copycat.connector.Task;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||
import org.apache.kafka.copycat.runtime.Worker;
|
||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||
import org.apache.kafka.copycat.sink.SinkTask;
|
||||
import org.apache.kafka.copycat.source.SourceConnector;
|
||||
import org.apache.kafka.copycat.source.SourceTask;
|
||||
import org.apache.kafka.copycat.util.Callback;
|
||||
import org.apache.kafka.copycat.util.ConnectorTaskId;
|
||||
import org.easymock.EasyMock;
|
||||
import org.powermock.api.easymock.PowerMock;
|
||||
import org.powermock.api.easymock.annotation.Mock;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
|
||||
public class StandaloneCoordinatorTestBase {
|
||||
|
||||
protected static final String CONNECTOR_NAME = "test";
|
||||
protected static final String TOPICS_LIST_STR = "topic1,topic2";
|
||||
|
||||
protected StandaloneCoordinator coordinator;
|
||||
@Mock protected Worker worker;
|
||||
protected Connector connector;
|
||||
@Mock protected Callback<String> createCallback;
|
||||
|
||||
protected Properties connectorProps;
|
||||
protected Properties taskProps;
|
||||
|
||||
protected void expectAdd(Class<? extends Connector> connClass,
|
||||
Class<? extends Task> taskClass,
|
||||
boolean sink) throws Exception {
|
||||
expectCreate(connClass, taskClass, sink, true);
|
||||
}
|
||||
|
||||
protected void expectRestore(Class<? extends Connector> connClass,
|
||||
Class<? extends Task> taskClass) throws Exception {
|
||||
// Restore never uses a callback. These tests always use sources
|
||||
expectCreate(connClass, taskClass, false, false);
|
||||
}
|
||||
|
||||
protected void expectCreate(Class<? extends Connector> connClass,
|
||||
Class<? extends Task> taskClass,
|
||||
boolean sink, boolean expectCallback) throws Exception {
|
||||
connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
|
||||
|
||||
PowerMock.expectPrivate(StandaloneCoordinator.class, "instantiateConnector", connClass.getName())
|
||||
.andReturn(connector);
|
||||
if (expectCallback) {
|
||||
createCallback.onCompletion(null, CONNECTOR_NAME);
|
||||
PowerMock.expectLastCall();
|
||||
}
|
||||
|
||||
connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
|
||||
PowerMock.expectLastCall();
|
||||
connector.start(new Properties());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
// Just return the connector properties for the individual task we generate by default
|
||||
EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);
|
||||
|
||||
EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
|
||||
.andReturn(Arrays.asList(taskProps));
|
||||
// And we should instantiate the tasks. For a sink task, we should see added properties for
|
||||
// the input topic partitions
|
||||
Properties generatedTaskProps = new Properties();
|
||||
generatedTaskProps.putAll(taskProps);
|
||||
if (sink)
|
||||
generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
|
||||
worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
|
||||
PowerMock.expectLastCall();
|
||||
}
|
||||
|
||||
protected void expectStop() throws CopycatException {
|
||||
worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
|
||||
EasyMock.expectLastCall();
|
||||
connector.stop();
|
||||
EasyMock.expectLastCall();
|
||||
}
|
||||
|
||||
protected void expectDestroy() throws CopycatException {
|
||||
expectStop();
|
||||
}
|
||||
|
||||
// We need to use a real class here due to some issue with mocking java.lang.Class
|
||||
protected abstract class BogusSourceClass extends SourceConnector {
|
||||
}
|
||||
|
||||
protected abstract class BogusSourceTask extends SourceTask {
|
||||
}
|
||||
|
||||
protected abstract class BogusSinkClass extends SinkConnector {
|
||||
}
|
||||
|
||||
protected abstract class BogusSinkTask extends SourceTask {
|
||||
}
|
||||
}
|
|
@ -30,14 +30,13 @@ import java.nio.ByteBuffer;
|
|||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class FileOffsetBackingStoreTest {
|
||||
|
||||
FileOffsetBackingStore store;
|
||||
Properties props;
|
||||
Map<String, Object> props;
|
||||
File tempFile;
|
||||
|
||||
private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
|
||||
|
@ -51,9 +50,8 @@ public class FileOffsetBackingStoreTest {
|
|||
public void setup() throws IOException {
|
||||
store = new FileOffsetBackingStore();
|
||||
tempFile = File.createTempFile("fileoffsetbackingstore", null);
|
||||
props = new Properties();
|
||||
props.setProperty(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
|
||||
tempFile.getAbsolutePath());
|
||||
props = new HashMap<>();
|
||||
props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath());
|
||||
store.configure(props);
|
||||
store.start();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue