Add CLI tools for Copycat.

This commit is contained in:
Ewen Cheslack-Postava 2015-07-26 13:48:00 -07:00
parent e14942cb20
commit 31cd1caf3c
11 changed files with 173 additions and 14 deletions

17
bin/copycat.sh Executable file
View File

@ -0,0 +1,17 @@
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.Copycat "$@"

View File

@ -65,6 +65,14 @@ do
CLASSPATH=$CLASSPATH:$file
done
for pkg in "copycat-api" "copycat-avro" "copycat-data" "copycat-file" "copycat-runtime"
do
for file in $base_dir/${pkg}/build/libs/${pkg}*.jar $base_dir/${pkg}/build/dependant-libs/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
done
# classpath addition for release
for file in $base_dir/libs/*.jar;
do

View File

@ -579,6 +579,18 @@ project(':copycat-avro') {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.runtime) {
exclude('kafka-clients*')
exclude('copycat-*')
}
into "$buildDir/dependant-libs"
}
jar {
dependsOn copyDependantLibs
}
}
project(':copycat-runtime') {

View File

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name=local-console-sink
connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
tasks.max=1
topics=test

View File

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name=local-console-source
connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
tasks.max=1
topic=test

View File

@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name=local-file-sink
connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
topics=test

View File

@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name=local-file-source
connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
tasks.max=1
file=test.txt
topic=test

View File

@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
zookeeper.connect=localhost:2181
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081
offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore
offset.storage.file.filename=/tmp/copycat.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
coordinator.standalone.storage=org.apache.kafka.copycat.runtime.standalone.FileConfigStorage
config.storage.file=/tmp/copycat.configs

View File

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# These are defaults. This file just demonstrates how to override some settings.
zookeeper.connect=localhost:2181
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081

View File

@ -20,7 +20,6 @@ package org.apache.kafka.copycat.cli;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.runtime.Coordinator;
import org.apache.kafka.copycat.runtime.Copycat;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.runtime.standalone.StandaloneCoordinator;
import org.apache.kafka.copycat.util.Callback;
@ -41,16 +40,16 @@ import java.util.Properties;
* fault tolerant by overriding the settings to use file storage for both.
* </p>
*/
public class CopycatCommand {
private static final Logger log = LoggerFactory.getLogger(CopycatCommand.class);
public class Copycat {
private static final Logger log = LoggerFactory.getLogger(Copycat.class);
public static void main(String[] args) throws Exception {
CopycatCommandConfig config;
CopycatConfig config;
Properties workerProps;
Properties connectorProps;
try {
config = CopycatCommandConfig.parseCommandLineArgs(args);
config = CopycatConfig.parseCommandLineArgs(args);
} catch (ConfigException e) {
log.error(e.getMessage());
log.info("Usage: copycat [--worker-config worker.properties]"
@ -60,18 +59,18 @@ public class CopycatCommand {
return;
}
String workerPropsFile = config.getString(CopycatCommandConfig.WORKER_PROPERTIES_CONFIG);
String workerPropsFile = config.getString(CopycatConfig.WORKER_PROPERTIES_CONFIG);
workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
WorkerConfig workerConfig = new WorkerConfig(workerProps);
Worker worker = new Worker(workerConfig);
Coordinator coordinator = new StandaloneCoordinator(worker, workerConfig.getUnusedProperties());
final Copycat copycat = new Copycat(worker, coordinator);
final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, coordinator);
copycat.start();
try {
// Destroy any requested connectors
for (final String connName : config.getList(CopycatCommandConfig.DELETE_CONNECTORS_CONFIG)) {
for (final String connName : config.getList(CopycatConfig.DELETE_CONNECTORS_CONFIG)) {
FutureCallback cb = new FutureCallback(new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
@ -86,7 +85,7 @@ public class CopycatCommand {
// Create any new connectors
for (final String connectorPropsFile : config
.getList(CopycatCommandConfig.CREATE_CONNECTORS_CONFIG)) {
.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
connectorProps = Utils.loadProps(connectorPropsFile);
FutureCallback cb = new FutureCallback(new Callback<String>() {
@Override

View File

@ -24,7 +24,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import java.util.Properties;
public class CopycatCommandConfig extends AbstractConfig {
public class CopycatConfig extends AbstractConfig {
public static final String WORKER_PROPERTIES_CONFIG = "worker-config";
public static final String WORKER_PROPERTIES_CONFIG_DEFAULT = "";
@ -55,18 +55,18 @@ public class CopycatCommandConfig extends AbstractConfig {
private Properties originalProperties;
public CopycatCommandConfig(Properties props) {
public CopycatConfig(Properties props) {
super(config, props);
this.originalProperties = props;
}
/**
* Parses command line arguments into a Properties object and instantiate a
* CopycatCommandConfig with it.
* CopycatConfig with it.
* @param args
* @return
*/
public static CopycatCommandConfig parseCommandLineArgs(String[] args) {
public static CopycatConfig parseCommandLineArgs(String[] args) {
Properties props = new Properties();
for (int i = 0; i < args.length; i++) {
String arg = args[i];
@ -91,6 +91,6 @@ public class CopycatCommandConfig extends AbstractConfig {
props.setProperty(key, value);
}
return new CopycatCommandConfig(props);
return new CopycatConfig(props);
}
}