Make Copycat CLI specific to standalone mode, clean up some config and get rid of config storage in standalone mode.

This commit is contained in:
Ewen Cheslack-Postava 2015-08-13 10:52:55 -07:00
parent 656a003894
commit 7bf807596a
18 changed files with 122 additions and 632 deletions

View File

@@ -20,4 +20,4 @@ if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/copycat-log4j.properties"
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.Copycat "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.CopycatStandalone "$@"

View File

@@ -21,4 +21,8 @@ value.converter=org.apache.kafka.copycat.json.JsonConverter
key.serializer=org.apache.kafka.copycat.json.JsonSerializer
value.serializer=org.apache.kafka.copycat.json.JsonSerializer
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
offset.storage.file.filename=/tmp/copycat.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

View File

@@ -1,31 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.copycat.json.JsonConverter
value.converter=org.apache.kafka.copycat.json.JsonConverter
key.serializer=org.apache.kafka.copycat.json.JsonSerializer
value.serializer=org.apache.kafka.copycat.json.JsonSerializer
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore
offset.storage.file.filename=/tmp/copycat.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
coordinator.standalone.storage=org.apache.kafka.copycat.runtime.standalone.FileConfigStorage
config.storage.file=/tmp/copycat.configs

View File

@@ -1,94 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.cli;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import java.util.Properties;
@InterfaceStability.Unstable
public class CopycatConfig extends AbstractConfig {
public static final String WORKER_PROPERTIES_CONFIG = "worker-config";
public static final String WORKER_PROPERTIES_CONFIG_DEFAULT = "";
private static final String WORKER_PROPERTIES_CONFIG_DOC =
"Path to a properties file with worker configuration.";
public static final String CREATE_CONNECTORS_CONFIG = "create-connectors";
public static final String CREATE_CONNECTORS_CONFIG_DEFAULT = "";
private static final String CREATE_CONNECTORS_CONFIG_DOC =
"List of paths to properties files with connector properties to use to create new connectors";
public static final String DELETE_CONNECTORS_CONFIG = "delete-connectors";
public static final String DELETE_CONNECTORS_CONFIG_DEFAULT = "";
private static final String DELETE_CONNECTORS_CONFIG_DOC = "List of names of connectors to "
+ "stop and delete.";
private static ConfigDef config;
static {
config = new ConfigDef()
.define(WORKER_PROPERTIES_CONFIG, Type.STRING, WORKER_PROPERTIES_CONFIG_DEFAULT,
Importance.HIGH, WORKER_PROPERTIES_CONFIG_DOC)
.define(CREATE_CONNECTORS_CONFIG, Type.LIST, CREATE_CONNECTORS_CONFIG_DEFAULT,
Importance.HIGH, CREATE_CONNECTORS_CONFIG_DOC)
.define(DELETE_CONNECTORS_CONFIG, Type.LIST, DELETE_CONNECTORS_CONFIG_DEFAULT,
Importance.HIGH, DELETE_CONNECTORS_CONFIG_DOC);
}
public CopycatConfig(Properties props) {
super(config, props);
}
/**
* Parses command line arguments into a Properties object and instantiates a
* CopycatConfig with it.
* @param args command line arguments, as key=value pairs or flag/value pairs
* @return a CopycatConfig built from the parsed properties
*/
public static CopycatConfig parseCommandLineArgs(String[] args) {
Properties props = new Properties();
for (int i = 0; i < args.length; i++) {
String arg = args[i];
String key, value;
// Check for foo=bar or --foo=bar syntax
if (arg.contains("=")) {
String[] parts = arg.split("=", 2);
key = parts[0];
value = parts[1];
} else {
key = args[i];
i += 1;
value = args[i];
}
// Check for -- prefix on key
if (key.startsWith("--"))
key = key.substring(2);
props.setProperty(key, value);
}
return new CopycatConfig(props);
}
}

View File

@@ -18,7 +18,6 @@
package org.apache.kafka.copycat.cli;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.runtime.Coordinator;
import org.apache.kafka.copycat.runtime.Worker;
@@ -28,6 +27,7 @@ import org.apache.kafka.copycat.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Properties;
/**
@@ -42,50 +42,29 @@ import java.util.Properties;
* </p>
*/
@InterfaceStability.Unstable
public class Copycat {
private static final Logger log = LoggerFactory.getLogger(Copycat.class);
public class CopycatStandalone {
private static final Logger log = LoggerFactory.getLogger(CopycatStandalone.class);
public static void main(String[] args) throws Exception {
CopycatConfig config;
Properties workerProps;
Properties connectorProps;
try {
config = CopycatConfig.parseCommandLineArgs(args);
} catch (ConfigException e) {
log.error(e.getMessage());
log.info("Usage: copycat [--worker-config worker.properties]"
+ " [--create-connectors connector1.properties,connector2.properties,...]"
+ " [--delete-connectors connector1-name,connector2-name,...]");
if (args.length < 2) {
log.info("Usage: CopycatStandalone worker.properties connector1.properties [connector2.properties ...]");
System.exit(1);
return;
}
String workerPropsFile = config.getString(CopycatConfig.WORKER_PROPERTIES_CONFIG);
String workerPropsFile = args[0];
workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
WorkerConfig workerConfig = new WorkerConfig(workerProps);
Worker worker = new Worker(workerConfig);
Coordinator coordinator = new StandaloneCoordinator(worker, workerConfig.getUnusedProperties());
Coordinator coordinator = new StandaloneCoordinator(worker);
final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, coordinator);
copycat.start();
try {
// Destroy any requested connectors
for (final String connName : config.getList(CopycatConfig.DELETE_CONNECTORS_CONFIG)) {
FutureCallback<Void> cb = new FutureCallback<>(new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
if (error != null)
log.error("Failed to stop job {}", connName);
}
});
coordinator.deleteConnector(connName, cb);
cb.get();
}
// Create any new connectors
for (final String connectorPropsFile : config.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
connectorProps = Utils.loadProps(connectorPropsFile);
FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
@Override
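The creation loop body is truncated in the hunk above; for orientation, here is a minimal sketch of the callback pattern it uses, modeled on the FutureCallback/Callback usage shown for connector deletion. The exact body is an assumption, not the committed code:

    // Sketch only: assumed shape of the truncated creation loop body.
    FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
        @Override
        public void onCompletion(Throwable error, String id) {
            if (error != null)
                log.error("Failed to create job for connector defined in {}", connectorPropsFile);
        }
    });
    coordinator.addConnector(connectorProps, cb);
    cb.get(); // block until this connector has been created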

View File

@@ -82,12 +82,6 @@ public class WorkerConfig extends AbstractConfig {
+ " not per task. All tasks have shutdown triggered, then they are waited on sequentially.";
private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = "5000";
public static final String OFFSET_STORAGE_CLASS_CONFIG = "offset.storage.class";
private static final String OFFSET_STORAGE_CLASS_DOC =
"OffsetBackingStore implementation to use for storing partition offset data";
public static final String OFFSET_STORAGE_CLASS_DEFAULT
= "org.apache.kafka.copycat.storage.MemoryOffsetBackingStore";
public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = "offset.flush.interval.ms";
private static final String OFFSET_COMMIT_INTERVAL_MS_DOC
= "Interval at which to try committing offsets for tasks.";
@@ -122,8 +116,6 @@ public class WorkerConfig extends AbstractConfig {
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
.define(OFFSET_STORAGE_CLASS_CONFIG, Type.CLASS, OFFSET_STORAGE_CLASS_DEFAULT,
Importance.LOW, OFFSET_STORAGE_CLASS_DOC)
.define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, OFFSET_COMMIT_INTERVAL_MS_DEFAULT,
Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,

View File

@@ -65,9 +65,7 @@ public class Worker<K, V> {
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
public Worker(WorkerConfig config) {
this(new SystemTime(), config,
config.getConfiguredInstance(WorkerConfig.OFFSET_STORAGE_CLASS_CONFIG, OffsetBackingStore.class),
null, null, null, null);
this(new SystemTime(), config, null, null, null, null, null);
}
@SuppressWarnings("unchecked")
@@ -78,7 +76,13 @@ public class Worker<K, V> {
this.config = config;
this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
this.offsetBackingStore = offsetBackingStore;
if (offsetBackingStore != null) {
this.offsetBackingStore = offsetBackingStore;
} else {
this.offsetBackingStore = new FileOffsetBackingStore();
this.offsetBackingStore.configure(config.originals());
}
if (offsetKeySerializer != null) {
this.offsetKeySerializer = offsetKeySerializer;
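With OFFSET_STORAGE_CLASS_CONFIG gone, a worker constructed without an explicit OffsetBackingStore now falls back to file-backed offsets configured from the worker properties. A minimal sketch, assuming the key shown in the standalone properties file above:

    // Sketch: a worker built without an explicit store now defaults to
    // FileOffsetBackingStore, configured from config.originals().
    Properties workerProps = new Properties();
    workerProps.setProperty("offset.storage.file.filename", "/tmp/copycat.offsets");
    // (converter/serializer settings from the standalone properties file omitted here)
    Worker worker = new Worker(new WorkerConfig(workerProps));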

View File

@@ -1,60 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.runtime.standalone;
import java.util.Collection;
import java.util.Properties;
/**
* Interface used by StandaloneCoordinator to store configuration data for jobs. To be fault
* tolerant, all data required to resume jobs is stored here.
*/
interface ConfigStorage {
/**
* Configure this storage engine.
* @param props configuration properties
*/
void configure(Properties props);
/**
* Close this storage engine.
*/
void close();
/**
* Commit the new connector config.
* @param connector name of the connector
* @param properties the new connector configuration, or null to remove it
*/
void putConnectorConfig(String connector, Properties properties);
/**
* Read back the config for the given connector.
* @param connector name of the connector
* @return the stored connector configuration, or null if none exists
*/
Properties getConnectorConfig(String connector);
/**
* Get a list of connector names that have associated state in the store.
* @return the collection of connector names
*/
Collection<String> getConnectors();
}

View File

@@ -1,111 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import java.io.*;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* <p>
* Implementation of ConfigStorage that saves state to a local file. This allows a standalone
* node to tolerate faults/restarts.
* </p>
* <p>
* Currently the implementation is naive, inefficient, and only meant for testing or a small
* number of jobs.
* </p>
*/
public class FileConfigStorage implements ConfigStorage {
public static final String FILE_CONFIG = "config.storage.file";
public static final String FILE_DEFAULT = "configs.db";
private String filename;
private Map<String, Properties> connectorConfig = new HashMap<>();
@Override
public void configure(Properties props) {
filename = props.getProperty(FILE_CONFIG);
if (filename == null)
filename = FILE_DEFAULT;
load();
}
@Override
public void close() {
}
@Override
public void putConnectorConfig(String connector, Properties properties) {
if (properties == null)
connectorConfig.remove(connector);
else
connectorConfig.put(connector, properties);
save();
}
@Override
public Properties getConnectorConfig(String connector) {
return connectorConfig.get(connector);
}
@Override
public Collection<String> getConnectors() {
return connectorConfig.keySet();
}
/**
* Saves the current state to disk, overwriting previous data. This action is performed
* atomically.
*/
private void save() {
try {
String tempFilename = filename + ".temp";
ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(tempFilename));
os.writeObject(connectorConfig);
os.close();
// Overwrite the original. Since the nio file package is JDK7+ only, this is the best we
// can do.
File tempFile = new File(tempFilename);
tempFile.renameTo(new File(filename));
} catch (IOException e) {
throw new CopycatRuntimeException("Failed to save config data to file", e);
}
}
@SuppressWarnings("unchecked")
private void load() {
try {
ObjectInputStream is = new ObjectInputStream(new FileInputStream(filename));
Object data = is.readObject();
if (!(data instanceof Map))
throw new CopycatRuntimeException("Expected Map but found " + data.getClass());
connectorConfig = (Map<String, Properties>) data;
} catch (FileNotFoundException e) {
// Expected on first run
} catch (IOException | ClassNotFoundException e) {
throw new CopycatRuntimeException("Failed to load config data", e);
}
}
}

View File

@@ -38,35 +38,15 @@ import java.util.*;
public class StandaloneCoordinator implements Coordinator {
private static final Logger log = LoggerFactory.getLogger(StandaloneCoordinator.class);
public static final String STORAGE_CONFIG = "coordinator.standalone.storage";
private Worker worker;
private Properties configs;
private ConfigStorage configStorage;
private HashMap<String, ConnectorState> connectors = new HashMap<>();
public StandaloneCoordinator(Worker worker, Properties props) {
public StandaloneCoordinator(Worker worker) {
this.worker = worker;
this.configs = props;
}
public synchronized void start() {
log.info("Coordinator starting");
String storage = configs.getProperty(STORAGE_CONFIG);
if (storage != null && !storage.isEmpty()) {
try {
configStorage = Utils.newInstance(storage, ConfigStorage.class);
} catch (ClassNotFoundException e) {
throw new CopycatRuntimeException("Couldn't configure storage", e);
}
configStorage.configure(configs);
} else {
configStorage = null;
}
restoreConnectors();
log.info("Coordinator started");
}
@@ -82,11 +62,6 @@ public class StandaloneCoordinator implements Coordinator {
}
connectors.clear();
if (configStorage != null) {
configStorage.close();
configStorage = null;
}
log.info("Coordinator stopped");
}
@@ -148,8 +123,6 @@ public class StandaloneCoordinator implements Coordinator {
}
ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
connectors.put(connName, state);
if (configStorage != null)
configStorage.putConnectorConfig(connName, connectorProps);
log.info("Finished creating connector {}", connName);
@@ -174,8 +147,6 @@ public class StandaloneCoordinator implements Coordinator {
stopConnector(state);
connectors.remove(state.name);
if (configStorage != null)
configStorage.putConnectorConfig(state.name, null);
log.info("Finished destroying connector {}", connName);
}
@@ -252,23 +223,6 @@ public class StandaloneCoordinator implements Coordinator {
createConnectorTasks(state);
}
private void restoreConnectors() {
if (configStorage == null)
return;
Collection<String> connNames = configStorage.getConnectors();
for (String connName : connNames) {
log.info("Restoring connector {}", connName);
Properties connProps = configStorage.getConnectorConfig(connName);
ConnectorState connState = createConnector(connProps);
// Because this coordinator is standalone, connectors are only restored when this process
// starts and we know there can't be any existing tasks. So in this special case we're able
// to just create the tasks rather than having to check for existing tasks and sort out
// whether they need to be reconfigured.
createConnectorTasks(connState);
}
}
/**
* Requests reconfiguration of the task. This should only be triggered by
* {@link StandaloneConnectorContext}.
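With restoreConnectors() and the ConfigStorage hooks removed, standalone state lives only in the in-memory connectors map, so every process start must re-submit its connectors. A small sketch of the resulting contract (assumed usage):

    // Sketch: nothing is persisted, so connectors are re-added on every start.
    coordinator.start();                                      // no restore step anymore
    coordinator.addConnector(connectorProps, createCallback); // required after each start
    coordinator.stop();                                       // drops all in-memory state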

View File

@@ -25,7 +25,6 @@ import java.io.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Implementation of OffsetBackingStore that saves data locally to a file. To ensure this behaves
@@ -42,9 +41,9 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
}
@Override
public void configure(Properties props) {
public void configure(Map<String, ?> props) {
super.configure(props);
String filename = props.getProperty(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
String filename = (String) props.get(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
file = new File(filename);
}

View File

@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -47,7 +46,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
}
@Override
public void configure(Properties props) {
public void configure(Map<String, ?> props) {
}
@Override

View File

@@ -17,8 +17,8 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.Configurable;
import java.nio.ByteBuffer;
import java.util.Collection;

View File

@@ -1,27 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.util;
import java.util.Properties;
/**
* Class that can be configured immediately after instantiation with a set of properties.
*/
public interface Configurable {
public void configure(Properties props);
}
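For comparison, the storage interfaces now rely on org.apache.kafka.common.Configurable (see the import swap in OffsetBackingStore above), whose configure method takes a Map instead of Properties:

    // org.apache.kafka.common.Configurable, the replacement for the deleted interface
    public interface Configurable {
        void configure(Map<String, ?> configs);
    }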

View File

@@ -1,88 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.File;
import java.util.Properties;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StandaloneCoordinator.class})
@PowerMockIgnore("javax.management.*")
public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestBase {
private File coordinatorConfigFile;
@Before
public void setup() throws Exception {
Properties coordinatorProps = new Properties();
coordinatorProps.setProperty(StandaloneCoordinator.STORAGE_CONFIG,
FileConfigStorage.class.getName());
coordinatorConfigFile = File.createTempFile("test-coordinator-config", null);
coordinatorConfigFile.delete(); // Delete since we just needed a random file path
coordinatorProps.setProperty(FileConfigStorage.FILE_CONFIG,
coordinatorConfigFile.getAbsolutePath());
coordinator = new StandaloneCoordinator(worker, coordinatorProps);
connectorProps = new Properties();
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
PowerMock.mockStatic(StandaloneCoordinator.class);
// These can be anything since connectors can pass along whatever they want.
taskProps = new Properties();
taskProps.setProperty("foo", "bar");
}
@After
public void tearDown() {
coordinatorConfigFile.delete();
}
@Test
public void testRestoreConnectors() throws Exception {
connector = PowerMock.createMock(BogusSourceClass.class);
expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
expectStop();
// Restarting should recreate the same connector
expectRestore(BogusSourceClass.class, BogusSourceTask.class);
expectStop();
PowerMock.replayAll();
// One run to seed the config storage with the job
coordinator.start();
coordinator.addConnector(connectorProps, createCallback);
coordinator.stop();
// Second run should restore the connector
coordinator.start();
coordinator.stop();
PowerMock.verifyAll();
}
}

View File

@@ -17,31 +17,51 @@
package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.source.SourceConnector;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.ConnectorTaskId;
import org.apache.kafka.copycat.util.FutureCallback;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StandaloneCoordinator.class})
@PowerMockIgnore("javax.management.*")
public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
public class StandaloneCoordinatorTest {
private static final String CONNECTOR_NAME = "test";
private static final String TOPICS_LIST_STR = "topic1,topic2";
private StandaloneCoordinator coordinator;
@Mock protected Worker worker;
private Connector connector;
@Mock protected Callback<String> createCallback;
private Properties connectorProps;
private Properties taskProps;
@Before
public void setup() {
worker = PowerMock.createMock(Worker.class);
coordinator = new StandaloneCoordinator(worker, new Properties());
coordinator = new StandaloneCoordinator(worker);
connectorProps = new Properties();
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
@@ -94,4 +114,74 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
futureCb.get(1000L, TimeUnit.MILLISECONDS);
PowerMock.verifyAll();
}
private void expectAdd(Class<? extends Connector> connClass,
Class<? extends Task> taskClass,
boolean sink) throws Exception {
expectCreate(connClass, taskClass, sink, true);
}
private void expectRestore(Class<? extends Connector> connClass,
Class<? extends Task> taskClass) throws Exception {
// Restore never uses a callback. These tests always use sources
expectCreate(connClass, taskClass, false, false);
}
private void expectCreate(Class<? extends Connector> connClass,
Class<? extends Task> taskClass,
boolean sink, boolean expectCallback) throws Exception {
connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
PowerMock.expectPrivate(StandaloneCoordinator.class, "instantiateConnector", connClass.getName())
.andReturn(connector);
if (expectCallback) {
createCallback.onCompletion(null, CONNECTOR_NAME);
PowerMock.expectLastCall();
}
connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
PowerMock.expectLastCall();
connector.start(new Properties());
PowerMock.expectLastCall();
// Just return the connector properties for the individual task we generate by default
EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);
EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
.andReturn(Arrays.asList(taskProps));
// And we should instantiate the tasks. For a sink task, we should see added properties for
// the input topic partitions
Properties generatedTaskProps = new Properties();
generatedTaskProps.putAll(taskProps);
if (sink)
generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
PowerMock.expectLastCall();
}
private void expectStop() throws CopycatException {
worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
EasyMock.expectLastCall();
connector.stop();
EasyMock.expectLastCall();
}
private void expectDestroy() throws CopycatException {
expectStop();
}
// We need to use a real class here due to some issue with mocking java.lang.Class
private abstract class BogusSourceClass extends SourceConnector {
}
private abstract class BogusSourceTask extends SourceTask {
}
private abstract class BogusSinkClass extends SinkConnector {
}
private abstract class BogusSinkTask extends SinkTask {
}
}

View File

@@ -1,118 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.source.SourceConnector;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.ConnectorTaskId;
import org.easymock.EasyMock;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import java.util.Arrays;
import java.util.Properties;
public class StandaloneCoordinatorTestBase {
protected static final String CONNECTOR_NAME = "test";
protected static final String TOPICS_LIST_STR = "topic1,topic2";
protected StandaloneCoordinator coordinator;
@Mock protected Worker worker;
protected Connector connector;
@Mock protected Callback<String> createCallback;
protected Properties connectorProps;
protected Properties taskProps;
protected void expectAdd(Class<? extends Connector> connClass,
Class<? extends Task> taskClass,
boolean sink) throws Exception {
expectCreate(connClass, taskClass, sink, true);
}
protected void expectRestore(Class<? extends Connector> connClass,
Class<? extends Task> taskClass) throws Exception {
// Restore never uses a callback. These tests always use sources
expectCreate(connClass, taskClass, false, false);
}
protected void expectCreate(Class<? extends Connector> connClass,
Class<? extends Task> taskClass,
boolean sink, boolean expectCallback) throws Exception {
connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
PowerMock.expectPrivate(StandaloneCoordinator.class, "instantiateConnector", connClass.getName())
.andReturn(connector);
if (expectCallback) {
createCallback.onCompletion(null, CONNECTOR_NAME);
PowerMock.expectLastCall();
}
connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
PowerMock.expectLastCall();
connector.start(new Properties());
PowerMock.expectLastCall();
// Just return the connector properties for the individual task we generate by default
EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);
EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
.andReturn(Arrays.asList(taskProps));
// And we should instantiate the tasks. For a sink task, we should see added properties for
// the input topic partitions
Properties generatedTaskProps = new Properties();
generatedTaskProps.putAll(taskProps);
if (sink)
generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
PowerMock.expectLastCall();
}
protected void expectStop() throws CopycatException {
worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
EasyMock.expectLastCall();
connector.stop();
EasyMock.expectLastCall();
}
protected void expectDestroy() throws CopycatException {
expectStop();
}
// We need to use a real class here due to some issue with mocking java.lang.Class
protected abstract class BogusSourceClass extends SourceConnector {
}
protected abstract class BogusSourceTask extends SourceTask {
}
protected abstract class BogusSinkClass extends SinkConnector {
}
protected abstract class BogusSinkTask extends SinkTask {
}
}

View File

@@ -30,14 +30,13 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class FileOffsetBackingStoreTest {
FileOffsetBackingStore store;
Properties props;
Map<String, Object> props;
File tempFile;
private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
@@ -51,9 +50,8 @@ public class FileOffsetBackingStoreTest {
public void setup() throws IOException {
store = new FileOffsetBackingStore();
tempFile = File.createTempFile("fileoffsetbackingstore", null);
props = new Properties();
props.setProperty(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
tempFile.getAbsolutePath());
props = new HashMap<>();
props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath());
store.configure(props);
store.start();
}