mirror of https://github.com/apache/kafka.git
Add CLI tools for Copycat.
This commit is contained in:
parent
e14942cb20
commit
31cd1caf3c
|
@ -0,0 +1,17 @@
|
|||
#!/bin/sh
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.Copycat "$@"
|
|
@ -65,6 +65,14 @@ do
|
|||
CLASSPATH=$CLASSPATH:$file
|
||||
done
|
||||
|
||||
for pkg in "copycat-api" "copycat-avro" "copycat-data" "copycat-file" "copycat-runtime"
|
||||
do
|
||||
for file in $base_dir/${pkg}/build/libs/${pkg}*.jar $base_dir/${pkg}/build/dependant-libs/*.jar;
|
||||
do
|
||||
CLASSPATH=$CLASSPATH:$file
|
||||
done
|
||||
done
|
||||
|
||||
# classpath addition for release
|
||||
for file in $base_dir/libs/*.jar;
|
||||
do
|
||||
|
|
12
build.gradle
12
build.gradle
|
@ -579,6 +579,18 @@ project(':copycat-avro') {
|
|||
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
|
||||
}
|
||||
test.dependsOn('checkstyleMain', 'checkstyleTest')
|
||||
|
||||
tasks.create(name: "copyDependantLibs", type: Copy) {
|
||||
from (configurations.runtime) {
|
||||
exclude('kafka-clients*')
|
||||
exclude('copycat-*')
|
||||
}
|
||||
into "$buildDir/dependant-libs"
|
||||
}
|
||||
|
||||
jar {
|
||||
dependsOn copyDependantLibs
|
||||
}
|
||||
}
|
||||
|
||||
project(':copycat-runtime') {
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
name=local-console-sink
|
||||
connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
|
||||
tasks.max=1
|
||||
topics=test
|
|
@ -0,0 +1,19 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
name=local-console-source
|
||||
connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
|
||||
tasks.max=1
|
||||
topic=test
|
|
@ -0,0 +1,20 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
name=local-file-sink
|
||||
connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
|
||||
tasks.max=1
|
||||
file=test.sink.txt
|
||||
topics=test
|
|
@ -0,0 +1,20 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
name=local-file-source
|
||||
connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
|
||||
tasks.max=1
|
||||
file=test.txt
|
||||
topic=test
|
|
@ -0,0 +1,26 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
zookeeper.connect=localhost:2181
|
||||
bootstrap.servers=localhost:9092
|
||||
schema.registry.url=http://localhost:8081
|
||||
|
||||
offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore
|
||||
offset.storage.file.filename=/tmp/copycat.offsets
|
||||
# Flush much faster than normal, which is useful for testing/debugging
|
||||
offset.flush.interval.ms=10000
|
||||
|
||||
coordinator.standalone.storage=org.apache.kafka.copycat.runtime.standalone.FileConfigStorage
|
||||
config.storage.file=/tmp/copycat.configs
|
|
@ -0,0 +1,19 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# These are defaults. This file just demonstrates how to override some settings.
|
||||
zookeeper.connect=localhost:2181
|
||||
bootstrap.servers=localhost:9092
|
||||
schema.registry.url=http://localhost:8081
|
|
@ -20,7 +20,6 @@ package org.apache.kafka.copycat.cli;
|
|||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.copycat.runtime.Coordinator;
|
||||
import org.apache.kafka.copycat.runtime.Copycat;
|
||||
import org.apache.kafka.copycat.runtime.Worker;
|
||||
import org.apache.kafka.copycat.runtime.standalone.StandaloneCoordinator;
|
||||
import org.apache.kafka.copycat.util.Callback;
|
||||
|
@ -41,16 +40,16 @@ import java.util.Properties;
|
|||
* fault tolerant by overriding the settings to use file storage for both.
|
||||
* </p>
|
||||
*/
|
||||
public class CopycatCommand {
|
||||
private static final Logger log = LoggerFactory.getLogger(CopycatCommand.class);
|
||||
public class Copycat {
|
||||
private static final Logger log = LoggerFactory.getLogger(Copycat.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
CopycatCommandConfig config;
|
||||
CopycatConfig config;
|
||||
Properties workerProps;
|
||||
Properties connectorProps;
|
||||
|
||||
try {
|
||||
config = CopycatCommandConfig.parseCommandLineArgs(args);
|
||||
config = CopycatConfig.parseCommandLineArgs(args);
|
||||
} catch (ConfigException e) {
|
||||
log.error(e.getMessage());
|
||||
log.info("Usage: copycat [--worker-config worker.properties]"
|
||||
|
@ -60,18 +59,18 @@ public class CopycatCommand {
|
|||
return;
|
||||
}
|
||||
|
||||
String workerPropsFile = config.getString(CopycatCommandConfig.WORKER_PROPERTIES_CONFIG);
|
||||
String workerPropsFile = config.getString(CopycatConfig.WORKER_PROPERTIES_CONFIG);
|
||||
workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
|
||||
|
||||
WorkerConfig workerConfig = new WorkerConfig(workerProps);
|
||||
Worker worker = new Worker(workerConfig);
|
||||
Coordinator coordinator = new StandaloneCoordinator(worker, workerConfig.getUnusedProperties());
|
||||
final Copycat copycat = new Copycat(worker, coordinator);
|
||||
final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, coordinator);
|
||||
copycat.start();
|
||||
|
||||
try {
|
||||
// Destroy any requested connectors
|
||||
for (final String connName : config.getList(CopycatCommandConfig.DELETE_CONNECTORS_CONFIG)) {
|
||||
for (final String connName : config.getList(CopycatConfig.DELETE_CONNECTORS_CONFIG)) {
|
||||
FutureCallback cb = new FutureCallback(new Callback<Void>() {
|
||||
@Override
|
||||
public void onCompletion(Throwable error, Void result) {
|
||||
|
@ -86,7 +85,7 @@ public class CopycatCommand {
|
|||
|
||||
// Create any new connectors
|
||||
for (final String connectorPropsFile : config
|
||||
.getList(CopycatCommandConfig.CREATE_CONNECTORS_CONFIG)) {
|
||||
.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
|
||||
connectorProps = Utils.loadProps(connectorPropsFile);
|
||||
FutureCallback cb = new FutureCallback(new Callback<String>() {
|
||||
@Override
|
|
@ -24,7 +24,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
|
|||
|
||||
import java.util.Properties;
|
||||
|
||||
public class CopycatCommandConfig extends AbstractConfig {
|
||||
public class CopycatConfig extends AbstractConfig {
|
||||
|
||||
public static final String WORKER_PROPERTIES_CONFIG = "worker-config";
|
||||
public static final String WORKER_PROPERTIES_CONFIG_DEFAULT = "";
|
||||
|
@ -55,18 +55,18 @@ public class CopycatCommandConfig extends AbstractConfig {
|
|||
|
||||
private Properties originalProperties;
|
||||
|
||||
public CopycatCommandConfig(Properties props) {
|
||||
public CopycatConfig(Properties props) {
|
||||
super(config, props);
|
||||
this.originalProperties = props;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses command line arguments into a Properties object and instantiate a
|
||||
* CopycatCommandConfig with it.
|
||||
* CopycatConfig with it.
|
||||
* @param args
|
||||
* @return
|
||||
*/
|
||||
public static CopycatCommandConfig parseCommandLineArgs(String[] args) {
|
||||
public static CopycatConfig parseCommandLineArgs(String[] args) {
|
||||
Properties props = new Properties();
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
String arg = args[i];
|
||||
|
@ -91,6 +91,6 @@ public class CopycatCommandConfig extends AbstractConfig {
|
|||
props.setProperty(key, value);
|
||||
}
|
||||
|
||||
return new CopycatCommandConfig(props);
|
||||
return new CopycatConfig(props);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue