Address Gwen's review comments.

This commit is contained in:
Ewen Cheslack-Postava 2015-08-11 19:42:57 -07:00
parent 6787a85e87
commit d713a21990
11 changed files with 26 additions and 446 deletions

View File

@ -543,7 +543,7 @@ project(':copycat-data') {
archives.extendsFrom (testCompile)
}
/* FIXME
/* FIXME Re-enable this with KAFKA-2367 when the placeholder data API is replaced
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}

View File

@ -1,83 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-->
<module name="Checker">
<property name="localeLanguage" value="en"/>
<module name="FileTabCharacter"/>
<!-- header -->
<module name="RegexpHeader">
<property name="header" value="/\*\*\nLicensed to the Apache.*"/>
</module>
<module name="TreeWalker">
<!-- code cleanup -->
<module name="UnusedImports"/>
<module name="RedundantImport"/>
<module name="IllegalImport" />
<module name="EqualsHashCode"/>
<module name="SimplifyBooleanExpression"/>
<module name="OneStatementPerLine"/>
<module name="UnnecessaryParentheses" />
<module name="SimplifyBooleanReturn"/>
<!-- style -->
<module name="DefaultComesLast"/>
<module name="EmptyStatement"/>
<module name="ArrayTypeStyle"/>
<module name="UpperEll"/>
<module name="LeftCurly"/>
<module name="RightCurly"/>
<module name="EmptyStatement"/>
<module name="ConstantName">
<property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
</module>
<module name="LocalVariableName"/>
<module name="LocalFinalVariableName"/>
<module name="ClassTypeParameterName"/>
<module name="MemberName"/>
<module name="MethodTypeParameterName"/>
<module name="PackageName"/>
<module name="ParameterName"/>
<module name="StaticVariableName"/>
<module name="TypeName"/>
<!-- dependencies -->
<module name="ImportControl">
<property name="file" value="${basedir}/checkstyle/import-control.xml"/>
</module>
<!-- whitespace -->
<module name="GenericWhitespace"/>
<module name="NoWhitespaceBefore"/>
<module name="WhitespaceAfter" />
<module name="NoWhitespaceAfter"/>
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true"/>
<property name="allowEmptyMethods" value="true"/>
</module>
<module name="Indentation"/>
<module name="MethodParamPad"/>
<module name="ParenPad"/>
<module name="TypecastParenPad"/>
</module>
</module>

View File

@ -1,102 +0,0 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-->
<import-control pkg="org.apache.kafka">
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
<!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.utils" />
<subpackage name="common">
<disallow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.test" />
<subpackage name="config">
<allow pkg="org.apache.kafka.common.config" />
<!-- for testing -->
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="metrics">
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="network">
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="protocol">
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.common.protocol.types" />
</subpackage>
<subpackage name="record">
<allow pkg="net.jpountz" />
<allow pkg="org.apache.kafka.common.record" />
</subpackage>
<subpackage name="requests">
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.network" />
<!-- for testing -->
<allow pkg="org.apache.kafka.common.errors" />
</subpackage>
<subpackage name="serialization">
<allow class="org.apache.kafka.common.errors.SerializationException" />
</subpackage>
</subpackage>
<subpackage name="clients">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.slf4j" />
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
<allow pkg="org.apache.kafka.test" />
<subpackage name="consumer">
<allow pkg="org.apache.kafka.clients.consumer" />
</subpackage>
<subpackage name="producer">
<allow pkg="org.apache.kafka.clients.producer" />
</subpackage>
<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
</subpackage>
</subpackage>
<subpackage name="test">
<allow pkg="org.apache.kafka" />
</subpackage>
</import-control>

View File

@ -278,6 +278,8 @@ public class Utils {
throw new KafkaException("Could not instantiate class " + c.getName(), e);
} catch (InstantiationException e) {
throw new KafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
} catch (NullPointerException e) {
throw new KafkaException("Requested class was null", e);
}
}

View File

@ -59,10 +59,9 @@ public abstract class Connector {
* This version is only used to recover from failures.
* </p>
* <p>
* The default implementation ignores the provided simply ignores the provided Task
* configurations. During recovery, Copycat will request an updated set of configurations and
* update the running Tasks appropriately. However, Connectors should implement special
* handling of this case if it will avoid unnecessary changes to running Tasks.
* The default implementation ignores the provided Task configurations. During recovery, Copycat will request
* an updated set of configurations and update the running Tasks appropriately. However, Connectors should
* implement special handling of this case if it will avoid unnecessary changes to running Tasks.
* </p>
*
* @param ctx context object used to interact with the Copycat runtime

View File

@ -32,10 +32,25 @@ public abstract class SinkTaskContext {
offsets = new HashMap<>();
}
/**
* Reset the consumer offsets for the given topic partitions. SinkTasks should use this when they are started
* if they manage offsets in the sink data store rather than using Kafka consumer offsets. For example, an HDFS
* connector might record offsets in HDFS to provide exactly once delivery. When the SinkTask is started or
* a rebalance occurs, the task would reload offsets from HDFS and use this method to reset the consumer to those
* offsets.
*
* SinkTasks that do not manage their own offsets do not need to use this method.
*
* @param offsets map of offsets for topic partitions
*/
public void resetOffset(Map<TopicPartition, Long> offsets) {
this.offsets = offsets;
}
/**
* Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework.
* @return the map of offsets
*/
public Map<TopicPartition, Long> getOffsets() {
return offsets;
}

View File

@ -25,7 +25,7 @@ import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class ConnectorTest {
public class ConnectorReconfigurationTest {
@Test
public void testDefaultReconfigure() throws Exception {

View File

@ -58,6 +58,6 @@ public class FileStreamSinkConnector extends SinkConnector {
@Override
public void stop() throws CopycatException {
// Nothing to do since ConsoleConnector has no background monitoring.
// Nothing to do since FileStreamSinkConnector has no background monitoring.
}
}

View File

@ -41,9 +41,9 @@ public class FileStreamSourceConnector extends SourceConnector {
filename = props.getProperty(FILE_CONFIG);
topic = props.getProperty(TOPIC_CONFIG);
if (topic == null || topic.isEmpty())
throw new CopycatException("ConsoleConnector configuration must include 'topic' setting");
throw new CopycatException("FileStreamSourceConnector configuration must include 'topic' setting");
if (topic.contains(","))
throw new CopycatException("ConsoleConnector should only have a single topic when used as a source.");
throw new CopycatException("FileStreamSourceConnector should only have a single topic when used as a source.");
}
@Override
@ -65,6 +65,6 @@ public class FileStreamSourceConnector extends SourceConnector {
@Override
public void stop() throws CopycatException {
// Nothing to do since ConsoleConnector has no background monitoring.
// Nothing to do since FileStreamSourceConnector has no background monitoring.
}
}

View File

@ -1,83 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-->
<module name="Checker">
<property name="localeLanguage" value="en"/>
<module name="FileTabCharacter"/>
<!-- header -->
<module name="RegexpHeader">
<property name="header" value="/\*\*\nLicensed to the Apache.*"/>
</module>
<module name="TreeWalker">
<!-- code cleanup -->
<module name="UnusedImports"/>
<module name="RedundantImport"/>
<module name="IllegalImport" />
<module name="EqualsHashCode"/>
<module name="SimplifyBooleanExpression"/>
<module name="OneStatementPerLine"/>
<module name="UnnecessaryParentheses" />
<module name="SimplifyBooleanReturn"/>
<!-- style -->
<module name="DefaultComesLast"/>
<module name="EmptyStatement"/>
<module name="ArrayTypeStyle"/>
<module name="UpperEll"/>
<module name="LeftCurly"/>
<module name="RightCurly"/>
<module name="EmptyStatement"/>
<module name="ConstantName">
<property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
</module>
<module name="LocalVariableName"/>
<module name="LocalFinalVariableName"/>
<module name="ClassTypeParameterName"/>
<module name="MemberName"/>
<module name="MethodTypeParameterName"/>
<module name="PackageName"/>
<module name="ParameterName"/>
<module name="StaticVariableName"/>
<module name="TypeName"/>
<!-- dependencies -->
<module name="ImportControl">
<property name="file" value="${basedir}/checkstyle/import-control.xml"/>
</module>
<!-- whitespace -->
<module name="GenericWhitespace"/>
<module name="NoWhitespaceBefore"/>
<module name="WhitespaceAfter" />
<module name="NoWhitespaceAfter"/>
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true"/>
<property name="allowEmptyMethods" value="true"/>
</module>
<module name="Indentation"/>
<module name="MethodParamPad"/>
<module name="ParenPad"/>
<module name="TypecastParenPad"/>
</module>
</module>

View File

@ -1,168 +0,0 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-->
<import-control pkg="org.apache.kafka">
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
<!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.utils" />
<subpackage name="common">
<disallow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.test" />
<subpackage name="config">
<allow pkg="org.apache.kafka.common.config" />
<!-- for testing -->
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="metrics">
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="network">
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="protocol">
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.common.protocol.types" />
</subpackage>
<subpackage name="record">
<allow pkg="net.jpountz" />
<allow pkg="org.apache.kafka.common.record" />
</subpackage>
<subpackage name="requests">
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.network" />
<!-- for testing -->
<allow pkg="org.apache.kafka.common.errors" />
</subpackage>
<subpackage name="serialization">
<allow class="org.apache.kafka.common.errors.SerializationException" />
</subpackage>
</subpackage>
<subpackage name="clients">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.slf4j" />
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
<allow pkg="org.apache.kafka.test" />
<subpackage name="consumer">
<allow pkg="org.apache.kafka.clients.consumer" />
</subpackage>
<subpackage name="producer">
<allow pkg="org.apache.kafka.clients.producer" />
</subpackage>
<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="com.fasterxml.jackson" />
<allow pkg="net.sourceforge.argparse4j" />
</subpackage>
</subpackage>
<subpackage name="log4jappender">
<allow pkg="org.apache.log4j" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="test">
<allow pkg="org.apache.kafka" />
</subpackage>
<subpackage name="copycat">
<allow pkg="org.apache.kafka.copycat.data" />
<allow pkg="org.apache.kafka.copycat.errors" />
<subpackage name="source">
<allow pkg="org.apache.kafka.copycat.connector" />
<allow pkg="org.apache.kafka.copycat.storage" />
</subpackage>
<subpackage name="sink">
<allow pkg="org.apache.kafka.copycat.connector" />
<allow pkg="org.apache.kafka.copycat.storage" />
</subpackage>
<subpackage name="runtime">
<allow pkg="org.apache.kafka.copycat" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.clients" />
<!-- for tests -->
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
</subpackage>
<subpackage name="cli">
<allow pkg="org.apache.kafka.copycat.runtime" />
<allow pkg="org.apache.kafka.copycat.util" />
<allow pkg="org.apache.kafka.common" />
</subpackage>
<subpackage name="storage">
<allow pkg="org.apache.kafka.copycat" />
<allow pkg="org.apache.kafka.common.serialization" />
<!-- for tests -->
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
</subpackage>
<subpackage name="util">
<allow pkg="org.apache.kafka.copycat" />
</subpackage>
<subpackage name="json">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.copycat.storage" />
</subpackage>
<subpackage name="file">
<allow pkg="org.apache.kafka.copycat" />
<!-- for tests -->
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
</subpackage>
</subpackage>
</import-control>