Address review comments, clean up some code styling.

Ewen Cheslack-Postava 2015-07-30 21:18:54 -07:00
parent 25b5739ff0
commit 4674d136e1
29 changed files with 103 additions and 107 deletions

View File

@ -28,6 +28,11 @@ buildscript {
} }
def slf4jlog4j='org.slf4j:slf4j-log4j12:1.7.6' def slf4jlog4j='org.slf4j:slf4j-log4j12:1.7.6'
def slf4japi="org.slf4j:slf4j-api:1.7.6"
def junit='junit:junit:4.6'
def easymock='org.easymock:easymock:3.3.1'
def powermock='org.powermock:powermock-module-junit4:1.6.2'
def powermock_easymock='org.powermock:powermock-api-easymock:1.6.2'
allprojects { allprojects {
apply plugin: 'idea' apply plugin: 'idea'
@ -59,7 +64,7 @@ rat {
// And some of the files that we have checked in should also be excluded from this check // And some of the files that we have checked in should also be excluded from this check
excludes.addAll([ excludes.addAll([
'**/.git/**', '**/.git/**',
'build/**', '**/build/**',
'CONTRIBUTING.md', 'CONTRIBUTING.md',
'gradlew', 'gradlew',
'gradlew.bat', 'gradlew.bat',
@ -244,8 +249,8 @@ project(':core') {
compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4' compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4'
} }
testCompile 'junit:junit:4.6' testCompile "$junit"
testCompile 'org.easymock:easymock:3.3.1' testCompile "$easymock"
testCompile 'org.objenesis:objenesis:1.2' testCompile 'org.objenesis:objenesis:1.2'
testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5" testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
@ -376,11 +381,11 @@ project(':clients') {
archivesBaseName = "kafka-clients" archivesBaseName = "kafka-clients"
dependencies { dependencies {
compile "org.slf4j:slf4j-api:1.7.6" compile "$slf4japi"
compile 'org.xerial.snappy:snappy-java:1.1.1.7' compile 'org.xerial.snappy:snappy-java:1.1.1.7'
compile 'net.jpountz.lz4:lz4:1.2.0' compile 'net.jpountz.lz4:lz4:1.2.0'
testCompile 'junit:junit:4.6' testCompile "$junit"
testRuntime "$slf4jlog4j" testRuntime "$slf4jlog4j"
} }
@ -428,7 +433,7 @@ project(':tools') {
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4' compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
compile "$slf4jlog4j" compile "$slf4jlog4j"
testCompile 'junit:junit:4.6' testCompile "$junit"
testCompile project(path: ':clients', configuration: 'archives') testCompile project(path: ':clients', configuration: 'archives')
} }
@ -476,7 +481,7 @@ project(':log4j-appender') {
compile project(':clients') compile project(':clients')
compile "$slf4jlog4j" compile "$slf4jlog4j"
testCompile 'junit:junit:4.6' testCompile "$junit"
testCompile project(path: ':clients', configuration: 'archives') testCompile project(path: ':clients', configuration: 'archives')
} }
@ -508,9 +513,9 @@ project(':copycat-data') {
dependencies { dependencies {
compile project(':clients') compile project(':clients')
compile "org.slf4j:slf4j-api:1.7.6" compile "$slf4japi"
testCompile 'junit:junit:4.6' testCompile "$junit"
testRuntime "$slf4jlog4j" testRuntime "$slf4jlog4j"
} }
@ -551,9 +556,9 @@ project(':copycat-api') {
dependencies { dependencies {
compile project(':copycat-data') compile project(':copycat-data')
compile "org.slf4j:slf4j-api:1.7.6" compile "$slf4japi"
testCompile 'junit:junit:4.6' testCompile "$junit"
testRuntime "$slf4jlog4j" testRuntime "$slf4jlog4j"
} }
@ -593,13 +598,13 @@ project(':copycat-json') {
dependencies { dependencies {
compile project(':copycat-api') compile project(':copycat-api')
compile "org.slf4j:slf4j-api:1.7.6" compile "$slf4japi"
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4' compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
testCompile 'junit:junit:4.6' testCompile "$junit"
testCompile 'org.easymock:easymock:3.3.1' testCompile "$easymock"
testCompile 'org.powermock:powermock-module-junit4:1.6.2' testCompile "$powermock"
testCompile 'org.powermock:powermock-api-easymock:1.6.2' testCompile "$powermock_easymock"
testRuntime "$slf4jlog4j" testRuntime "$slf4jlog4j"
} }
@ -652,12 +657,12 @@ project(':copycat-runtime') {
dependencies { dependencies {
compile project(':copycat-api') compile project(':copycat-api')
compile project(':clients') compile project(':clients')
compile "org.slf4j:slf4j-api:1.7.6" compile "$slf4japi"
testCompile 'junit:junit:4.6' testCompile "$junit"
testCompile 'org.easymock:easymock:3.3.1' testCompile "$easymock"
testCompile 'org.powermock:powermock-module-junit4:1.6.2' testCompile "$powermock"
testCompile 'org.powermock:powermock-api-easymock:1.6.2' testCompile "$powermock_easymock"
testRuntime "$slf4jlog4j" testRuntime "$slf4jlog4j"
testRuntime project(":copycat-json") testRuntime project(":copycat-json")
} }
@ -698,12 +703,12 @@ project(':copycat-file') {
dependencies { dependencies {
compile project(':copycat-api') compile project(':copycat-api')
compile "org.slf4j:slf4j-api:1.7.6" compile "$slf4japi"
testCompile 'junit:junit:4.6' testCompile "$junit"
testCompile 'org.easymock:easymock:3.3.1' testCompile "$easymock"
testCompile 'org.powermock:powermock-module-junit4:1.6.2' testCompile "$powermock"
testCompile 'org.powermock:powermock-api-easymock:1.6.2' testCompile "$powermock_easymock"
testRuntime "$slf4jlog4j" testRuntime "$slf4jlog4j"
} }

View File

@ -66,7 +66,8 @@ public abstract class Connector {
* </p> * </p>
* *
* @param ctx context object used to interact with the Copycat runtime * @param ctx context object used to interact with the Copycat runtime
* @param taskConfigs * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
* churn in partition to task assignments
*/ */
public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) { public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) {
context = ctx; context = ctx;
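The new @param text above explains why the existing task configurations are passed into initialize(): a connector can hand the same assignments back when regenerating task configs, so partition-to-task assignments are not churned unnecessarily. A minimal, self-contained sketch of that idea follows; everything except the initialize(List&lt;Properties&gt;) shape shown in this hunk (the class name, generateTaskConfigs, the reuse rule) is an illustrative assumption, not the Copycat API.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    // Illustrative sketch: remember the task configs handed to initialize() and prefer
    // returning them unchanged later, to avoid churning existing assignments.
    public class TaskConfigReuseSketch {
        private List<Properties> lastTaskConfigs = new ArrayList<>();

        // Mirrors the idea of Connector.initialize(ctx, taskConfigs) in the hunk above.
        public void initialize(List<Properties> existingTaskConfigs) {
            if (existingTaskConfigs != null)
                lastTaskConfigs = new ArrayList<>(existingTaskConfigs);
        }

        // Hypothetical config generation step: reuse prior assignments if they still fit.
        public List<Properties> generateTaskConfigs(int maxTasks) {
            if (!lastTaskConfigs.isEmpty() && lastTaskConfigs.size() <= maxTasks)
                return new ArrayList<>(lastTaskConfigs);
            List<Properties> fresh = new ArrayList<>();
            for (int i = 0; i < maxTasks; i++)
                fresh.add(new Properties());
            return fresh;
        }
    }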

View File

@ -26,5 +26,5 @@ public interface ConnectorContext {
* indicate to the runtime that something about the input/output has changed (e.g. streams * indicate to the runtime that something about the input/output has changed (e.g. streams
* added/removed) and the running Tasks will need to be modified. * added/removed) and the running Tasks will need to be modified.
*/ */
public void requestTaskReconfiguration(); void requestTaskReconfiguration();
} }
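For context, requestTaskReconfiguration() is the hook a connector uses when its inputs or outputs change. The sketch below shows how connector-side code might call it; only the ConnectorContext method comes from this diff, while the wrapper class, the change-detection flag, and the package in the import are assumptions.

    import org.apache.kafka.copycat.connector.ConnectorContext;  // package assumed from neighboring files

    // Sketch only: when some monitoring check notices added or removed streams, ask the
    // runtime to regenerate and redistribute task configurations.
    public class ReconfigurationTrigger {
        private final ConnectorContext context;

        public ReconfigurationTrigger(ConnectorContext context) {
            this.context = context;
        }

        public void onStreamsChanged(boolean changed) {
            if (changed)
                context.requestTaskReconfiguration();
        }
    }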

View File

@ -22,15 +22,10 @@ package org.apache.kafka.copycat.connector;
* Base class for records containing data to be copied to/from Kafka. This corresponds closely to * Base class for records containing data to be copied to/from Kafka. This corresponds closely to
* Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both * Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both
* sources and sinks (topic, partition, key, value). Although both implementations include a * sources and sinks (topic, partition, key, value). Although both implementations include a
* notion of offest, it is not included here because they differ in type. * notion of offset, it is not included here because they differ in type.
* </p>
* <p>
* This class uses type parameters for keys and values. These are provided primarily for
* connector developer convenience. Internally, Copycat will handle any primitive types or
* org.apache.kafka.copycat.data types.
* </p> * </p>
*/ */
public class CopycatRecord { public abstract class CopycatRecord {
private final String topic; private final String topic;
private final Integer partition; private final Integer partition;
private final Object key; private final Object key;

View File

@ -18,9 +18,6 @@
package org.apache.kafka.copycat.errors; package org.apache.kafka.copycat.errors;
public class CopycatRuntimeException extends RuntimeException { public class CopycatRuntimeException extends RuntimeException {
public CopycatRuntimeException() {
}
public CopycatRuntimeException(String s) { public CopycatRuntimeException(String s) {
super(s); super(s);
} }

View File

@ -29,7 +29,7 @@ public abstract class SinkTaskContext {
private Map<TopicPartition, Long> offsets; private Map<TopicPartition, Long> offsets;
public SinkTaskContext() { public SinkTaskContext() {
offsets = new HashMap<TopicPartition, Long>(); offsets = new HashMap<>();
} }
public void resetOffset(Map<TopicPartition, Long> offsets) { public void resetOffset(Map<TopicPartition, Long> offsets) {

View File

@ -33,7 +33,7 @@ public interface OffsetStorageReader {
* @param stream object uniquely identifying the stream of data * @param stream object uniquely identifying the stream of data
* @return object uniquely identifying the offset in the stream of data * @return object uniquely identifying the offset in the stream of data
*/ */
public Object getOffset(Object stream); Object getOffset(Object stream);
/** /**
* <p> * <p>
@ -52,5 +52,5 @@ public interface OffsetStorageReader {
* @param streams set of identifiers for streams of data * @param streams set of identifiers for streams of data
* @return a map of stream identifiers to decoded offsets * @return a map of stream identifiers to decoded offsets
*/ */
public Map<Object, Object> getOffsets(Collection<Object> streams); Map<Object, Object> getOffsets(Collection<Object> streams);
} }

View File

@ -26,10 +26,10 @@ import java.util.List;
*/ */
public class ConnectorUtils { public class ConnectorUtils {
/** /**
* Given a set of elements and a target number of groups, generates an set of groups of * Given a list of elements and a target number of groups, generates list of groups of
* elements to match the target number of groups, spreading them evenly among the groups. * elements to match the target number of groups, spreading them evenly among the groups.
* This generates groups with contiguous elements, which results in intuitive ordering if * This generates groups with contiguous elements, which results in intuitive ordering if
* your elements are also ordered (e.g. alphabetical sets of table names if you sort * your elements are also ordered (e.g. alphabetical lists of table names if you sort
* table names alphabetically to generate the raw partitions) or can result in efficient * table names alphabetically to generate the raw partitions) or can result in efficient
* partitioning if elements are sorted according to some criteria that affects performance * partitioning if elements are sorted according to some criteria that affects performance
* (e.g. topic partitions with the same leader). * (e.g. topic partitions with the same leader).
@ -42,7 +42,7 @@ public class ConnectorUtils {
throw new IllegalArgumentException("Number of groups must be positive."); throw new IllegalArgumentException("Number of groups must be positive.");
} }
List<List<T>> result = new ArrayList<List<T>>(numGroups); List<List<T>> result = new ArrayList<>(numGroups);
// Each group has either n+1 or n raw partitions // Each group has either n+1 or n raw partitions
int perGroup = elements.size() / numGroups; int perGroup = elements.size() / numGroups;
@ -51,7 +51,7 @@ public class ConnectorUtils {
int assigned = 0; int assigned = 0;
for (int group = 0; group < numGroups; group++) { for (int group = 0; group < numGroups; group++) {
int numThisGroup = group < leftover ? perGroup + 1 : perGroup; int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
List<T> groupList = new ArrayList<T>(numThisGroup); List<T> groupList = new ArrayList<>(numThisGroup);
for (int i = 0; i < numThisGroup; i++) { for (int i = 0; i < numThisGroup; i++) {
groupList.add(elements.get(assigned)); groupList.add(elements.get(assigned));
assigned++; assigned++;
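The reworded javadoc and the loop in this hunk describe grouping a list into contiguous groups whose sizes differ by at most one. The standalone sketch below mirrors that arithmetic so the behavior is easy to check; the class and method names are made up for illustration and only the grouping rule comes from the code above.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Self-contained sketch of the grouping rule documented above: contiguous groups,
    // sizes differing by at most one, earlier groups taking the extra element.
    public class GroupingSketch {
        static <T> List<List<T>> group(List<T> elements, int numGroups) {
            if (numGroups <= 0)
                throw new IllegalArgumentException("Number of groups must be positive.");
            List<List<T>> result = new ArrayList<>(numGroups);
            int perGroup = elements.size() / numGroups;          // each group gets n or n+1 elements
            int leftover = elements.size() - perGroup * numGroups;
            int assigned = 0;
            for (int group = 0; group < numGroups; group++) {
                int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
                List<T> groupList = new ArrayList<>(numThisGroup);
                for (int i = 0; i < numThisGroup; i++)
                    groupList.add(elements.get(assigned++));
                result.add(groupList);
            }
            return result;
        }

        public static void main(String[] args) {
            // Prints [[a, b, c], [d, e]]: contiguous groups, first group takes the leftover.
            System.out.println(group(Arrays.asList("a", "b", "c", "d", "e"), 2));
        }
    }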

View File

@ -19,6 +19,19 @@ package org.apache.kafka.copycat.data;
public class BinaryData { public class BinaryData {
/**
* Compares regions of two byte arrays, returning a negative integer, zero, or positive integer when the first byte
* array region is less than, equal to, or greater than the second byte array region, respectively.
*
* @param b1 first byte array
* @param s1 start of region in first byte array
* @param l1 length of region in first byte array
* @param b2 second byte array
* @param s2 start of region in second byte array
* @param l2 length of region in second byte array
* @return a negative integer, zero, or a positive integer as the first byte array is less than, equal to, or greater
* than the second byte array
*/
public static int compareBytes(byte[] b1, int s1, int l1, public static int compareBytes(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) { byte[] b2, int s2, int l2) {
int end1 = s1 + l1; int end1 = s1 + l1;
@ -26,9 +39,8 @@ public class BinaryData {
for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) { for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
int a = b1[i] & 0xff; int a = b1[i] & 0xff;
int b = b2[j] & 0xff; int b = b2[j] & 0xff;
if (a != b) { if (a != b)
return a - b; return a - b;
}
} }
return l1 - l2; return l1 - l2;
} }
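Since compareBytes() now carries full javadoc, a small usage sketch may help show the unsigned, region-based semantics; the example class and input values are illustrative, while the method signature and package come from this diff.

    import org.apache.kafka.copycat.data.BinaryData;

    // Usage sketch: bytes are compared as unsigned values (0x80 sorts after 0x7f), and
    // when one region is a prefix of the other, the shorter region sorts first (l1 - l2).
    public class CompareBytesExample {
        public static void main(String[] args) {
            byte[] a = {0x01, 0x7f};
            byte[] b = {0x01, (byte) 0x80};
            // Negative: 0x7f < 0x80 when compared as unsigned bytes.
            System.out.println(BinaryData.compareBytes(a, 0, a.length, b, 0, b.length));
            // Negative: bytes match, but the first region is shorter.
            System.out.println(BinaryData.compareBytes(a, 0, 1, a, 0, 2));
        }
    }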

View File

@ -46,7 +46,7 @@ public class FileStreamSinkConnector extends SinkConnector {
@Override @Override
public List<Properties> getTaskConfigs(int maxTasks) { public List<Properties> getTaskConfigs(int maxTasks) {
ArrayList<Properties> configs = new ArrayList<Properties>(); ArrayList<Properties> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) { for (int i = 0; i < maxTasks; i++) {
Properties config = new Properties(); Properties config = new Properties();
if (filename != null) { if (filename != null) {

View File

@ -56,7 +56,7 @@ public class FileStreamSourceConnector extends SourceConnector {
@Override @Override
public List<Properties> getTaskConfigs(int maxTasks) { public List<Properties> getTaskConfigs(int maxTasks) {
ArrayList<Properties> configs = new ArrayList<Properties>(); ArrayList<Properties> configs = new ArrayList<>();
// Only one input stream makes sense. // Only one input stream makes sense.
Properties config = new Properties(); Properties config = new Properties();
if (filename != null) { if (filename != null) {

View File

@ -113,7 +113,7 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
line = extractLine(); line = extractLine();
if (line != null) { if (line != null) {
if (records == null) { if (records == null) {
records = new ArrayList<SourceRecord>(); records = new ArrayList<>();
} }
records.add(new SourceRecord(null, streamOffset, topic, line)); records.add(new SourceRecord(null, streamOffset, topic, line));
} }

View File

@ -27,7 +27,6 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Properties;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -36,19 +35,17 @@ public class FileStreamSinkTaskTest {
private FileStreamSinkTask task; private FileStreamSinkTask task;
private ByteArrayOutputStream os; private ByteArrayOutputStream os;
private PrintStream printStream; private PrintStream printStream;
private Properties config;
@Before @Before
public void setup() { public void setup() {
os = new ByteArrayOutputStream(); os = new ByteArrayOutputStream();
printStream = new PrintStream(os); printStream = new PrintStream(os);
task = new FileStreamSinkTask(printStream); task = new FileStreamSinkTask(printStream);
config = new Properties();
} }
@Test @Test
public void testPutFlush() throws CopycatException { public void testPutFlush() throws CopycatException {
HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(); HashMap<TopicPartition, Long> offsets = new HashMap<>();
// We do not call task.start() since it would override the output stream // We do not call task.start() since it would override the output stream

View File

@ -38,7 +38,7 @@ public class WorkerConfig extends AbstractConfig {
public static final String CLUSTER_DEFAULT = "copycat"; public static final String CLUSTER_DEFAULT = "copycat";
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String BOOSTRAP_SERVERS_DOC public static final String BOOTSTRAP_SERVERS_DOC
= "A list of host/port pairs to use for establishing the initial connection to the Kafka " = "A list of host/port pairs to use for establishing the initial connection to the Kafka "
+ "cluster. The client will make use of all servers irrespective of which servers are " + "cluster. The client will make use of all servers irrespective of which servers are "
+ "specified here for bootstrapping&mdash;this list only impacts the initial hosts used " + "specified here for bootstrapping&mdash;this list only impacts the initial hosts used "
@ -100,7 +100,7 @@ public class WorkerConfig extends AbstractConfig {
config = new ConfigDef() config = new ConfigDef()
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC) .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT, .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
Importance.HIGH, BOOSTRAP_SERVERS_DOC) Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
.define(CONVERTER_CLASS_CONFIG, Type.CLASS, .define(CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, CONVERTER_CLASS_DOC) Importance.HIGH, CONVERTER_CLASS_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,

View File

@ -50,8 +50,7 @@ public class SourceTaskOffsetCommitter {
private Time time; private Time time;
private WorkerConfig config; private WorkerConfig config;
private ScheduledExecutorService commitExecutorService = null; private ScheduledExecutorService commitExecutorService = null;
private HashMap<ConnectorTaskId, ScheduledFuture<?>> commiters private HashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new HashMap<>();
= new HashMap<ConnectorTaskId, ScheduledFuture<?>>();
SourceTaskOffsetCommitter(Time time, WorkerConfig config) { SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
this.time = time; this.time = time;
@ -78,11 +77,11 @@ public class SourceTaskOffsetCommitter {
commit(workerTask); commit(workerTask);
} }
}, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS); }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
commiters.put(id, commitFuture); committers.put(id, commitFuture);
} }
public void remove(ConnectorTaskId id) { public void remove(ConnectorTaskId id) {
ScheduledFuture<?> commitFuture = commiters.remove(id); ScheduledFuture<?> commitFuture = committers.remove(id);
commitFuture.cancel(false); commitFuture.cancel(false);
} }

View File

@ -60,7 +60,7 @@ public class Worker {
private Serializer offsetValueSerializer; private Serializer offsetValueSerializer;
private Deserializer offsetKeyDeserializer; private Deserializer offsetKeyDeserializer;
private Deserializer offsetValueDeserializer; private Deserializer offsetValueDeserializer;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>(); private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer producer; private KafkaProducer producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
@ -119,14 +119,14 @@ public class Worker {
Properties unusedConfigs = config.getUnusedProperties(); Properties unusedConfigs = config.getUnusedProperties();
Map<String, Object> producerProps = new HashMap<String, Object>(); Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
for (String propName : unusedConfigs.stringPropertyNames()) { for (String propName : unusedConfigs.stringPropertyNames()) {
producerProps.put(propName, unusedConfigs.getProperty(propName)); producerProps.put(propName, unusedConfigs.getProperty(propName));
} }
producer = new KafkaProducer<Object, Object>(producerProps); producer = new KafkaProducer<>(producerProps);
offsetBackingStore.start(); offsetBackingStore.start();
sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config); sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config);
@ -232,7 +232,7 @@ public class Worker {
WorkerTask task = tasks.get(id); WorkerTask task = tasks.get(id);
if (task == null) { if (task == null) {
log.error("Task not found: " + id); log.error("Task not found: " + id);
throw new CopycatRuntimeException(); throw new CopycatRuntimeException("Task not found: " + id);
} }
return task; return task;
} }

View File

@ -121,7 +121,7 @@ public class WorkerSinkTask implements WorkerTask {
* the write commit. This should only be invoked by the WorkerSinkTaskThread. * the write commit. This should only be invoked by the WorkerSinkTaskThread.
**/ **/
public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) { public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(); HashMap<TopicPartition, Long> offsets = new HashMap<>();
for (TopicPartition tp : consumer.subscriptions()) { for (TopicPartition tp : consumer.subscriptions()) {
offsets.put(tp, consumer.position(tp)); offsets.put(tp, consumer.position(tp));
} }
@ -176,7 +176,7 @@ public class WorkerSinkTask implements WorkerTask {
KafkaConsumer<Object, Object> newConsumer; KafkaConsumer<Object, Object> newConsumer;
try { try {
newConsumer = new KafkaConsumer<Object, Object>(props); newConsumer = new KafkaConsumer<>(props);
} catch (Throwable t) { } catch (Throwable t) {
throw new CopycatRuntimeException("Failed to create consumer", t); throw new CopycatRuntimeException("Failed to create consumer", t);
} }
@ -206,7 +206,7 @@ public class WorkerSinkTask implements WorkerTask {
private void deliverMessages(ConsumerRecords<Object, Object> msgs) { private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
// Finally, deliver this batch to the sink // Finally, deliver this batch to the sink
if (msgs.count() > 0) { if (msgs.count() > 0) {
List<SinkRecord> records = new ArrayList<SinkRecord>(); List<SinkRecord> records = new ArrayList<>();
for (ConsumerRecord<Object, Object> msg : msgs) { for (ConsumerRecord<Object, Object> msg : msgs) {
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
records.add( records.add(

View File

@ -82,10 +82,8 @@ public class WorkerSourceTask implements WorkerTask {
this.workerConfig = workerConfig; this.workerConfig = workerConfig;
this.time = time; this.time = time;
this.outstandingMessages this.outstandingMessages = new IdentityHashMap<>();
= new IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>(); this.outstandingMessagesBacklog = new IdentityHashMap<>();
this.outstandingMessagesBacklog
= new IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>();
this.flushing = false; this.flushing = false;
} }
@ -135,7 +133,7 @@ public class WorkerSourceTask implements WorkerTask {
private synchronized void sendRecords(List<SourceRecord> records) { private synchronized void sendRecords(List<SourceRecord> records) {
for (SourceRecord record : records) { for (SourceRecord record : records) {
final ProducerRecord<Object, Object> producerRecord final ProducerRecord<Object, Object> producerRecord
= new ProducerRecord<Object, Object>(record.getTopic(), record.getPartition(), = new ProducerRecord<>(record.getTopic(), record.getPartition(),
converter.fromCopycatData(record.getKey()), converter.fromCopycatData(record.getKey()),
converter.fromCopycatData(record.getValue())); converter.fromCopycatData(record.getValue()));
log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue()); log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue());

View File

@ -40,7 +40,7 @@ public class FileConfigStorage implements ConfigStorage {
public static final String FILE_DEFAULT = "configs.db"; public static final String FILE_DEFAULT = "configs.db";
private String filename; private String filename;
private Map<String, Properties> connectorConfig = new HashMap<String, Properties>(); private Map<String, Properties> connectorConfig = new HashMap<>();
@Override @Override
public void configure(Properties props) { public void configure(Properties props) {
@ -101,9 +101,7 @@ public class FileConfigStorage implements ConfigStorage {
connectorConfig = (Map<String, Properties>) is.readObject(); connectorConfig = (Map<String, Properties>) is.readObject();
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
// Expected on first run // Expected on first run
} catch (IOException e) { } catch (IOException | ClassNotFoundException e) {
throw new CopycatRuntimeException("Failed to load config data", e);
} catch (ClassNotFoundException e) {
throw new CopycatRuntimeException("Failed to load config data", e); throw new CopycatRuntimeException("Failed to load config data", e);
} }
} }

View File

@ -43,7 +43,7 @@ public class StandaloneCoordinator implements Coordinator {
private Worker worker; private Worker worker;
private Properties configs; private Properties configs;
private ConfigStorage configStorage; private ConfigStorage configStorage;
private HashMap<String, ConnectorState> connectors = new HashMap<String, ConnectorState>(); private HashMap<String, ConnectorState> connectors = new HashMap<>();
public StandaloneCoordinator(Worker worker, Properties props) { public StandaloneCoordinator(Worker worker, Properties props) {
this.worker = worker; this.worker = worker;
@ -196,7 +196,7 @@ public class StandaloneCoordinator implements Coordinator {
List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks); List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks);
// Generate the final configs, including framework provided settings // Generate the final configs, including framework provided settings
Map<ConnectorTaskId, Properties> taskProps = new HashMap<ConnectorTaskId, Properties>(); Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
for (int i = 0; i < taskConfigs.size(); i++) { for (int i = 0; i < taskConfigs.size(); i++) {
ConnectorTaskId taskId = new ConnectorTaskId(state.name, i); ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
Properties config = taskConfigs.get(i); Properties config = taskConfigs.get(i);
@ -297,7 +297,7 @@ public class StandaloneCoordinator implements Coordinator {
this.connector = connector; this.connector = connector;
this.maxTasks = maxTasks; this.maxTasks = maxTasks;
this.inputTopics = inputTopics; this.inputTopics = inputTopics;
this.tasks = new HashSet<ConnectorTaskId>(); this.tasks = new HashSet<>();
} }
} }
} }

View File

@ -67,9 +67,9 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
ObjectInputStream is = new ObjectInputStream(new FileInputStream(file)); ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
HashMap<String, Map<byte[], byte[]>> raw HashMap<String, Map<byte[], byte[]>> raw
= (HashMap<String, Map<byte[], byte[]>>) is.readObject(); = (HashMap<String, Map<byte[], byte[]>>) is.readObject();
data = new HashMap<String, Map<ByteBuffer, ByteBuffer>>(); data = new HashMap<>();
for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) { for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<ByteBuffer, ByteBuffer>(); HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();
for (Map.Entry<byte[], byte[]> mapEntry : entry.getValue().entrySet()) { for (Map.Entry<byte[], byte[]> mapEntry : entry.getValue().entrySet()) {
ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null; ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) :
@ -79,13 +79,10 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
data.put(entry.getKey(), converted); data.put(entry.getKey(), converted);
} }
is.close(); is.close();
} catch (FileNotFoundException e) { } catch (FileNotFoundException | EOFException e) {
// Ignore, may be new // FileNotFoundException: Ignore, may be new.
} catch (EOFException e) { // EOFException: Ignore, this means the file was missing or corrupt
// Ignore, this means the file was missing or corrupt } catch (IOException | ClassNotFoundException e) {
} catch (IOException e) {
throw new CopycatRuntimeException(e);
} catch (ClassNotFoundException e) {
throw new CopycatRuntimeException(e); throw new CopycatRuntimeException(e);
} }
} }
@ -93,9 +90,9 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
protected void save() { protected void save() {
try { try {
ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file)); ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file));
HashMap<String, Map<byte[], byte[]>> raw = new HashMap<String, Map<byte[], byte[]>>(); HashMap<String, Map<byte[], byte[]>> raw = new HashMap<>();
for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> entry : data.entrySet()) { for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> entry : data.entrySet()) {
HashMap<byte[], byte[]> converted = new HashMap<byte[], byte[]>(); HashMap<byte[], byte[]> converted = new HashMap<>();
for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : entry.getValue().entrySet()) { for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : entry.getValue().entrySet()) {
byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null; byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null; byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;

View File

@ -39,7 +39,7 @@ import java.util.concurrent.Future;
public class MemoryOffsetBackingStore implements OffsetBackingStore { public class MemoryOffsetBackingStore implements OffsetBackingStore {
private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class); private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);
protected HashMap<String, Map<ByteBuffer, ByteBuffer>> data = new HashMap<String, Map<ByteBuffer, ByteBuffer>>(); protected HashMap<String, Map<ByteBuffer, ByteBuffer>> data = new HashMap<>();
protected ExecutorService executor = Executors.newSingleThreadExecutor(); protected ExecutorService executor = Executors.newSingleThreadExecutor();
public MemoryOffsetBackingStore() { public MemoryOffsetBackingStore() {
@ -66,7 +66,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() { return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
@Override @Override
public Map<ByteBuffer, ByteBuffer> call() throws Exception { public Map<ByteBuffer, ByteBuffer> call() throws Exception {
Map<ByteBuffer, ByteBuffer> result = new HashMap<ByteBuffer, ByteBuffer>(); Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
synchronized (MemoryOffsetBackingStore.this) { synchronized (MemoryOffsetBackingStore.this) {
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace); Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
if (namespaceData == null) { if (namespaceData == null) {
@ -94,7 +94,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
synchronized (MemoryOffsetBackingStore.this) { synchronized (MemoryOffsetBackingStore.this) {
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace); Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
if (namespaceData == null) { if (namespaceData == null) {
namespaceData = new HashMap<ByteBuffer, ByteBuffer>(); namespaceData = new HashMap<>();
data.put(namespace, namespaceData); data.put(namespace, namespaceData);
} }
for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) { for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {

View File

@ -62,7 +62,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
@Override @Override
public Map<Object, Object> getOffsets(Collection<Object> streams) { public Map<Object, Object> getOffsets(Collection<Object> streams) {
// Serialize keys so backing store can work with them // Serialize keys so backing store can work with them
Map<ByteBuffer, Object> serializedToOriginal = new HashMap<ByteBuffer, Object>(streams.size()); Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(streams.size());
for (Object key : streams) { for (Object key : streams) {
try { try {
byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key)); byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));
@ -85,7 +85,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
} }
// Deserialize all the values and map back to the original keys // Deserialize all the values and map back to the original keys
Map<Object, Object> result = new HashMap<Object, Object>(streams.size()); Map<Object, Object> result = new HashMap<>(streams.size());
for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) { for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
try { try {
// Since null could be a valid key, explicitly check whether map contains the key // Since null could be a valid key, explicitly check whether map contains the key

View File

@ -71,7 +71,7 @@ public class OffsetStorageWriter {
private final Serializer keySerializer; private final Serializer keySerializer;
private final Serializer valueSerializer; private final Serializer valueSerializer;
private final String namespace; private final String namespace;
private Map<Object, Object> data = new HashMap<Object, Object>(); private Map<Object, Object> data = new HashMap<>();
// Not synchronized, should only be accessed by flush thread // Not synchronized, should only be accessed by flush thread
private Map<Object, Object> toFlush = null; private Map<Object, Object> toFlush = null;
@ -115,7 +115,7 @@ public class OffsetStorageWriter {
assert !flushing(); assert !flushing();
toFlush = data; toFlush = data;
data = new HashMap<Object, Object>(); data = new HashMap<>();
return true; return true;
} }
@ -133,7 +133,7 @@ public class OffsetStorageWriter {
// Serialize // Serialize
Map<ByteBuffer, ByteBuffer> offsetsSerialized; Map<ByteBuffer, ByteBuffer> offsetsSerialized;
try { try {
offsetsSerialized = new HashMap<ByteBuffer, ByteBuffer>(); offsetsSerialized = new HashMap<>();
for (Map.Entry<Object, Object> entry : toFlush.entrySet()) { for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey())); byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey()));
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null; ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;

View File

@ -81,10 +81,7 @@ public abstract class ShutdownableThread extends Thread {
public void run() { public void run() {
try { try {
execute(); execute();
} catch (Error e) { } catch (Error | RuntimeException e) {
log.error("Thread {} exiting with uncaught exception: ", getName(), e);
throw e;
} catch (RuntimeException e) {
log.error("Thread {} exiting with uncaught exception: ", getName(), e); log.error("Thread {} exiting with uncaught exception: ", getName(), e);
throw e; throw e;
} finally { } finally {

View File

@ -133,7 +133,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
GenericRecord record = new GenericRecordBuilder(schema).build(); GenericRecord record = new GenericRecordBuilder(schema).build();
byte[] rawKey = "key".getBytes(), rawValue = "value".getBytes(); byte[] rawKey = "key".getBytes(), rawValue = "value".getBytes();
ConsumerRecords<Object, Object> records = new ConsumerRecords<Object, Object>( ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
Collections.singletonMap( Collections.singletonMap(
new TopicPartition("topic", 0), new TopicPartition("topic", 0),
Collections.singletonList( Collections.singletonList(
@ -309,7 +309,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
public ConsumerRecords<Object, Object> answer() throws Throwable { public ConsumerRecords<Object, Object> answer() throws Throwable {
// "Sleep" so time will progress // "Sleep" so time will progress
time.sleep(pollDelayMs); time.sleep(pollDelayMs);
ConsumerRecords<Object, Object> records = new ConsumerRecords<Object, Object>( ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), Arrays.asList( Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), Arrays.asList(
new ConsumerRecord<Object, Object>(TOPIC, PARTITION, new ConsumerRecord<Object, Object>(TOPIC, PARTITION,
FIRST_OFFSET + recordsReturned, KEY, FIRST_OFFSET + recordsReturned, KEY,

View File

@ -196,7 +196,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
public void testSendRecordsConvertsData() throws Exception { public void testSendRecordsConvertsData() throws Exception {
createWorkerTask(); createWorkerTask();
List<SourceRecord> records = new ArrayList<SourceRecord>(); List<SourceRecord> records = new ArrayList<>();
// Can just use the same record for key and value // Can just use the same record for key and value
records.add(new SourceRecord(STREAM_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD)); records.add(new SourceRecord(STREAM_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD));

View File

@ -85,7 +85,7 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
PowerMock.replayAll(); PowerMock.replayAll();
coordinator.addConnector(connectorProps, createCallback); coordinator.addConnector(connectorProps, createCallback);
FutureCallback<Void> futureCb = new FutureCallback<Void>(new Callback() { FutureCallback<Void> futureCb = new FutureCallback<>(new Callback() {
@Override @Override
public void onCompletion(Throwable error, Object result) { public void onCompletion(Throwable error, Object result) {

View File

@ -40,7 +40,7 @@ public class FileOffsetBackingStoreTest {
Properties props; Properties props;
File tempFile; File tempFile;
private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<ByteBuffer, ByteBuffer>(); private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
static { static {
firstSet.put(buffer("key"), buffer("value")); firstSet.put(buffer("key"), buffer("value"));