Simplify Copycat exceptions: fold CopycatRuntimeException into CopycatException and make it a subclass of KafkaException.

Ewen Cheslack-Postava 2015-08-14 14:30:25 -07:00
parent 8c108b0cac
commit a3a47a6e8f
26 changed files with 74 additions and 135 deletions
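
Because org.apache.kafka.common.KafkaException extends RuntimeException, CopycatException becomes an unchecked exception: connector and task methods can drop their throws clauses while errors still propagate to the framework. A minimal sketch of the resulting calling convention (the connector class and the "file" config key below are illustrative, not part of this patch):

import java.util.Properties;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.errors.CopycatException;

// Illustrative connector: start() no longer declares "throws CopycatException",
// but may still throw it; being unchecked, it propagates without boilerplate.
// Declared abstract so the remaining Connector methods can be elided here.
public abstract class ExampleConnector extends Connector {
    @Override
    public void start(Properties props) {
        if (props.getProperty("file") == null)   // hypothetical required setting
            throw new CopycatException("Missing required 'file' configuration");
    }
}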

File: Connector.java

@@ -18,7 +18,6 @@
package org.apache.kafka.copycat.connector;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.errors.CopycatException;
import java.util.List;
import java.util.Properties;
@@ -81,9 +80,8 @@ public abstract class Connector {
* either just been instantiated and initialized or {@link #stop()} has been invoked.
*
* @param props configuration settings
* @throws CopycatException
*/
public abstract void start(Properties props) throws CopycatException;
public abstract void start(Properties props);
/**
* Reconfigure this Connector. Most implementations will not override this, using the default
@@ -92,9 +90,8 @@ public abstract class Connector {
* efficiently, e.g. without shutting down network connections to the external system.
*
* @param props new configuration settings
* @throws CopycatException
*/
public void reconfigure(Properties props) throws CopycatException {
public void reconfigure(Properties props) {
stop();
start(props);
}
@@ -115,8 +112,6 @@ public abstract class Connector {
/**
* Stop this connector.
*
* @throws CopycatException
*/
public abstract void stop() throws CopycatException;
public abstract void stop();
}
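
The reconfigure() javadoc above anticipates implementations that apply new settings without a full stop/start cycle; a sketch of such an override (class name and the compatible() helper are hypothetical):

import java.util.Properties;
import org.apache.kafka.copycat.connector.Connector;

// Sketch: applies compatible settings in place, keeping network connections
// open; otherwise falls back to the default stop()-then-start() behavior.
// Abstract so the other Connector methods can be elided in this sketch.
public abstract class InPlaceReconfigurableConnector extends Connector {
    private Properties current;

    @Override
    public void start(Properties props) {
        current = props;
        // ... open connections to the external system ...
    }

    @Override
    public void reconfigure(Properties props) {
        if (compatible(current, props))
            current = props;          // live update, connections stay open
        else
            super.reconfigure(props); // default path: stop(), then start(props)
    }

    // Hypothetical check: can the new settings be applied without a restart?
    private boolean compatible(Properties oldProps, Properties newProps) {
        return oldProps != null && oldProps.keySet().equals(newProps.keySet());
    }
}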

File: Task.java

@@ -18,7 +18,6 @@
package org.apache.kafka.copycat.connector;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.errors.CopycatException;
import java.util.Properties;
@@ -45,8 +44,6 @@ public interface Task {
/**
* Stop this task.
*
* @throws CopycatException
*/
void stop() throws CopycatException;
void stop();
}

File: CopycatException.java

@@ -17,13 +17,14 @@
package org.apache.kafka.copycat.errors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* CopycatException is the top-level exception type generated by Copycat and connectors.
*/
@InterfaceStability.Unstable
public class CopycatException extends Exception {
public class CopycatException extends KafkaException {
public CopycatException(String s) {
super(s);
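
The hunk is truncated above; judging from the constructors of the removed CopycatRuntimeException below and the call sites updated in this commit (new CopycatException(msg, e), new CopycatException(e)), the resulting class presumably keeps all three constructor forms, each delegating to KafkaException:

package org.apache.kafka.copycat.errors;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * CopycatException is the top-level exception type generated by Copycat and connectors.
 */
@InterfaceStability.Unstable
public class CopycatException extends KafkaException {
    public CopycatException(String s) {
        super(s);
    }

    // Inferred, not shown in this hunk: mirrors CopycatRuntimeException.
    public CopycatException(String s, Throwable throwable) {
        super(s, throwable);
    }

    public CopycatException(Throwable throwable) {
        super(throwable);
    }
}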

File: CopycatRuntimeException.java (deleted)

@@ -1,35 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.errors;
import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Unstable
public class CopycatRuntimeException extends RuntimeException {
public CopycatRuntimeException(String s) {
super(s);
}
public CopycatRuntimeException(String s, Throwable throwable) {
super(s, throwable);
}
public CopycatRuntimeException(Throwable throwable) {
super(throwable);
}
}

File: SinkTask.java

@@ -19,7 +19,6 @@ package org.apache.kafka.copycat.sink;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.errors.CopycatException;
import java.util.Collection;
import java.util.Map;
@@ -52,7 +51,7 @@ public abstract class SinkTask implements Task {
*
* @param records the set of records to send
*/
public abstract void put(Collection<SinkRecord> records) throws CopycatException;
public abstract void put(Collection<SinkRecord> records);
/**
* Flush all records that have been {@link #put} for the specified topic-partitions. The

File: ConnectorReconfigurationTest.java

@@ -52,7 +52,7 @@ public class ConnectorReconfigurationTest {
}
@Override
public void start(Properties props) throws CopycatException {
public void start(Properties props) {
configureOrder = order++;
}
@@ -67,7 +67,7 @@ public class ConnectorReconfigurationTest {
}
@Override
public void stop() throws CopycatException {
public void stop() {
stopOrder = order++;
if (stopException)
throw new CopycatException("error");

File: FileStreamSinkConnector.java

@@ -18,7 +18,6 @@
package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.sink.SinkConnector;
import java.util.ArrayList;
@@ -35,7 +34,7 @@ public class FileStreamSinkConnector extends SinkConnector {
private String filename;
@Override
public void start(Properties props) throws CopycatException {
public void start(Properties props) {
filename = props.getProperty(FILE_CONFIG);
}
@@ -57,7 +56,7 @@ public class FileStreamSinkConnector extends SinkConnector {
}
@Override
public void stop() throws CopycatException {
public void stop() {
// Nothing to do since FileStreamSinkConnector has no background monitoring.
}
}

File: FileStreamSinkTask.java

@@ -19,7 +19,6 @@ package org.apache.kafka.copycat.file;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.apache.kafka.copycat.sink.SinkTask;
import org.slf4j.Logger;
@@ -57,14 +56,13 @@ public class FileStreamSinkTask extends SinkTask {
try {
outputStream = new PrintStream(new File(filename));
} catch (FileNotFoundException e) {
throw new CopycatRuntimeException(
"Couldn't find or create file for FileStreamSinkTask: {}", e);
throw new CopycatException("Couldn't find or create file for FileStreamSinkTask: {}", e);
}
}
}
@Override
public void put(Collection<SinkRecord> sinkRecords) throws CopycatException {
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
outputStream.println(record.getValue());
}
@@ -76,6 +74,6 @@ public class FileStreamSinkTask extends SinkTask {
}
@Override
public void stop() throws CopycatException {
public void stop() {
}
}

File: FileStreamSourceConnector.java

@@ -37,7 +37,7 @@ public class FileStreamSourceConnector extends SourceConnector {
private String topic;
@Override
public void start(Properties props) throws CopycatException {
public void start(Properties props) {
filename = props.getProperty(FILE_CONFIG);
topic = props.getProperty(TOPIC_CONFIG);
if (topic == null || topic.isEmpty())
@@ -64,7 +64,7 @@ public class FileStreamSourceConnector extends SourceConnector {
}
@Override
public void stop() throws CopycatException {
public void stop() {
// Nothing to do since FileStreamSourceConnector has no background monitoring.
}
}

File: FileStreamSourceTask.java

@@ -18,7 +18,6 @@
package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
import org.slf4j.Logger;
@@ -63,19 +62,19 @@ public class FileStreamSourceTask extends SourceTask {
skipLeft -= skipped;
} catch (IOException e) {
log.error("Error while trying to seek to previous offset in file: ", e);
throw new CopycatRuntimeException(e);
throw new CopycatException(e);
}
}
log.debug("Skipped to offset {}", lastRecordedOffset);
}
streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L;
} catch (FileNotFoundException e) {
throw new CopycatRuntimeException("Couldn't find file for FileStreamSourceTask: {}", e);
throw new CopycatException("Couldn't find file for FileStreamSourceTask: {}", e);
}
}
topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
if (topic == null)
throw new CopycatRuntimeException("ConsoleSourceTask config missing topic setting");
throw new CopycatException("ConsoleSourceTask config missing topic setting");
reader = new BufferedReader(new InputStreamReader(stream));
}
@@ -161,7 +160,7 @@ public class FileStreamSourceTask extends SourceTask {
}
@Override
public void stop() throws CopycatException {
public void stop() {
log.trace("Stopping");
synchronized (this) {
try {

File: FileStreamSinkConnectorTest.java

@@ -19,7 +19,6 @@ package org.apache.kafka.copycat.file;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.connector.ConnectorContext;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.junit.Before;
import org.junit.Test;
@@ -57,7 +56,7 @@ public class FileStreamSinkConnectorTest {
}
@Test
public void testSinkTasks() throws CopycatException {
public void testSinkTasks() {
PowerMock.replayAll();
connector.start(sinkProperties);
@@ -75,7 +74,7 @@ public class FileStreamSinkConnectorTest {
}
@Test
public void testTaskClass() throws CopycatException {
public void testTaskClass() {
PowerMock.replayAll();
connector.start(sinkProperties);

File: FileStreamSinkTaskTest.java

@@ -18,7 +18,6 @@
package org.apache.kafka.copycat.file;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.junit.Before;
import org.junit.Test;
@@ -44,7 +43,7 @@ public class FileStreamSinkTaskTest {
}
@Test
public void testPutFlush() throws CopycatException {
public void testPutFlush() {
HashMap<TopicPartition, Long> offsets = new HashMap<>();
// We do not call task.start() since it would override the output stream

File: FileStreamSourceConnectorTest.java

@@ -51,7 +51,7 @@ public class FileStreamSourceConnectorTest {
}
@Test
public void testSourceTasks() throws CopycatException {
public void testSourceTasks() {
PowerMock.replayAll();
connector.start(sourceProperties);
@@ -74,7 +74,7 @@ public class FileStreamSourceConnectorTest {
}
@Test
public void testSourceTasksStdin() throws CopycatException {
public void testSourceTasksStdin() {
PowerMock.replayAll();
sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
@@ -87,13 +87,13 @@ public class FileStreamSourceConnectorTest {
}
@Test(expected = CopycatException.class)
public void testMultipleSourcesInvalid() throws CopycatException {
public void testMultipleSourcesInvalid() {
sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
connector.start(sourceProperties);
}
@Test
public void testTaskClass() throws CopycatException {
public void testTaskClass() {
PowerMock.replayAll();
connector.start(sourceProperties);

File: FileStreamSourceTaskTest.java

@@ -18,7 +18,6 @@
package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTaskContext;
import org.apache.kafka.copycat.storage.OffsetStorageReader;
@@ -72,7 +71,7 @@ public class FileStreamSourceTaskTest {
}
@Test
public void testNormalLifecycle() throws InterruptedException, CopycatException, IOException {
public void testNormalLifecycle() throws InterruptedException, IOException {
expectOffsetLookupReturnNone();
replay();
@@ -117,7 +116,7 @@ public class FileStreamSourceTaskTest {
task.stop();
}
@Test(expected = CopycatRuntimeException.class)
@Test(expected = CopycatException.class)
public void testMissingTopic() {
expectOffsetLookupReturnNone();
replay();
@@ -126,7 +125,7 @@ public class FileStreamSourceTaskTest {
task.start(config);
}
@Test(expected = CopycatRuntimeException.class)
@Test(expected = CopycatException.class)
public void testInvalidFile() {
config.setProperty(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
task.start(config);

File: JsonConverter.java

@@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.copycat.data.*;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.storage.Converter;
import java.io.IOException;
@@ -74,7 +74,7 @@ public class JsonConverter implements Converter<JsonNode> {
try {
return value.binaryValue();
} catch (IOException e) {
throw new CopycatRuntimeException("Invalid bytes field", e);
throw new CopycatException("Invalid bytes field", e);
}
}
});
@@ -89,7 +89,7 @@ public class JsonConverter implements Converter<JsonNode> {
public Object convert(JsonNode jsonSchema, JsonNode value) {
JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
if (elemSchema == null)
throw new CopycatRuntimeException("Array schema did not specify the element type");
throw new CopycatException("Array schema did not specify the element type");
ArrayList<Object> result = new ArrayList<>();
for (JsonNode elem : value) {
result.add(convertToCopycat(elemSchema, elem));
@@ -108,7 +108,7 @@ public class JsonConverter implements Converter<JsonNode> {
@Override
public Object toCopycatData(JsonNode value) {
if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
throw new CopycatRuntimeException("JSON value converted to Copycat must be in envelope containing schema");
throw new CopycatException("JSON value converted to Copycat must be in envelope containing schema");
return convertToCopycat(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
@@ -143,7 +143,7 @@ public class JsonConverter implements Converter<JsonNode> {
case MAP:
throw new UnsupportedOperationException("map schema not supported");
default:
throw new CopycatRuntimeException("Couldn't translate unsupported schema type " + schema.getType().getName() + ".");
throw new CopycatException("Couldn't translate unsupported schema type " + schema.getType().getName() + ".");
}
}
@@ -154,7 +154,7 @@ public class JsonConverter implements Converter<JsonNode> {
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
throw new CopycatRuntimeException("Schema must contain 'type' field");
throw new CopycatException("Schema must contain 'type' field");
switch (schemaTypeNode.textValue()) {
case JsonSchema.BOOLEAN_TYPE_NAME:
@@ -174,10 +174,10 @@ public class JsonConverter implements Converter<JsonNode> {
case JsonSchema.ARRAY_TYPE_NAME:
JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
if (elemSchema == null)
throw new CopycatRuntimeException("Array schema did not specify the element type");
throw new CopycatException("Array schema did not specify the element type");
return Schema.createArray(asCopycatSchema(elemSchema));
default:
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode.textValue());
throw new CopycatException("Unknown schema type: " + schemaTypeNode.textValue());
}
}
@@ -231,7 +231,7 @@ public class JsonConverter implements Converter<JsonNode> {
schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema);
} else {
if (!itemSchema.equals(fieldSchemaAndValue.schema))
throw new CopycatRuntimeException("Mismatching schemas found in a list.");
throw new CopycatException("Mismatching schemas found in a list.");
}
list.add(fieldSchemaAndValue.payload);
@@ -239,7 +239,7 @@ public class JsonConverter implements Converter<JsonNode> {
return new JsonSchema.Envelope(schema, list);
}
throw new CopycatRuntimeException("Couldn't convert " + value + " to Avro.");
throw new CopycatException("Couldn't convert " + value + " to Avro.");
}
@@ -249,13 +249,13 @@
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
throw new CopycatRuntimeException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
throw new CopycatException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue());
if (typeConverter != null)
return typeConverter.convert(jsonSchema, jsonValue);
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode);
throw new CopycatException("Unknown schema type: " + schemaTypeNode);
}

File: Worker.java

@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.storage.*;
@@ -175,13 +174,12 @@ public class Worker<K, V> {
* {@link org.apache.kafka.copycat.sink.SinkTask}.
* @param props configuration options for the task
*/
public void addTask(ConnectorTaskId id, String taskClassName, Properties props)
throws CopycatException {
public void addTask(ConnectorTaskId id, String taskClassName, Properties props) {
if (tasks.containsKey(id)) {
String msg = "Task already exists in this worker; the herder should not have requested "
+ "that this : " + id;
log.error(msg);
throw new CopycatRuntimeException(msg);
throw new CopycatException(msg);
}
final Task task = instantiateTask(taskClassName);
@@ -213,11 +211,11 @@ public class Worker<K, V> {
try {
return Utils.newInstance(Class.forName(taskClassName).asSubclass(Task.class));
} catch (ClassNotFoundException e) {
throw new CopycatRuntimeException("Task class not found", e);
throw new CopycatException("Task class not found", e);
}
}
public void stopTask(ConnectorTaskId id) throws CopycatException {
public void stopTask(ConnectorTaskId id) {
WorkerTask task = getTask(id);
task.stop();
if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
@@ -230,7 +228,7 @@
WorkerTask task = tasks.get(id);
if (task == null) {
log.error("Task not found: " + id);
throw new CopycatRuntimeException("Task not found: " + id);
throw new CopycatException("Task not found: " + id);
}
return task;
}

File: WorkerSinkTask.java

@@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.sink.SinkTaskContext;
@@ -72,7 +71,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
}
@Override
public void stop() throws CopycatException {
public void stop() {
// Offset commit is handled upon exit in work thread
task.stop();
if (workThread != null)
@@ -156,7 +155,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
throw new CopycatException("Sink tasks require a list of topics.");
String[] topics = topicsStr.split(",");
// Include any unknown worker configs so consumer configs can be set globally on the worker
@@ -176,7 +175,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
try {
newConsumer = new KafkaConsumer<>(props);
} catch (Throwable t) {
throw new CopycatRuntimeException("Failed to create consumer", t);
throw new CopycatException("Failed to create consumer", t);
}
log.debug("Task {} subscribing to topics {}", id, topics);

File: WorkerSourceTask.java

@@ -23,7 +23,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -98,7 +97,7 @@ class WorkerSourceTask<K, V> implements WorkerTask {
}
@Override
public void stop() throws CopycatException {
public void stop() {
task.stop();
commitOffsets();
if (workThread != null)

File: WorkerTask.java

@@ -17,8 +17,6 @@
package org.apache.kafka.copycat.runtime;
import org.apache.kafka.copycat.errors.CopycatException;
import java.util.Properties;
/**
@@ -36,10 +34,8 @@ interface WorkerTask {
/**
* Stop this task from processing messages. This method does not block, it only triggers
* shutdown. Use #{@link #awaitStop} to block until completion.
*
* @throws CopycatException
*/
void stop() throws CopycatException;
void stop();
/**
* Wait for this task to finish stopping.
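
For reference, the contract described above is exercised by Worker.stopTask() earlier in this commit: stop() only signals shutdown, and awaitStop(timeout) blocks until the task exits or the timeout lapses. A condensed sketch (the wrapper method is illustrative):

// Mirrors Worker.stopTask(): signal, then wait with a bounded timeout.
static void shutdownGracefully(WorkerTask task, long timeoutMs) {
    task.stop();                      // non-blocking: triggers shutdown only
    if (!task.awaitStop(timeoutMs))   // block until stopped or timed out
        System.err.println("Graceful stop failed, forcing shutdown");
}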

File: StandaloneHerder.java

@@ -20,7 +20,6 @@ package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Herder;
import org.apache.kafka.copycat.runtime.Worker;
@@ -74,7 +73,7 @@ public class StandaloneHerder implements Herder {
callback.onCompletion(null, connState.name);
// This should always be a new job, create jobs from scratch
createConnectorTasks(connState);
} catch (CopycatRuntimeException e) {
} catch (CopycatException e) {
if (callback != null)
callback.onCompletion(e, null);
}
@@ -86,7 +85,7 @@ public class StandaloneHerder implements Herder {
destroyConnector(name);
if (callback != null)
callback.onCompletion(null, null);
} catch (CopycatRuntimeException e) {
} catch (CopycatException e) {
if (callback != null)
callback.onCompletion(e, null);
}
@@ -104,7 +103,7 @@
if (connectors.containsKey(connName)) {
log.error("Ignoring request to create connector due to conflicting connector name");
throw new CopycatRuntimeException("Connector with name " + connName + " already exists");
throw new CopycatException("Connector with name " + connName + " already exists");
}
final Connector connector;
@@ -113,13 +112,13 @@
} catch (Throwable t) {
// Catches normal exceptions due to instantiation errors as well as any runtime errors that
// may be caused by user code
throw new CopycatRuntimeException("Failed to create connector instance", t);
throw new CopycatException("Failed to create connector instance", t);
}
connector.initialize(new StandaloneConnectorContext(this, connName));
try {
connector.start(configs);
} catch (CopycatException e) {
throw new CopycatRuntimeException("Connector threw an exception while starting", e);
throw new CopycatException("Connector threw an exception while starting", e);
}
ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
connectors.put(connName, state);
@@ -133,7 +132,7 @@
try {
return Utils.newInstance(className, Connector.class);
} catch (ClassNotFoundException e) {
throw new CopycatRuntimeException("Couldn't instantiate connector class", e);
throw new CopycatException("Couldn't instantiate connector class", e);
}
}
@@ -142,7 +141,7 @@
ConnectorState state = connectors.get(connName);
if (state == null) {
log.error("Failed to destroy connector {} because it does not exist", connName);
throw new CopycatRuntimeException("Connector does not exist");
throw new CopycatException("Connector does not exist");
}
stopConnector(state);

File: FileOffsetBackingStore.java

@@ -17,7 +17,7 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.errors.CopycatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +67,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
Object obj = is.readObject();
if (!(obj instanceof HashMap))
throw new CopycatRuntimeException("Expected HashMap but found " + obj.getClass());
throw new CopycatException("Expected HashMap but found " + obj.getClass());
HashMap<String, Map<byte[], byte[]>> raw = (HashMap<String, Map<byte[], byte[]>>) obj;
data = new HashMap<>();
for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
@@ -85,7 +85,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
// FileNotFoundException: Ignore, may be new.
// EOFException: Ignore, this means the file was missing or corrupt
} catch (IOException | ClassNotFoundException e) {
throw new CopycatRuntimeException(e);
throw new CopycatException(e);
}
}
@@ -105,7 +105,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
os.writeObject(raw);
os.close();
} catch (IOException e) {
throw new CopycatRuntimeException(e);
throw new CopycatException(e);
}
}
}

File: OffsetStorageReaderImpl.java

@@ -19,7 +19,7 @@ package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.errors.CopycatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +82,7 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
raw = backingStore.get(namespace, serializedToOriginal.keySet(), null).get();
} catch (Exception e) {
log.error("Failed to fetch offsets from namespace {}: ", namespace, e);
throw new CopycatRuntimeException("Failed to fetch offsets.", e);
throw new CopycatException("Failed to fetch offsets.", e);
}
// Deserialize all the values and map back to the original keys

File: OffsetStorageWriter.java

@@ -18,7 +18,7 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,7 +114,7 @@ public class OffsetStorageWriter<K, V> {
if (flushing()) {
log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+ "framework should not allow this");
throw new CopycatRuntimeException("OffsetStorageWriter is already flushing");
throw new CopycatException("OffsetStorageWriter is already flushing");
}
if (data.isEmpty())

File: WorkerTest.java

@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.storage.*;
@@ -115,8 +114,8 @@ public class WorkerTest extends ThreadedTest {
PowerMock.verifyAll();
}
@Test(expected = CopycatRuntimeException.class)
public void testStopInvalidTask() throws CopycatException {
@Test(expected = CopycatException.class)
public void testStopInvalidTask() {
worker.stopTask(taskId);
}
@@ -174,7 +173,7 @@
}
@Override
public void stop() throws CopycatException {
public void stop() {
}
}
}

File: StandaloneHerderTest.java

@@ -19,7 +19,6 @@ package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector;
@@ -160,14 +159,14 @@ public class StandaloneHerderTest {
PowerMock.expectLastCall();
}
private void expectStop() throws CopycatException {
private void expectStop() {
worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
EasyMock.expectLastCall();
connector.stop();
EasyMock.expectLastCall();
}
private void expectDestroy() throws CopycatException {
private void expectDestroy() {
expectStop();
}

File: OffsetStorageWriterTest.java

@@ -18,7 +18,7 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.Callback;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -133,7 +133,7 @@ public class OffsetStorageWriterTest {
PowerMock.verifyAll();
}
@Test(expected = CopycatRuntimeException.class)
@Test(expected = CopycatException.class)
public void testAlreadyFlushing() throws Exception {
@SuppressWarnings("unchecked")
final Callback<Void> callback = PowerMock.createMock(Callback.class);