mirror of https://github.com/apache/kafka.git
Simplify Copycat exceptions, make them a subclass of KafkaException.
This commit is contained in:
parent
8c108b0cac
commit
a3a47a6e8f
|
@ -18,7 +18,6 @@
|
|||
package org.apache.kafka.copycat.connector;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
@ -81,9 +80,8 @@ public abstract class Connector {
|
|||
* either just been instantiated and initialized or {@link #stop()} has been invoked.
|
||||
*
|
||||
* @param props configuration settings
|
||||
* @throws CopycatException
|
||||
*/
|
||||
public abstract void start(Properties props) throws CopycatException;
|
||||
public abstract void start(Properties props);
|
||||
|
||||
/**
|
||||
* Reconfigure this Connector. Most implementations will not override this, using the default
|
||||
|
@ -92,9 +90,8 @@ public abstract class Connector {
|
|||
* efficiently, e.g. without shutting down network connections to the external system.
|
||||
*
|
||||
* @param props new configuration settings
|
||||
* @throws CopycatException
|
||||
*/
|
||||
public void reconfigure(Properties props) throws CopycatException {
|
||||
public void reconfigure(Properties props) {
|
||||
stop();
|
||||
start(props);
|
||||
}
|
||||
|
@ -115,8 +112,6 @@ public abstract class Connector {
|
|||
|
||||
/**
|
||||
* Stop this connector.
|
||||
*
|
||||
* @throws CopycatException
|
||||
*/
|
||||
public abstract void stop() throws CopycatException;
|
||||
public abstract void stop();
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.kafka.copycat.connector;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
|
@ -45,8 +44,6 @@ public interface Task {
|
|||
|
||||
/**
|
||||
* Stop this task.
|
||||
*
|
||||
* @throws CopycatException
|
||||
*/
|
||||
void stop() throws CopycatException;
|
||||
void stop();
|
||||
}
|
||||
|
|
|
@ -17,13 +17,14 @@
|
|||
|
||||
package org.apache.kafka.copycat.errors;
|
||||
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
/**
|
||||
* CopycatException is the top-level exception type generated by Copycat and connectors.
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public class CopycatException extends Exception {
|
||||
public class CopycatException extends KafkaException {
|
||||
|
||||
public CopycatException(String s) {
|
||||
super(s);
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.errors;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
@InterfaceStability.Unstable
|
||||
public class CopycatRuntimeException extends RuntimeException {
|
||||
public CopycatRuntimeException(String s) {
|
||||
super(s);
|
||||
}
|
||||
|
||||
public CopycatRuntimeException(String s, Throwable throwable) {
|
||||
super(s, throwable);
|
||||
}
|
||||
|
||||
public CopycatRuntimeException(Throwable throwable) {
|
||||
super(throwable);
|
||||
}
|
||||
}
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.copycat.sink;
|
|||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
import org.apache.kafka.copycat.connector.Task;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
@ -52,7 +51,7 @@ public abstract class SinkTask implements Task {
|
|||
*
|
||||
* @param records the set of records to send
|
||||
*/
|
||||
public abstract void put(Collection<SinkRecord> records) throws CopycatException;
|
||||
public abstract void put(Collection<SinkRecord> records);
|
||||
|
||||
/**
|
||||
* Flush all records that have been {@link #put} for the specified topic-partitions. The
|
||||
|
|
|
@ -52,7 +52,7 @@ public class ConnectorReconfigurationTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void start(Properties props) throws CopycatException {
|
||||
public void start(Properties props) {
|
||||
configureOrder = order++;
|
||||
}
|
||||
|
||||
|
@ -67,7 +67,7 @@ public class ConnectorReconfigurationTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
public void stop() {
|
||||
stopOrder = order++;
|
||||
if (stopException)
|
||||
throw new CopycatException("error");
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.kafka.copycat.file;
|
||||
|
||||
import org.apache.kafka.copycat.connector.Task;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -35,7 +34,7 @@ public class FileStreamSinkConnector extends SinkConnector {
|
|||
private String filename;
|
||||
|
||||
@Override
|
||||
public void start(Properties props) throws CopycatException {
|
||||
public void start(Properties props) {
|
||||
filename = props.getProperty(FILE_CONFIG);
|
||||
}
|
||||
|
||||
|
@ -57,7 +56,7 @@ public class FileStreamSinkConnector extends SinkConnector {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
public void stop() {
|
||||
// Nothing to do since FileStreamSinkConnector has no background monitoring.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.copycat.file;
|
|||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||
import org.apache.kafka.copycat.sink.SinkTask;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -57,14 +56,13 @@ public class FileStreamSinkTask extends SinkTask {
|
|||
try {
|
||||
outputStream = new PrintStream(new File(filename));
|
||||
} catch (FileNotFoundException e) {
|
||||
throw new CopycatRuntimeException(
|
||||
"Couldn't find or create file for FileStreamSinkTask: {}", e);
|
||||
throw new CopycatException("Couldn't find or create file for FileStreamSinkTask: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(Collection<SinkRecord> sinkRecords) throws CopycatException {
|
||||
public void put(Collection<SinkRecord> sinkRecords) {
|
||||
for (SinkRecord record : sinkRecords) {
|
||||
outputStream.println(record.getValue());
|
||||
}
|
||||
|
@ -76,6 +74,6 @@ public class FileStreamSinkTask extends SinkTask {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
public void stop() {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ public class FileStreamSourceConnector extends SourceConnector {
|
|||
private String topic;
|
||||
|
||||
@Override
|
||||
public void start(Properties props) throws CopycatException {
|
||||
public void start(Properties props) {
|
||||
filename = props.getProperty(FILE_CONFIG);
|
||||
topic = props.getProperty(TOPIC_CONFIG);
|
||||
if (topic == null || topic.isEmpty())
|
||||
|
@ -64,7 +64,7 @@ public class FileStreamSourceConnector extends SourceConnector {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
public void stop() {
|
||||
// Nothing to do since FileStreamSourceConnector has no background monitoring.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.kafka.copycat.file;
|
||||
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.source.SourceRecord;
|
||||
import org.apache.kafka.copycat.source.SourceTask;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -63,19 +62,19 @@ public class FileStreamSourceTask extends SourceTask {
|
|||
skipLeft -= skipped;
|
||||
} catch (IOException e) {
|
||||
log.error("Error while trying to seek to previous offset in file: ", e);
|
||||
throw new CopycatRuntimeException(e);
|
||||
throw new CopycatException(e);
|
||||
}
|
||||
}
|
||||
log.debug("Skipped to offset {}", lastRecordedOffset);
|
||||
}
|
||||
streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L;
|
||||
} catch (FileNotFoundException e) {
|
||||
throw new CopycatRuntimeException("Couldn't find file for FileStreamSourceTask: {}", e);
|
||||
throw new CopycatException("Couldn't find file for FileStreamSourceTask: {}", e);
|
||||
}
|
||||
}
|
||||
topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
|
||||
if (topic == null)
|
||||
throw new CopycatRuntimeException("ConsoleSourceTask config missing topic setting");
|
||||
throw new CopycatException("ConsoleSourceTask config missing topic setting");
|
||||
reader = new BufferedReader(new InputStreamReader(stream));
|
||||
}
|
||||
|
||||
|
@ -161,7 +160,7 @@ public class FileStreamSourceTask extends SourceTask {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
public void stop() {
|
||||
log.trace("Stopping");
|
||||
synchronized (this) {
|
||||
try {
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.copycat.file;
|
|||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.copycat.connector.ConnectorContext;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -57,7 +56,7 @@ public class FileStreamSinkConnectorTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSinkTasks() throws CopycatException {
|
||||
public void testSinkTasks() {
|
||||
PowerMock.replayAll();
|
||||
|
||||
connector.start(sinkProperties);
|
||||
|
@ -75,7 +74,7 @@ public class FileStreamSinkConnectorTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testTaskClass() throws CopycatException {
|
||||
public void testTaskClass() {
|
||||
PowerMock.replayAll();
|
||||
|
||||
connector.start(sinkProperties);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.kafka.copycat.file;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -44,7 +43,7 @@ public class FileStreamSinkTaskTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPutFlush() throws CopycatException {
|
||||
public void testPutFlush() {
|
||||
HashMap<TopicPartition, Long> offsets = new HashMap<>();
|
||||
|
||||
// We do not call task.start() since it would override the output stream
|
||||
|
|
|
@ -51,7 +51,7 @@ public class FileStreamSourceConnectorTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSourceTasks() throws CopycatException {
|
||||
public void testSourceTasks() {
|
||||
PowerMock.replayAll();
|
||||
|
||||
connector.start(sourceProperties);
|
||||
|
@ -74,7 +74,7 @@ public class FileStreamSourceConnectorTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSourceTasksStdin() throws CopycatException {
|
||||
public void testSourceTasksStdin() {
|
||||
PowerMock.replayAll();
|
||||
|
||||
sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
|
||||
|
@ -87,13 +87,13 @@ public class FileStreamSourceConnectorTest {
|
|||
}
|
||||
|
||||
@Test(expected = CopycatException.class)
|
||||
public void testMultipleSourcesInvalid() throws CopycatException {
|
||||
public void testMultipleSourcesInvalid() {
|
||||
sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
|
||||
connector.start(sourceProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTaskClass() throws CopycatException {
|
||||
public void testTaskClass() {
|
||||
PowerMock.replayAll();
|
||||
|
||||
connector.start(sourceProperties);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.kafka.copycat.file;
|
||||
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.source.SourceRecord;
|
||||
import org.apache.kafka.copycat.source.SourceTaskContext;
|
||||
import org.apache.kafka.copycat.storage.OffsetStorageReader;
|
||||
|
@ -72,7 +71,7 @@ public class FileStreamSourceTaskTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testNormalLifecycle() throws InterruptedException, CopycatException, IOException {
|
||||
public void testNormalLifecycle() throws InterruptedException, IOException {
|
||||
expectOffsetLookupReturnNone();
|
||||
replay();
|
||||
|
||||
|
@ -117,7 +116,7 @@ public class FileStreamSourceTaskTest {
|
|||
task.stop();
|
||||
}
|
||||
|
||||
@Test(expected = CopycatRuntimeException.class)
|
||||
@Test(expected = CopycatException.class)
|
||||
public void testMissingTopic() {
|
||||
expectOffsetLookupReturnNone();
|
||||
replay();
|
||||
|
@ -126,7 +125,7 @@ public class FileStreamSourceTaskTest {
|
|||
task.start(config);
|
||||
}
|
||||
|
||||
@Test(expected = CopycatRuntimeException.class)
|
||||
@Test(expected = CopycatException.class)
|
||||
public void testInvalidFile() {
|
||||
config.setProperty(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
|
||||
task.start(config);
|
||||
|
|
|
@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
|
|||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import org.apache.kafka.copycat.data.*;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.storage.Converter;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -74,7 +74,7 @@ public class JsonConverter implements Converter<JsonNode> {
|
|||
try {
|
||||
return value.binaryValue();
|
||||
} catch (IOException e) {
|
||||
throw new CopycatRuntimeException("Invalid bytes field", e);
|
||||
throw new CopycatException("Invalid bytes field", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -89,7 +89,7 @@ public class JsonConverter implements Converter<JsonNode> {
|
|||
public Object convert(JsonNode jsonSchema, JsonNode value) {
|
||||
JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
|
||||
if (elemSchema == null)
|
||||
throw new CopycatRuntimeException("Array schema did not specify the element type");
|
||||
throw new CopycatException("Array schema did not specify the element type");
|
||||
ArrayList<Object> result = new ArrayList<>();
|
||||
for (JsonNode elem : value) {
|
||||
result.add(convertToCopycat(elemSchema, elem));
|
||||
|
@ -108,7 +108,7 @@ public class JsonConverter implements Converter<JsonNode> {
|
|||
@Override
|
||||
public Object toCopycatData(JsonNode value) {
|
||||
if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
|
||||
throw new CopycatRuntimeException("JSON value converted to Copycat must be in envelope containing schema");
|
||||
throw new CopycatException("JSON value converted to Copycat must be in envelope containing schema");
|
||||
|
||||
return convertToCopycat(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
|
||||
}
|
||||
|
@ -143,7 +143,7 @@ public class JsonConverter implements Converter<JsonNode> {
|
|||
case MAP:
|
||||
throw new UnsupportedOperationException("map schema not supported");
|
||||
default:
|
||||
throw new CopycatRuntimeException("Couldn't translate unsupported schema type " + schema.getType().getName() + ".");
|
||||
throw new CopycatException("Couldn't translate unsupported schema type " + schema.getType().getName() + ".");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -154,7 +154,7 @@ public class JsonConverter implements Converter<JsonNode> {
|
|||
|
||||
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
|
||||
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
|
||||
throw new CopycatRuntimeException("Schema must contain 'type' field");
|
||||
throw new CopycatException("Schema must contain 'type' field");
|
||||
|
||||
switch (schemaTypeNode.textValue()) {
|
||||
case JsonSchema.BOOLEAN_TYPE_NAME:
|
||||
|
@ -174,10 +174,10 @@ public class JsonConverter implements Converter<JsonNode> {
|
|||
case JsonSchema.ARRAY_TYPE_NAME:
|
||||
JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
|
||||
if (elemSchema == null)
|
||||
throw new CopycatRuntimeException("Array schema did not specify the element type");
|
||||
throw new CopycatException("Array schema did not specify the element type");
|
||||
return Schema.createArray(asCopycatSchema(elemSchema));
|
||||
default:
|
||||
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode.textValue());
|
||||
throw new CopycatException("Unknown schema type: " + schemaTypeNode.textValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -231,7 +231,7 @@ public class JsonConverter implements Converter<JsonNode> {
|
|||
schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema);
|
||||
} else {
|
||||
if (!itemSchema.equals(fieldSchemaAndValue.schema))
|
||||
throw new CopycatRuntimeException("Mismatching schemas found in a list.");
|
||||
throw new CopycatException("Mismatching schemas found in a list.");
|
||||
}
|
||||
|
||||
list.add(fieldSchemaAndValue.payload);
|
||||
|
@ -239,7 +239,7 @@ public class JsonConverter implements Converter<JsonNode> {
|
|||
return new JsonSchema.Envelope(schema, list);
|
||||
}
|
||||
|
||||
throw new CopycatRuntimeException("Couldn't convert " + value + " to Avro.");
|
||||
throw new CopycatException("Couldn't convert " + value + " to Avro.");
|
||||
}
|
||||
|
||||
|
||||
|
@ -249,13 +249,13 @@ public class JsonConverter implements Converter<JsonNode> {
|
|||
|
||||
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
|
||||
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
|
||||
throw new CopycatRuntimeException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
|
||||
throw new CopycatException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
|
||||
|
||||
JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue());
|
||||
if (typeConverter != null)
|
||||
return typeConverter.convert(jsonSchema, jsonValue);
|
||||
|
||||
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode);
|
||||
throw new CopycatException("Unknown schema type: " + schemaTypeNode);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.Utils;
|
|||
import org.apache.kafka.copycat.cli.WorkerConfig;
|
||||
import org.apache.kafka.copycat.connector.Task;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.sink.SinkTask;
|
||||
import org.apache.kafka.copycat.source.SourceTask;
|
||||
import org.apache.kafka.copycat.storage.*;
|
||||
|
@ -175,13 +174,12 @@ public class Worker<K, V> {
|
|||
* {@link org.apache.kafka.copycat.sink.SinkTask}.
|
||||
* @param props configuration options for the task
|
||||
*/
|
||||
public void addTask(ConnectorTaskId id, String taskClassName, Properties props)
|
||||
throws CopycatException {
|
||||
public void addTask(ConnectorTaskId id, String taskClassName, Properties props) {
|
||||
if (tasks.containsKey(id)) {
|
||||
String msg = "Task already exists in this worker; the herder should not have requested "
|
||||
+ "that this : " + id;
|
||||
log.error(msg);
|
||||
throw new CopycatRuntimeException(msg);
|
||||
throw new CopycatException(msg);
|
||||
}
|
||||
|
||||
final Task task = instantiateTask(taskClassName);
|
||||
|
@ -213,11 +211,11 @@ public class Worker<K, V> {
|
|||
try {
|
||||
return Utils.newInstance(Class.forName(taskClassName).asSubclass(Task.class));
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new CopycatRuntimeException("Task class not found", e);
|
||||
throw new CopycatException("Task class not found", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void stopTask(ConnectorTaskId id) throws CopycatException {
|
||||
public void stopTask(ConnectorTaskId id) {
|
||||
WorkerTask task = getTask(id);
|
||||
task.stop();
|
||||
if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
|
||||
|
@ -230,7 +228,7 @@ public class Worker<K, V> {
|
|||
WorkerTask task = tasks.get(id);
|
||||
if (task == null) {
|
||||
log.error("Task not found: " + id);
|
||||
throw new CopycatRuntimeException("Task not found: " + id);
|
||||
throw new CopycatException("Task not found: " + id);
|
||||
}
|
||||
return task;
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer.*;
|
|||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.copycat.cli.WorkerConfig;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||
import org.apache.kafka.copycat.sink.SinkTask;
|
||||
import org.apache.kafka.copycat.sink.SinkTaskContext;
|
||||
|
@ -72,7 +71,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
public void stop() {
|
||||
// Offset commit is handled upon exit in work thread
|
||||
task.stop();
|
||||
if (workThread != null)
|
||||
|
@ -156,7 +155,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
|
|||
private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
|
||||
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
|
||||
if (topicsStr == null || topicsStr.isEmpty())
|
||||
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
|
||||
throw new CopycatException("Sink tasks require a list of topics.");
|
||||
String[] topics = topicsStr.split(",");
|
||||
|
||||
// Include any unknown worker configs so consumer configs can be set globally on the worker
|
||||
|
@ -176,7 +175,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
|
|||
try {
|
||||
newConsumer = new KafkaConsumer<>(props);
|
||||
} catch (Throwable t) {
|
||||
throw new CopycatRuntimeException("Failed to create consumer", t);
|
||||
throw new CopycatException("Failed to create consumer", t);
|
||||
}
|
||||
|
||||
log.debug("Task {} subscribing to topics {}", id, topics);
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
|
|||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.copycat.cli.WorkerConfig;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.source.SourceRecord;
|
||||
import org.apache.kafka.copycat.source.SourceTask;
|
||||
import org.apache.kafka.copycat.source.SourceTaskContext;
|
||||
|
@ -98,7 +97,7 @@ class WorkerSourceTask<K, V> implements WorkerTask {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
public void stop() {
|
||||
task.stop();
|
||||
commitOffsets();
|
||||
if (workThread != null)
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
package org.apache.kafka.copycat.runtime;
|
||||
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
|
@ -36,10 +34,8 @@ interface WorkerTask {
|
|||
/**
|
||||
* Stop this task from processing messages. This method does not block, it only triggers
|
||||
* shutdown. Use #{@link #awaitStop} to block until completion.
|
||||
*
|
||||
* @throws CopycatException
|
||||
*/
|
||||
void stop() throws CopycatException;
|
||||
void stop();
|
||||
|
||||
/**
|
||||
* Wait for this task to finish stopping.
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.kafka.copycat.runtime.standalone;
|
|||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.copycat.connector.Connector;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||
import org.apache.kafka.copycat.runtime.Herder;
|
||||
import org.apache.kafka.copycat.runtime.Worker;
|
||||
|
@ -74,7 +73,7 @@ public class StandaloneHerder implements Herder {
|
|||
callback.onCompletion(null, connState.name);
|
||||
// This should always be a new job, create jobs from scratch
|
||||
createConnectorTasks(connState);
|
||||
} catch (CopycatRuntimeException e) {
|
||||
} catch (CopycatException e) {
|
||||
if (callback != null)
|
||||
callback.onCompletion(e, null);
|
||||
}
|
||||
|
@ -86,7 +85,7 @@ public class StandaloneHerder implements Herder {
|
|||
destroyConnector(name);
|
||||
if (callback != null)
|
||||
callback.onCompletion(null, null);
|
||||
} catch (CopycatRuntimeException e) {
|
||||
} catch (CopycatException e) {
|
||||
if (callback != null)
|
||||
callback.onCompletion(e, null);
|
||||
}
|
||||
|
@ -104,7 +103,7 @@ public class StandaloneHerder implements Herder {
|
|||
|
||||
if (connectors.containsKey(connName)) {
|
||||
log.error("Ignoring request to create connector due to conflicting connector name");
|
||||
throw new CopycatRuntimeException("Connector with name " + connName + " already exists");
|
||||
throw new CopycatException("Connector with name " + connName + " already exists");
|
||||
}
|
||||
|
||||
final Connector connector;
|
||||
|
@ -113,13 +112,13 @@ public class StandaloneHerder implements Herder {
|
|||
} catch (Throwable t) {
|
||||
// Catches normal exceptions due to instantiation errors as well as any runtime errors that
|
||||
// may be caused by user code
|
||||
throw new CopycatRuntimeException("Failed to create connector instance", t);
|
||||
throw new CopycatException("Failed to create connector instance", t);
|
||||
}
|
||||
connector.initialize(new StandaloneConnectorContext(this, connName));
|
||||
try {
|
||||
connector.start(configs);
|
||||
} catch (CopycatException e) {
|
||||
throw new CopycatRuntimeException("Connector threw an exception while starting", e);
|
||||
throw new CopycatException("Connector threw an exception while starting", e);
|
||||
}
|
||||
ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
|
||||
connectors.put(connName, state);
|
||||
|
@ -133,7 +132,7 @@ public class StandaloneHerder implements Herder {
|
|||
try {
|
||||
return Utils.newInstance(className, Connector.class);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new CopycatRuntimeException("Couldn't instantiate connector class", e);
|
||||
throw new CopycatException("Couldn't instantiate connector class", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -142,7 +141,7 @@ public class StandaloneHerder implements Herder {
|
|||
ConnectorState state = connectors.get(connName);
|
||||
if (state == null) {
|
||||
log.error("Failed to destroy connector {} because it does not exist", connName);
|
||||
throw new CopycatRuntimeException("Connector does not exist");
|
||||
throw new CopycatException("Connector does not exist");
|
||||
}
|
||||
|
||||
stopConnector(state);
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -67,7 +67,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
|
|||
ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
|
||||
Object obj = is.readObject();
|
||||
if (!(obj instanceof HashMap))
|
||||
throw new CopycatRuntimeException("Expected HashMap but found " + obj.getClass());
|
||||
throw new CopycatException("Expected HashMap but found " + obj.getClass());
|
||||
HashMap<String, Map<byte[], byte[]>> raw = (HashMap<String, Map<byte[], byte[]>>) obj;
|
||||
data = new HashMap<>();
|
||||
for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
|
||||
|
@ -85,7 +85,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
|
|||
// FileNotFoundException: Ignore, may be new.
|
||||
// EOFException: Ignore, this means the file was missing or corrupt
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
throw new CopycatRuntimeException(e);
|
||||
throw new CopycatException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
|
|||
os.writeObject(raw);
|
||||
os.close();
|
||||
} catch (IOException e) {
|
||||
throw new CopycatRuntimeException(e);
|
||||
throw new CopycatException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.kafka.copycat.storage;
|
|||
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -82,7 +82,7 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
|
|||
raw = backingStore.get(namespace, serializedToOriginal.keySet(), null).get();
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to fetch offsets from namespace {}: ", namespace, e);
|
||||
throw new CopycatRuntimeException("Failed to fetch offsets.", e);
|
||||
throw new CopycatException("Failed to fetch offsets.", e);
|
||||
}
|
||||
|
||||
// Deserialize all the values and map back to the original keys
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.util.Callback;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -114,7 +114,7 @@ public class OffsetStorageWriter<K, V> {
|
|||
if (flushing()) {
|
||||
log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
|
||||
+ "framework should not allow this");
|
||||
throw new CopycatRuntimeException("OffsetStorageWriter is already flushing");
|
||||
throw new CopycatException("OffsetStorageWriter is already flushing");
|
||||
}
|
||||
|
||||
if (data.isEmpty())
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Time;
|
|||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.copycat.cli.WorkerConfig;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.source.SourceRecord;
|
||||
import org.apache.kafka.copycat.source.SourceTask;
|
||||
import org.apache.kafka.copycat.storage.*;
|
||||
|
@ -115,8 +114,8 @@ public class WorkerTest extends ThreadedTest {
|
|||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
@Test(expected = CopycatRuntimeException.class)
|
||||
public void testStopInvalidTask() throws CopycatException {
|
||||
@Test(expected = CopycatException.class)
|
||||
public void testStopInvalidTask() {
|
||||
worker.stopTask(taskId);
|
||||
}
|
||||
|
||||
|
@ -174,7 +173,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
public void stop() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.copycat.runtime.standalone;
|
|||
|
||||
import org.apache.kafka.copycat.connector.Connector;
|
||||
import org.apache.kafka.copycat.connector.Task;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||
import org.apache.kafka.copycat.runtime.Worker;
|
||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||
|
@ -160,14 +159,14 @@ public class StandaloneHerderTest {
|
|||
PowerMock.expectLastCall();
|
||||
}
|
||||
|
||||
private void expectStop() throws CopycatException {
|
||||
private void expectStop() {
|
||||
worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
|
||||
EasyMock.expectLastCall();
|
||||
connector.stop();
|
||||
EasyMock.expectLastCall();
|
||||
}
|
||||
|
||||
private void expectDestroy() throws CopycatException {
|
||||
private void expectDestroy() {
|
||||
expectStop();
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
package org.apache.kafka.copycat.storage;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.util.Callback;
|
||||
import org.easymock.Capture;
|
||||
import org.easymock.EasyMock;
|
||||
|
@ -133,7 +133,7 @@ public class OffsetStorageWriterTest {
|
|||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
@Test(expected = CopycatRuntimeException.class)
|
||||
@Test(expected = CopycatException.class)
|
||||
public void testAlreadyFlushing() throws Exception {
|
||||
@SuppressWarnings("unchecked")
|
||||
final Callback<Void> callback = PowerMock.createMock(Callback.class);
|
||||
|
|
Loading…
Reference in New Issue