From a3a47a6e8f05891c333d0cabd08c75a8c1ed7179 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Fri, 14 Aug 2015 14:30:25 -0700 Subject: [PATCH] Simplify Copycat exceptions, make them a subclass of KafkaException. --- .../kafka/copycat/connector/Connector.java | 11 ++---- .../apache/kafka/copycat/connector/Task.java | 5 +-- .../copycat/errors/CopycatException.java | 3 +- .../errors/CopycatRuntimeException.java | 35 ------------------- .../apache/kafka/copycat/sink/SinkTask.java | 3 +- .../ConnectorReconfigurationTest.java | 4 +-- .../copycat/file/FileStreamSinkConnector.java | 5 ++- .../copycat/file/FileStreamSinkTask.java | 8 ++--- .../file/FileStreamSourceConnector.java | 4 +-- .../copycat/file/FileStreamSourceTask.java | 9 +++-- .../file/FileStreamSinkConnectorTest.java | 5 ++- .../copycat/file/FileStreamSinkTaskTest.java | 3 +- .../file/FileStreamSourceConnectorTest.java | 8 ++--- .../file/FileStreamSourceTaskTest.java | 7 ++-- .../kafka/copycat/json/JsonConverter.java | 24 ++++++------- .../apache/kafka/copycat/runtime/Worker.java | 12 +++---- .../kafka/copycat/runtime/WorkerSinkTask.java | 7 ++-- .../copycat/runtime/WorkerSourceTask.java | 3 +- .../kafka/copycat/runtime/WorkerTask.java | 6 +--- .../runtime/standalone/StandaloneHerder.java | 15 ++++---- .../storage/FileOffsetBackingStore.java | 8 ++--- .../storage/OffsetStorageReaderImpl.java | 4 +-- .../copycat/storage/OffsetStorageWriter.java | 4 +-- .../kafka/copycat/runtime/WorkerTest.java | 7 ++-- .../standalone/StandaloneHerderTest.java | 5 ++- .../storage/OffsetStorageWriterTest.java | 4 +-- 26 files changed, 74 insertions(+), 135 deletions(-) delete mode 100644 copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatRuntimeException.java diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java index 7127ff71222..2ea3c956f4d 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java @@ -18,7 +18,6 @@ package org.apache.kafka.copycat.connector; import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.errors.CopycatException; import java.util.List; import java.util.Properties; @@ -81,9 +80,8 @@ public abstract class Connector { * either just been instantiated and initialized or {@link #stop()} has been invoked. * * @param props configuration settings - * @throws CopycatException */ - public abstract void start(Properties props) throws CopycatException; + public abstract void start(Properties props); /** * Reconfigure this Connector. Most implementations will not override this, using the default @@ -92,9 +90,8 @@ public abstract class Connector { * efficiently, e.g. without shutting down network connections to the external system. * * @param props new configuration settings - * @throws CopycatException */ - public void reconfigure(Properties props) throws CopycatException { + public void reconfigure(Properties props) { stop(); start(props); } @@ -115,8 +112,6 @@ public abstract class Connector { /** * Stop this connector. - * - * @throws CopycatException */ - public abstract void stop() throws CopycatException; + public abstract void stop(); } diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java index 40fbe18dab5..cdaba0845c2 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java @@ -18,7 +18,6 @@ package org.apache.kafka.copycat.connector; import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.errors.CopycatException; import java.util.Properties; @@ -45,8 +44,6 @@ public interface Task { /** * Stop this task. - * - * @throws CopycatException */ - void stop() throws CopycatException; + void stop(); } diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java index 5d37557dd32..c8f1bad98ce 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java @@ -17,13 +17,14 @@ package org.apache.kafka.copycat.errors; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.annotation.InterfaceStability; /** * CopycatException is the top-level exception type generated by Copycat and connectors. */ @InterfaceStability.Unstable -public class CopycatException extends Exception { +public class CopycatException extends KafkaException { public CopycatException(String s) { super(s); diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatRuntimeException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatRuntimeException.java deleted file mode 100644 index 61e94c5eac8..00000000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatRuntimeException.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.errors; - -import org.apache.kafka.common.annotation.InterfaceStability; - -@InterfaceStability.Unstable -public class CopycatRuntimeException extends RuntimeException { - public CopycatRuntimeException(String s) { - super(s); - } - - public CopycatRuntimeException(String s, Throwable throwable) { - super(s, throwable); - } - - public CopycatRuntimeException(Throwable throwable) { - super(throwable); - } -} diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java index 6eb6fd54a5c..49fbbe937ee 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java @@ -19,7 +19,6 @@ package org.apache.kafka.copycat.sink; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.copycat.connector.Task; -import org.apache.kafka.copycat.errors.CopycatException; import java.util.Collection; import java.util.Map; @@ -52,7 +51,7 @@ public abstract class SinkTask implements Task { * * @param records the set of records to send */ - public abstract void put(Collection records) throws CopycatException; + public abstract void put(Collection records); /** * Flush all records that have been {@link #put} for the specified topic-partitions. The diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java index 72067a85c3c..e7ad2f3f33f 100644 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java +++ b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java @@ -52,7 +52,7 @@ public class ConnectorReconfigurationTest { } @Override - public void start(Properties props) throws CopycatException { + public void start(Properties props) { configureOrder = order++; } @@ -67,7 +67,7 @@ public class ConnectorReconfigurationTest { } @Override - public void stop() throws CopycatException { + public void stop() { stopOrder = order++; if (stopException) throw new CopycatException("error"); diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java index 651029909dd..e41364e818a 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java @@ -18,7 +18,6 @@ package org.apache.kafka.copycat.file; import org.apache.kafka.copycat.connector.Task; -import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.sink.SinkConnector; import java.util.ArrayList; @@ -35,7 +34,7 @@ public class FileStreamSinkConnector extends SinkConnector { private String filename; @Override - public void start(Properties props) throws CopycatException { + public void start(Properties props) { filename = props.getProperty(FILE_CONFIG); } @@ -57,7 +56,7 @@ public class FileStreamSinkConnector extends SinkConnector { } @Override - public void stop() throws CopycatException { + public void stop() { // Nothing to do since FileStreamSinkConnector has no background monitoring. } } diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java index 16839ffdbf2..7e4ca7eb793 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java @@ -19,7 +19,6 @@ package org.apache.kafka.copycat.file; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.sink.SinkRecord; import org.apache.kafka.copycat.sink.SinkTask; import org.slf4j.Logger; @@ -57,14 +56,13 @@ public class FileStreamSinkTask extends SinkTask { try { outputStream = new PrintStream(new File(filename)); } catch (FileNotFoundException e) { - throw new CopycatRuntimeException( - "Couldn't find or create file for FileStreamSinkTask: {}", e); + throw new CopycatException("Couldn't find or create file for FileStreamSinkTask: {}", e); } } } @Override - public void put(Collection sinkRecords) throws CopycatException { + public void put(Collection sinkRecords) { for (SinkRecord record : sinkRecords) { outputStream.println(record.getValue()); } @@ -76,6 +74,6 @@ public class FileStreamSinkTask extends SinkTask { } @Override - public void stop() throws CopycatException { + public void stop() { } } diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java index 9dd85ef2a50..4f9d8d0a746 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java @@ -37,7 +37,7 @@ public class FileStreamSourceConnector extends SourceConnector { private String topic; @Override - public void start(Properties props) throws CopycatException { + public void start(Properties props) { filename = props.getProperty(FILE_CONFIG); topic = props.getProperty(TOPIC_CONFIG); if (topic == null || topic.isEmpty()) @@ -64,7 +64,7 @@ public class FileStreamSourceConnector extends SourceConnector { } @Override - public void stop() throws CopycatException { + public void stop() { // Nothing to do since FileStreamSourceConnector has no background monitoring. } } diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java index 1752ea790ed..572ae1fd119 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java @@ -18,7 +18,6 @@ package org.apache.kafka.copycat.file; import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceTask; import org.slf4j.Logger; @@ -63,19 +62,19 @@ public class FileStreamSourceTask extends SourceTask { skipLeft -= skipped; } catch (IOException e) { log.error("Error while trying to seek to previous offset in file: ", e); - throw new CopycatRuntimeException(e); + throw new CopycatException(e); } } log.debug("Skipped to offset {}", lastRecordedOffset); } streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L; } catch (FileNotFoundException e) { - throw new CopycatRuntimeException("Couldn't find file for FileStreamSourceTask: {}", e); + throw new CopycatException("Couldn't find file for FileStreamSourceTask: {}", e); } } topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG); if (topic == null) - throw new CopycatRuntimeException("ConsoleSourceTask config missing topic setting"); + throw new CopycatException("ConsoleSourceTask config missing topic setting"); reader = new BufferedReader(new InputStreamReader(stream)); } @@ -161,7 +160,7 @@ public class FileStreamSourceTask extends SourceTask { } @Override - public void stop() throws CopycatException { + public void stop() { log.trace("Stopping"); synchronized (this) { try { diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java index 412aab16535..643fb434e9e 100644 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.copycat.file; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.copycat.connector.ConnectorContext; -import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.sink.SinkConnector; import org.junit.Before; import org.junit.Test; @@ -57,7 +56,7 @@ public class FileStreamSinkConnectorTest { } @Test - public void testSinkTasks() throws CopycatException { + public void testSinkTasks() { PowerMock.replayAll(); connector.start(sinkProperties); @@ -75,7 +74,7 @@ public class FileStreamSinkConnectorTest { } @Test - public void testTaskClass() throws CopycatException { + public void testTaskClass() { PowerMock.replayAll(); connector.start(sinkProperties); diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java index 1238543adc3..b4e1b0cae95 100644 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.copycat.file; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.sink.SinkRecord; import org.junit.Before; import org.junit.Test; @@ -44,7 +43,7 @@ public class FileStreamSinkTaskTest { } @Test - public void testPutFlush() throws CopycatException { + public void testPutFlush() { HashMap offsets = new HashMap<>(); // We do not call task.start() since it would override the output stream diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java index b9c433480e9..e23055c475a 100644 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java @@ -51,7 +51,7 @@ public class FileStreamSourceConnectorTest { } @Test - public void testSourceTasks() throws CopycatException { + public void testSourceTasks() { PowerMock.replayAll(); connector.start(sourceProperties); @@ -74,7 +74,7 @@ public class FileStreamSourceConnectorTest { } @Test - public void testSourceTasksStdin() throws CopycatException { + public void testSourceTasksStdin() { PowerMock.replayAll(); sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); @@ -87,13 +87,13 @@ public class FileStreamSourceConnectorTest { } @Test(expected = CopycatException.class) - public void testMultipleSourcesInvalid() throws CopycatException { + public void testMultipleSourcesInvalid() { sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS); connector.start(sourceProperties); } @Test - public void testTaskClass() throws CopycatException { + public void testTaskClass() { PowerMock.replayAll(); connector.start(sourceProperties); diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java index e2a01e59e3a..0ec71d3354d 100644 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.copycat.file; import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceTaskContext; import org.apache.kafka.copycat.storage.OffsetStorageReader; @@ -72,7 +71,7 @@ public class FileStreamSourceTaskTest { } @Test - public void testNormalLifecycle() throws InterruptedException, CopycatException, IOException { + public void testNormalLifecycle() throws InterruptedException, IOException { expectOffsetLookupReturnNone(); replay(); @@ -117,7 +116,7 @@ public class FileStreamSourceTaskTest { task.stop(); } - @Test(expected = CopycatRuntimeException.class) + @Test(expected = CopycatException.class) public void testMissingTopic() { expectOffsetLookupReturnNone(); replay(); @@ -126,7 +125,7 @@ public class FileStreamSourceTaskTest { task.start(config); } - @Test(expected = CopycatRuntimeException.class) + @Test(expected = CopycatException.class) public void testInvalidFile() { config.setProperty(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename"); task.start(config); diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java index fd6471846ef..36a6ca878e2 100644 --- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java +++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.copycat.data.*; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; +import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.storage.Converter; import java.io.IOException; @@ -74,7 +74,7 @@ public class JsonConverter implements Converter { try { return value.binaryValue(); } catch (IOException e) { - throw new CopycatRuntimeException("Invalid bytes field", e); + throw new CopycatException("Invalid bytes field", e); } } }); @@ -89,7 +89,7 @@ public class JsonConverter implements Converter { public Object convert(JsonNode jsonSchema, JsonNode value) { JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME); if (elemSchema == null) - throw new CopycatRuntimeException("Array schema did not specify the element type"); + throw new CopycatException("Array schema did not specify the element type"); ArrayList result = new ArrayList<>(); for (JsonNode elem : value) { result.add(convertToCopycat(elemSchema, elem)); @@ -108,7 +108,7 @@ public class JsonConverter implements Converter { @Override public Object toCopycatData(JsonNode value) { if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) - throw new CopycatRuntimeException("JSON value converted to Copycat must be in envelope containing schema"); + throw new CopycatException("JSON value converted to Copycat must be in envelope containing schema"); return convertToCopycat(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); } @@ -143,7 +143,7 @@ public class JsonConverter implements Converter { case MAP: throw new UnsupportedOperationException("map schema not supported"); default: - throw new CopycatRuntimeException("Couldn't translate unsupported schema type " + schema.getType().getName() + "."); + throw new CopycatException("Couldn't translate unsupported schema type " + schema.getType().getName() + "."); } } @@ -154,7 +154,7 @@ public class JsonConverter implements Converter { JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME); if (schemaTypeNode == null || !schemaTypeNode.isTextual()) - throw new CopycatRuntimeException("Schema must contain 'type' field"); + throw new CopycatException("Schema must contain 'type' field"); switch (schemaTypeNode.textValue()) { case JsonSchema.BOOLEAN_TYPE_NAME: @@ -174,10 +174,10 @@ public class JsonConverter implements Converter { case JsonSchema.ARRAY_TYPE_NAME: JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME); if (elemSchema == null) - throw new CopycatRuntimeException("Array schema did not specify the element type"); + throw new CopycatException("Array schema did not specify the element type"); return Schema.createArray(asCopycatSchema(elemSchema)); default: - throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode.textValue()); + throw new CopycatException("Unknown schema type: " + schemaTypeNode.textValue()); } } @@ -231,7 +231,7 @@ public class JsonConverter implements Converter { schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema); } else { if (!itemSchema.equals(fieldSchemaAndValue.schema)) - throw new CopycatRuntimeException("Mismatching schemas found in a list."); + throw new CopycatException("Mismatching schemas found in a list."); } list.add(fieldSchemaAndValue.payload); @@ -239,7 +239,7 @@ public class JsonConverter implements Converter { return new JsonSchema.Envelope(schema, list); } - throw new CopycatRuntimeException("Couldn't convert " + value + " to Avro."); + throw new CopycatException("Couldn't convert " + value + " to Avro."); } @@ -249,13 +249,13 @@ public class JsonConverter implements Converter { JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME); if (schemaTypeNode == null || !schemaTypeNode.isTextual()) - throw new CopycatRuntimeException("Schema must contain 'type' field. Schema: " + jsonSchema.toString()); + throw new CopycatException("Schema must contain 'type' field. Schema: " + jsonSchema.toString()); JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue()); if (typeConverter != null) return typeConverter.convert(jsonSchema, jsonValue); - throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode); + throw new CopycatException("Unknown schema type: " + schemaTypeNode); } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java index e4763861f73..55d07840205 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.cli.WorkerConfig; import org.apache.kafka.copycat.connector.Task; import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.sink.SinkTask; import org.apache.kafka.copycat.source.SourceTask; import org.apache.kafka.copycat.storage.*; @@ -175,13 +174,12 @@ public class Worker { * {@link org.apache.kafka.copycat.sink.SinkTask}. * @param props configuration options for the task */ - public void addTask(ConnectorTaskId id, String taskClassName, Properties props) - throws CopycatException { + public void addTask(ConnectorTaskId id, String taskClassName, Properties props) { if (tasks.containsKey(id)) { String msg = "Task already exists in this worker; the herder should not have requested " + "that this : " + id; log.error(msg); - throw new CopycatRuntimeException(msg); + throw new CopycatException(msg); } final Task task = instantiateTask(taskClassName); @@ -213,11 +211,11 @@ public class Worker { try { return Utils.newInstance(Class.forName(taskClassName).asSubclass(Task.class)); } catch (ClassNotFoundException e) { - throw new CopycatRuntimeException("Task class not found", e); + throw new CopycatException("Task class not found", e); } } - public void stopTask(ConnectorTaskId id) throws CopycatException { + public void stopTask(ConnectorTaskId id) { WorkerTask task = getTask(id); task.stop(); if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG))) @@ -230,7 +228,7 @@ public class Worker { WorkerTask task = tasks.get(id); if (task == null) { log.error("Task not found: " + id); - throw new CopycatRuntimeException("Task not found: " + id); + throw new CopycatException("Task not found: " + id); } return task; } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index b1893b9ba7a..4eaf756fd33 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.cli.WorkerConfig; import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.sink.SinkRecord; import org.apache.kafka.copycat.sink.SinkTask; import org.apache.kafka.copycat.sink.SinkTaskContext; @@ -72,7 +71,7 @@ class WorkerSinkTask implements WorkerTask { } @Override - public void stop() throws CopycatException { + public void stop() { // Offset commit is handled upon exit in work thread task.stop(); if (workThread != null) @@ -156,7 +155,7 @@ class WorkerSinkTask implements WorkerTask { private KafkaConsumer createConsumer(Properties taskProps) { String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG); if (topicsStr == null || topicsStr.isEmpty()) - throw new CopycatRuntimeException("Sink tasks require a list of topics."); + throw new CopycatException("Sink tasks require a list of topics."); String[] topics = topicsStr.split(","); // Include any unknown worker configs so consumer configs can be set globally on the worker @@ -176,7 +175,7 @@ class WorkerSinkTask implements WorkerTask { try { newConsumer = new KafkaConsumer<>(props); } catch (Throwable t) { - throw new CopycatRuntimeException("Failed to create consumer", t); + throw new CopycatException("Failed to create consumer", t); } log.debug("Task {} subscribing to topics {}", id, topics); diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java index a23fe090802..c80adb47eb1 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java @@ -23,7 +23,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.copycat.cli.WorkerConfig; -import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceTask; import org.apache.kafka.copycat.source.SourceTaskContext; @@ -98,7 +97,7 @@ class WorkerSourceTask implements WorkerTask { } @Override - public void stop() throws CopycatException { + public void stop() { task.stop(); commitOffsets(); if (workThread != null) diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java index 1cdf597699f..af225bb7e2d 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java @@ -17,8 +17,6 @@ package org.apache.kafka.copycat.runtime; -import org.apache.kafka.copycat.errors.CopycatException; - import java.util.Properties; /** @@ -36,10 +34,8 @@ interface WorkerTask { /** * Stop this task from processing messages. This method does not block, it only triggers * shutdown. Use #{@link #awaitStop} to block until completion. - * - * @throws CopycatException */ - void stop() throws CopycatException; + void stop(); /** * Wait for this task to finish stopping. diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java index f1ca3f17bcd..2ed91834de5 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java @@ -20,7 +20,6 @@ package org.apache.kafka.copycat.runtime.standalone; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.connector.Connector; import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.runtime.ConnectorConfig; import org.apache.kafka.copycat.runtime.Herder; import org.apache.kafka.copycat.runtime.Worker; @@ -74,7 +73,7 @@ public class StandaloneHerder implements Herder { callback.onCompletion(null, connState.name); // This should always be a new job, create jobs from scratch createConnectorTasks(connState); - } catch (CopycatRuntimeException e) { + } catch (CopycatException e) { if (callback != null) callback.onCompletion(e, null); } @@ -86,7 +85,7 @@ public class StandaloneHerder implements Herder { destroyConnector(name); if (callback != null) callback.onCompletion(null, null); - } catch (CopycatRuntimeException e) { + } catch (CopycatException e) { if (callback != null) callback.onCompletion(e, null); } @@ -104,7 +103,7 @@ public class StandaloneHerder implements Herder { if (connectors.containsKey(connName)) { log.error("Ignoring request to create connector due to conflicting connector name"); - throw new CopycatRuntimeException("Connector with name " + connName + " already exists"); + throw new CopycatException("Connector with name " + connName + " already exists"); } final Connector connector; @@ -113,13 +112,13 @@ public class StandaloneHerder implements Herder { } catch (Throwable t) { // Catches normal exceptions due to instantiation errors as well as any runtime errors that // may be caused by user code - throw new CopycatRuntimeException("Failed to create connector instance", t); + throw new CopycatException("Failed to create connector instance", t); } connector.initialize(new StandaloneConnectorContext(this, connName)); try { connector.start(configs); } catch (CopycatException e) { - throw new CopycatRuntimeException("Connector threw an exception while starting", e); + throw new CopycatException("Connector threw an exception while starting", e); } ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics); connectors.put(connName, state); @@ -133,7 +132,7 @@ public class StandaloneHerder implements Herder { try { return Utils.newInstance(className, Connector.class); } catch (ClassNotFoundException e) { - throw new CopycatRuntimeException("Couldn't instantiate connector class", e); + throw new CopycatException("Couldn't instantiate connector class", e); } } @@ -142,7 +141,7 @@ public class StandaloneHerder implements Herder { ConnectorState state = connectors.get(connName); if (state == null) { log.error("Failed to destroy connector {} because it does not exist", connName); - throw new CopycatRuntimeException("Connector does not exist"); + throw new CopycatException("Connector does not exist"); } stopConnector(state); diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java index 73da8f4bb57..dfa9e7808e0 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java @@ -17,7 +17,7 @@ package org.apache.kafka.copycat.storage; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; +import org.apache.kafka.copycat.errors.CopycatException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +67,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore { ObjectInputStream is = new ObjectInputStream(new FileInputStream(file)); Object obj = is.readObject(); if (!(obj instanceof HashMap)) - throw new CopycatRuntimeException("Expected HashMap but found " + obj.getClass()); + throw new CopycatException("Expected HashMap but found " + obj.getClass()); HashMap> raw = (HashMap>) obj; data = new HashMap<>(); for (Map.Entry> entry : raw.entrySet()) { @@ -85,7 +85,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore { // FileNotFoundException: Ignore, may be new. // EOFException: Ignore, this means the file was missing or corrupt } catch (IOException | ClassNotFoundException e) { - throw new CopycatRuntimeException(e); + throw new CopycatException(e); } } @@ -105,7 +105,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore { os.writeObject(raw); os.close(); } catch (IOException e) { - throw new CopycatRuntimeException(e); + throw new CopycatException(e); } } } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java index 99e2b6562f3..7a050dc9404 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java @@ -19,7 +19,7 @@ package org.apache.kafka.copycat.storage; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; +import org.apache.kafka.copycat.errors.CopycatException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +82,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader { raw = backingStore.get(namespace, serializedToOriginal.keySet(), null).get(); } catch (Exception e) { log.error("Failed to fetch offsets from namespace {}: ", namespace, e); - throw new CopycatRuntimeException("Failed to fetch offsets.", e); + throw new CopycatException("Failed to fetch offsets.", e); } // Deserialize all the values and map back to the original keys diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java index 8626968f2b4..c6e829c635a 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java @@ -18,7 +18,7 @@ package org.apache.kafka.copycat.storage; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; +import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.util.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,7 +114,7 @@ public class OffsetStorageWriter { if (flushing()) { log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the " + "framework should not allow this"); - throw new CopycatRuntimeException("OffsetStorageWriter is already flushing"); + throw new CopycatException("OffsetStorageWriter is already flushing"); } if (data.isEmpty()) diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java index bc08996a6af..32e7ff903fe 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.copycat.cli.WorkerConfig; import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceTask; import org.apache.kafka.copycat.storage.*; @@ -115,8 +114,8 @@ public class WorkerTest extends ThreadedTest { PowerMock.verifyAll(); } - @Test(expected = CopycatRuntimeException.class) - public void testStopInvalidTask() throws CopycatException { + @Test(expected = CopycatException.class) + public void testStopInvalidTask() { worker.stopTask(taskId); } @@ -174,7 +173,7 @@ public class WorkerTest extends ThreadedTest { } @Override - public void stop() throws CopycatException { + public void stop() { } } } diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java index 04ea4ceae75..5ac7e389042 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.copycat.runtime.standalone; import org.apache.kafka.copycat.connector.Connector; import org.apache.kafka.copycat.connector.Task; -import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.runtime.ConnectorConfig; import org.apache.kafka.copycat.runtime.Worker; import org.apache.kafka.copycat.sink.SinkConnector; @@ -160,14 +159,14 @@ public class StandaloneHerderTest { PowerMock.expectLastCall(); } - private void expectStop() throws CopycatException { + private void expectStop() { worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0)); EasyMock.expectLastCall(); connector.stop(); EasyMock.expectLastCall(); } - private void expectDestroy() throws CopycatException { + private void expectDestroy() { expectStop(); } diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java index cce151446c5..3d49f05ac97 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java @@ -18,7 +18,7 @@ package org.apache.kafka.copycat.storage; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; +import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.util.Callback; import org.easymock.Capture; import org.easymock.EasyMock; @@ -133,7 +133,7 @@ public class OffsetStorageWriterTest { PowerMock.verifyAll(); } - @Test(expected = CopycatRuntimeException.class) + @Test(expected = CopycatException.class) public void testAlreadyFlushing() throws Exception { @SuppressWarnings("unchecked") final Callback callback = PowerMock.createMock(Callback.class);