mirror of https://github.com/apache/kafka.git
Add CLI tools for Copycat.
This commit is contained in:
parent
e14942cb20
commit
31cd1caf3c
|
@ -0,0 +1,17 @@
|
||||||
|
#!/bin/sh
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.Copycat "$@"
|
|
@ -65,6 +65,14 @@ do
|
||||||
CLASSPATH=$CLASSPATH:$file
|
CLASSPATH=$CLASSPATH:$file
|
||||||
done
|
done
|
||||||
|
|
||||||
|
for pkg in "copycat-api" "copycat-avro" "copycat-data" "copycat-file" "copycat-runtime"
|
||||||
|
do
|
||||||
|
for file in $base_dir/${pkg}/build/libs/${pkg}*.jar $base_dir/${pkg}/build/dependant-libs/*.jar;
|
||||||
|
do
|
||||||
|
CLASSPATH=$CLASSPATH:$file
|
||||||
|
done
|
||||||
|
done
|
||||||
|
|
||||||
# classpath addition for release
|
# classpath addition for release
|
||||||
for file in $base_dir/libs/*.jar;
|
for file in $base_dir/libs/*.jar;
|
||||||
do
|
do
|
||||||
|
|
12
build.gradle
12
build.gradle
|
@ -579,6 +579,18 @@ project(':copycat-avro') {
|
||||||
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
|
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
|
||||||
}
|
}
|
||||||
test.dependsOn('checkstyleMain', 'checkstyleTest')
|
test.dependsOn('checkstyleMain', 'checkstyleTest')
|
||||||
|
|
||||||
|
tasks.create(name: "copyDependantLibs", type: Copy) {
|
||||||
|
from (configurations.runtime) {
|
||||||
|
exclude('kafka-clients*')
|
||||||
|
exclude('copycat-*')
|
||||||
|
}
|
||||||
|
into "$buildDir/dependant-libs"
|
||||||
|
}
|
||||||
|
|
||||||
|
jar {
|
||||||
|
dependsOn copyDependantLibs
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
project(':copycat-runtime') {
|
project(':copycat-runtime') {
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
name=local-console-sink
|
||||||
|
connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
|
||||||
|
tasks.max=1
|
||||||
|
topics=test
|
|
@ -0,0 +1,19 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
name=local-console-source
|
||||||
|
connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
|
||||||
|
tasks.max=1
|
||||||
|
topic=test
|
|
@ -0,0 +1,20 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
name=local-file-sink
|
||||||
|
connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
|
||||||
|
tasks.max=1
|
||||||
|
file=test.sink.txt
|
||||||
|
topics=test
|
|
@ -0,0 +1,20 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
name=local-file-source
|
||||||
|
connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
|
||||||
|
tasks.max=1
|
||||||
|
file=test.txt
|
||||||
|
topic=test
|
|
@ -0,0 +1,26 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
zookeeper.connect=localhost:2181
|
||||||
|
bootstrap.servers=localhost:9092
|
||||||
|
schema.registry.url=http://localhost:8081
|
||||||
|
|
||||||
|
offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore
|
||||||
|
offset.storage.file.filename=/tmp/copycat.offsets
|
||||||
|
# Flush much faster than normal, which is useful for testing/debugging
|
||||||
|
offset.flush.interval.ms=10000
|
||||||
|
|
||||||
|
coordinator.standalone.storage=org.apache.kafka.copycat.runtime.standalone.FileConfigStorage
|
||||||
|
config.storage.file=/tmp/copycat.configs
|
|
@ -0,0 +1,19 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
# These are defaults. This file just demonstrates how to override some settings.
|
||||||
|
zookeeper.connect=localhost:2181
|
||||||
|
bootstrap.servers=localhost:9092
|
||||||
|
schema.registry.url=http://localhost:8081
|
|
@ -20,7 +20,6 @@ package org.apache.kafka.copycat.cli;
|
||||||
import org.apache.kafka.common.config.ConfigException;
|
import org.apache.kafka.common.config.ConfigException;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
import org.apache.kafka.copycat.runtime.Coordinator;
|
import org.apache.kafka.copycat.runtime.Coordinator;
|
||||||
import org.apache.kafka.copycat.runtime.Copycat;
|
|
||||||
import org.apache.kafka.copycat.runtime.Worker;
|
import org.apache.kafka.copycat.runtime.Worker;
|
||||||
import org.apache.kafka.copycat.runtime.standalone.StandaloneCoordinator;
|
import org.apache.kafka.copycat.runtime.standalone.StandaloneCoordinator;
|
||||||
import org.apache.kafka.copycat.util.Callback;
|
import org.apache.kafka.copycat.util.Callback;
|
||||||
|
@ -41,16 +40,16 @@ import java.util.Properties;
|
||||||
* fault tolerant by overriding the settings to use file storage for both.
|
* fault tolerant by overriding the settings to use file storage for both.
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
public class CopycatCommand {
|
public class Copycat {
|
||||||
private static final Logger log = LoggerFactory.getLogger(CopycatCommand.class);
|
private static final Logger log = LoggerFactory.getLogger(Copycat.class);
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
CopycatCommandConfig config;
|
CopycatConfig config;
|
||||||
Properties workerProps;
|
Properties workerProps;
|
||||||
Properties connectorProps;
|
Properties connectorProps;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
config = CopycatCommandConfig.parseCommandLineArgs(args);
|
config = CopycatConfig.parseCommandLineArgs(args);
|
||||||
} catch (ConfigException e) {
|
} catch (ConfigException e) {
|
||||||
log.error(e.getMessage());
|
log.error(e.getMessage());
|
||||||
log.info("Usage: copycat [--worker-config worker.properties]"
|
log.info("Usage: copycat [--worker-config worker.properties]"
|
||||||
|
@ -60,18 +59,18 @@ public class CopycatCommand {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
String workerPropsFile = config.getString(CopycatCommandConfig.WORKER_PROPERTIES_CONFIG);
|
String workerPropsFile = config.getString(CopycatConfig.WORKER_PROPERTIES_CONFIG);
|
||||||
workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
|
workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
|
||||||
|
|
||||||
WorkerConfig workerConfig = new WorkerConfig(workerProps);
|
WorkerConfig workerConfig = new WorkerConfig(workerProps);
|
||||||
Worker worker = new Worker(workerConfig);
|
Worker worker = new Worker(workerConfig);
|
||||||
Coordinator coordinator = new StandaloneCoordinator(worker, workerConfig.getUnusedProperties());
|
Coordinator coordinator = new StandaloneCoordinator(worker, workerConfig.getUnusedProperties());
|
||||||
final Copycat copycat = new Copycat(worker, coordinator);
|
final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, coordinator);
|
||||||
copycat.start();
|
copycat.start();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Destroy any requested connectors
|
// Destroy any requested connectors
|
||||||
for (final String connName : config.getList(CopycatCommandConfig.DELETE_CONNECTORS_CONFIG)) {
|
for (final String connName : config.getList(CopycatConfig.DELETE_CONNECTORS_CONFIG)) {
|
||||||
FutureCallback cb = new FutureCallback(new Callback<Void>() {
|
FutureCallback cb = new FutureCallback(new Callback<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void onCompletion(Throwable error, Void result) {
|
public void onCompletion(Throwable error, Void result) {
|
||||||
|
@ -86,7 +85,7 @@ public class CopycatCommand {
|
||||||
|
|
||||||
// Create any new connectors
|
// Create any new connectors
|
||||||
for (final String connectorPropsFile : config
|
for (final String connectorPropsFile : config
|
||||||
.getList(CopycatCommandConfig.CREATE_CONNECTORS_CONFIG)) {
|
.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
|
||||||
connectorProps = Utils.loadProps(connectorPropsFile);
|
connectorProps = Utils.loadProps(connectorPropsFile);
|
||||||
FutureCallback cb = new FutureCallback(new Callback<String>() {
|
FutureCallback cb = new FutureCallback(new Callback<String>() {
|
||||||
@Override
|
@Override
|
|
@ -24,7 +24,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
|
||||||
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
public class CopycatCommandConfig extends AbstractConfig {
|
public class CopycatConfig extends AbstractConfig {
|
||||||
|
|
||||||
public static final String WORKER_PROPERTIES_CONFIG = "worker-config";
|
public static final String WORKER_PROPERTIES_CONFIG = "worker-config";
|
||||||
public static final String WORKER_PROPERTIES_CONFIG_DEFAULT = "";
|
public static final String WORKER_PROPERTIES_CONFIG_DEFAULT = "";
|
||||||
|
@ -55,18 +55,18 @@ public class CopycatCommandConfig extends AbstractConfig {
|
||||||
|
|
||||||
private Properties originalProperties;
|
private Properties originalProperties;
|
||||||
|
|
||||||
public CopycatCommandConfig(Properties props) {
|
public CopycatConfig(Properties props) {
|
||||||
super(config, props);
|
super(config, props);
|
||||||
this.originalProperties = props;
|
this.originalProperties = props;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parses command line arguments into a Properties object and instantiate a
|
* Parses command line arguments into a Properties object and instantiate a
|
||||||
* CopycatCommandConfig with it.
|
* CopycatConfig with it.
|
||||||
* @param args
|
* @param args
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public static CopycatCommandConfig parseCommandLineArgs(String[] args) {
|
public static CopycatConfig parseCommandLineArgs(String[] args) {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
for (int i = 0; i < args.length; i++) {
|
for (int i = 0; i < args.length; i++) {
|
||||||
String arg = args[i];
|
String arg = args[i];
|
||||||
|
@ -91,6 +91,6 @@ public class CopycatCommandConfig extends AbstractConfig {
|
||||||
props.setProperty(key, value);
|
props.setProperty(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new CopycatCommandConfig(props);
|
return new CopycatConfig(props);
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue