Address review comments, clean up some code styling.

This commit is contained in:
Ewen Cheslack-Postava 2015-07-30 21:18:54 -07:00
parent 25b5739ff0
commit 4674d136e1
29 changed files with 103 additions and 107 deletions

View File

@ -28,6 +28,11 @@ buildscript {
}
def slf4jlog4j='org.slf4j:slf4j-log4j12:1.7.6'
def slf4japi="org.slf4j:slf4j-api:1.7.6"
def junit='junit:junit:4.6'
def easymock='org.easymock:easymock:3.3.1'
def powermock='org.powermock:powermock-module-junit4:1.6.2'
def powermock_easymock='org.powermock:powermock-api-easymock:1.6.2'
allprojects {
apply plugin: 'idea'
@ -59,7 +64,7 @@ rat {
// And some of the files that we have checked in should also be excluded from this check
excludes.addAll([
'**/.git/**',
'build/**',
'**/build/**',
'CONTRIBUTING.md',
'gradlew',
'gradlew.bat',
@ -244,8 +249,8 @@ project(':core') {
compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4'
}
testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.3.1'
testCompile "$junit"
testCompile "$easymock"
testCompile 'org.objenesis:objenesis:1.2'
testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
@ -376,11 +381,11 @@ project(':clients') {
archivesBaseName = "kafka-clients"
dependencies {
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"
compile 'org.xerial.snappy:snappy-java:1.1.1.7'
compile 'net.jpountz.lz4:lz4:1.2.0'
testCompile 'junit:junit:4.6'
testCompile "$junit"
testRuntime "$slf4jlog4j"
}
@ -428,7 +433,7 @@ project(':tools') {
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
compile "$slf4jlog4j"
testCompile 'junit:junit:4.6'
testCompile "$junit"
testCompile project(path: ':clients', configuration: 'archives')
}
@ -476,7 +481,7 @@ project(':log4j-appender') {
compile project(':clients')
compile "$slf4jlog4j"
testCompile 'junit:junit:4.6'
testCompile "$junit"
testCompile project(path: ':clients', configuration: 'archives')
}
@ -508,9 +513,9 @@ project(':copycat-data') {
dependencies {
compile project(':clients')
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"
testCompile 'junit:junit:4.6'
testCompile "$junit"
testRuntime "$slf4jlog4j"
}
@ -551,9 +556,9 @@ project(':copycat-api') {
dependencies {
compile project(':copycat-data')
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"
testCompile 'junit:junit:4.6'
testCompile "$junit"
testRuntime "$slf4jlog4j"
}
@ -593,13 +598,13 @@ project(':copycat-json') {
dependencies {
compile project(':copycat-api')
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.3.1'
testCompile 'org.powermock:powermock-module-junit4:1.6.2'
testCompile 'org.powermock:powermock-api-easymock:1.6.2'
testCompile "$junit"
testCompile "$easymock"
testCompile "$powermock"
testCompile "$powermock_easymock"
testRuntime "$slf4jlog4j"
}
@ -652,12 +657,12 @@ project(':copycat-runtime') {
dependencies {
compile project(':copycat-api')
compile project(':clients')
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"
testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.3.1'
testCompile 'org.powermock:powermock-module-junit4:1.6.2'
testCompile 'org.powermock:powermock-api-easymock:1.6.2'
testCompile "$junit"
testCompile "$easymock"
testCompile "$powermock"
testCompile "$powermock_easymock"
testRuntime "$slf4jlog4j"
testRuntime project(":copycat-json")
}
@ -698,12 +703,12 @@ project(':copycat-file') {
dependencies {
compile project(':copycat-api')
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"
testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.3.1'
testCompile 'org.powermock:powermock-module-junit4:1.6.2'
testCompile 'org.powermock:powermock-api-easymock:1.6.2'
testCompile "$junit"
testCompile "$easymock"
testCompile "$powermock"
testCompile "$powermock_easymock"
testRuntime "$slf4jlog4j"
}

View File

@ -66,7 +66,8 @@ public abstract class Connector {
* </p>
*
* @param ctx context object used to interact with the Copycat runtime
* @param taskConfigs
* @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
* churn in partition to task assignments
*/
public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) {
context = ctx;

View File

@ -26,5 +26,5 @@ public interface ConnectorContext {
* indicate to the runtime that something about the input/output has changed (e.g. streams
* added/removed) and the running Tasks will need to be modified.
*/
public void requestTaskReconfiguration();
void requestTaskReconfiguration();
}

View File

@ -22,15 +22,10 @@ package org.apache.kafka.copycat.connector;
* Base class for records containing data to be copied to/from Kafka. This corresponds closely to
* Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both
* sources and sinks (topic, partition, key, value). Although both implementations include a
* notion of offest, it is not included here because they differ in type.
* </p>
* <p>
* This class uses type parameters for keys and values. These are provided primarily for
* connector developer convenience. Internally, Copycat will handle any primitive types or
* org.apache.kafka.copycat.data types.
* notion of offset, it is not included here because they differ in type.
* </p>
*/
public class CopycatRecord {
public abstract class CopycatRecord {
private final String topic;
private final Integer partition;
private final Object key;

View File

@ -18,9 +18,6 @@
package org.apache.kafka.copycat.errors;
public class CopycatRuntimeException extends RuntimeException {
public CopycatRuntimeException() {
}
public CopycatRuntimeException(String s) {
super(s);
}

View File

@ -29,7 +29,7 @@ public abstract class SinkTaskContext {
private Map<TopicPartition, Long> offsets;
public SinkTaskContext() {
offsets = new HashMap<TopicPartition, Long>();
offsets = new HashMap<>();
}
public void resetOffset(Map<TopicPartition, Long> offsets) {

View File

@ -33,7 +33,7 @@ public interface OffsetStorageReader {
* @param stream object uniquely identifying the stream of data
* @return object uniquely identifying the offset in the stream of data
*/
public Object getOffset(Object stream);
Object getOffset(Object stream);
/**
* <p>
@ -52,5 +52,5 @@ public interface OffsetStorageReader {
* @param streams set of identifiers for streams of data
* @return a map of stream identifiers to decoded offsets
*/
public Map<Object, Object> getOffsets(Collection<Object> streams);
Map<Object, Object> getOffsets(Collection<Object> streams);
}

View File

@ -26,10 +26,10 @@ import java.util.List;
*/
public class ConnectorUtils {
/**
* Given a set of elements and a target number of groups, generates an set of groups of
* Given a list of elements and a target number of groups, generates list of groups of
* elements to match the target number of groups, spreading them evenly among the groups.
* This generates groups with contiguous elements, which results in intuitive ordering if
* your elements are also ordered (e.g. alphabetical sets of table names if you sort
* your elements are also ordered (e.g. alphabetical lists of table names if you sort
* table names alphabetically to generate the raw partitions) or can result in efficient
* partitioning if elements are sorted according to some criteria that affects performance
* (e.g. topic partitions with the same leader).
@ -42,7 +42,7 @@ public class ConnectorUtils {
throw new IllegalArgumentException("Number of groups must be positive.");
}
List<List<T>> result = new ArrayList<List<T>>(numGroups);
List<List<T>> result = new ArrayList<>(numGroups);
// Each group has either n+1 or n raw partitions
int perGroup = elements.size() / numGroups;
@ -51,7 +51,7 @@ public class ConnectorUtils {
int assigned = 0;
for (int group = 0; group < numGroups; group++) {
int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
List<T> groupList = new ArrayList<T>(numThisGroup);
List<T> groupList = new ArrayList<>(numThisGroup);
for (int i = 0; i < numThisGroup; i++) {
groupList.add(elements.get(assigned));
assigned++;

View File

@ -19,6 +19,19 @@ package org.apache.kafka.copycat.data;
public class BinaryData {
/**
* Compares regions of two byte arrays, returning a negative integer, zero, or positive integer when the first byte
* array region is less than, equal to, or greater than the second byte array region, respectively.
*
* @param b1 first byte array
* @param s1 start of region in first byte array
* @param l1 length of region in first byte array
* @param b2 second byte array
* @param s2 start of region in second byte array
* @param l2 length of region in second byte array
* @return a negative integer, zero, or a positive integer as the first byte array is less than, equal to, or greater
* than the second byte array
*/
public static int compareBytes(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int end1 = s1 + l1;
@ -26,10 +39,9 @@ public class BinaryData {
for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
int a = b1[i] & 0xff;
int b = b2[j] & 0xff;
if (a != b) {
if (a != b)
return a - b;
}
}
return l1 - l2;
}
}

View File

@ -46,7 +46,7 @@ public class FileStreamSinkConnector extends SinkConnector {
@Override
public List<Properties> getTaskConfigs(int maxTasks) {
ArrayList<Properties> configs = new ArrayList<Properties>();
ArrayList<Properties> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Properties config = new Properties();
if (filename != null) {

View File

@ -56,7 +56,7 @@ public class FileStreamSourceConnector extends SourceConnector {
@Override
public List<Properties> getTaskConfigs(int maxTasks) {
ArrayList<Properties> configs = new ArrayList<Properties>();
ArrayList<Properties> configs = new ArrayList<>();
// Only one input stream makes sense.
Properties config = new Properties();
if (filename != null) {

View File

@ -113,7 +113,7 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
line = extractLine();
if (line != null) {
if (records == null) {
records = new ArrayList<SourceRecord>();
records = new ArrayList<>();
}
records.add(new SourceRecord(null, streamOffset, topic, line));
}

View File

@ -27,7 +27,6 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@ -36,19 +35,17 @@ public class FileStreamSinkTaskTest {
private FileStreamSinkTask task;
private ByteArrayOutputStream os;
private PrintStream printStream;
private Properties config;
@Before
public void setup() {
os = new ByteArrayOutputStream();
printStream = new PrintStream(os);
task = new FileStreamSinkTask(printStream);
config = new Properties();
}
@Test
public void testPutFlush() throws CopycatException {
HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
HashMap<TopicPartition, Long> offsets = new HashMap<>();
// We do not call task.start() since it would override the output stream

View File

@ -38,7 +38,7 @@ public class WorkerConfig extends AbstractConfig {
public static final String CLUSTER_DEFAULT = "copycat";
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String BOOSTRAP_SERVERS_DOC
public static final String BOOTSTRAP_SERVERS_DOC
= "A list of host/port pairs to use for establishing the initial connection to the Kafka "
+ "cluster. The client will make use of all servers irrespective of which servers are "
+ "specified here for bootstrapping&mdash;this list only impacts the initial hosts used "
@ -100,7 +100,7 @@ public class WorkerConfig extends AbstractConfig {
config = new ConfigDef()
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
Importance.HIGH, BOOSTRAP_SERVERS_DOC)
Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
.define(CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, CONVERTER_CLASS_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,

View File

@ -50,8 +50,7 @@ public class SourceTaskOffsetCommitter {
private Time time;
private WorkerConfig config;
private ScheduledExecutorService commitExecutorService = null;
private HashMap<ConnectorTaskId, ScheduledFuture<?>> commiters
= new HashMap<ConnectorTaskId, ScheduledFuture<?>>();
private HashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new HashMap<>();
SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
this.time = time;
@ -78,11 +77,11 @@ public class SourceTaskOffsetCommitter {
commit(workerTask);
}
}, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
commiters.put(id, commitFuture);
committers.put(id, commitFuture);
}
public void remove(ConnectorTaskId id) {
ScheduledFuture<?> commitFuture = commiters.remove(id);
ScheduledFuture<?> commitFuture = committers.remove(id);
commitFuture.cancel(false);
}

View File

@ -60,7 +60,7 @@ public class Worker {
private Serializer offsetValueSerializer;
private Deserializer offsetKeyDeserializer;
private Deserializer offsetValueDeserializer;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
@ -119,14 +119,14 @@ public class Worker {
Properties unusedConfigs = config.getUnusedProperties();
Map<String, Object> producerProps = new HashMap<String, Object>();
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
for (String propName : unusedConfigs.stringPropertyNames()) {
producerProps.put(propName, unusedConfigs.getProperty(propName));
}
producer = new KafkaProducer<Object, Object>(producerProps);
producer = new KafkaProducer<>(producerProps);
offsetBackingStore.start();
sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config);
@ -232,7 +232,7 @@ public class Worker {
WorkerTask task = tasks.get(id);
if (task == null) {
log.error("Task not found: " + id);
throw new CopycatRuntimeException();
throw new CopycatRuntimeException("Task not found: " + id);
}
return task;
}

View File

@ -121,7 +121,7 @@ public class WorkerSinkTask implements WorkerTask {
* the write commit. This should only be invoked by the WorkerSinkTaskThread.
**/
public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
HashMap<TopicPartition, Long> offsets = new HashMap<>();
for (TopicPartition tp : consumer.subscriptions()) {
offsets.put(tp, consumer.position(tp));
}
@ -176,7 +176,7 @@ public class WorkerSinkTask implements WorkerTask {
KafkaConsumer<Object, Object> newConsumer;
try {
newConsumer = new KafkaConsumer<Object, Object>(props);
newConsumer = new KafkaConsumer<>(props);
} catch (Throwable t) {
throw new CopycatRuntimeException("Failed to create consumer", t);
}
@ -206,7 +206,7 @@ public class WorkerSinkTask implements WorkerTask {
private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
// Finally, deliver this batch to the sink
if (msgs.count() > 0) {
List<SinkRecord> records = new ArrayList<SinkRecord>();
List<SinkRecord> records = new ArrayList<>();
for (ConsumerRecord<Object, Object> msg : msgs) {
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
records.add(

View File

@ -82,10 +82,8 @@ public class WorkerSourceTask implements WorkerTask {
this.workerConfig = workerConfig;
this.time = time;
this.outstandingMessages
= new IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>();
this.outstandingMessagesBacklog
= new IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>();
this.outstandingMessages = new IdentityHashMap<>();
this.outstandingMessagesBacklog = new IdentityHashMap<>();
this.flushing = false;
}
@ -135,7 +133,7 @@ public class WorkerSourceTask implements WorkerTask {
private synchronized void sendRecords(List<SourceRecord> records) {
for (SourceRecord record : records) {
final ProducerRecord<Object, Object> producerRecord
= new ProducerRecord<Object, Object>(record.getTopic(), record.getPartition(),
= new ProducerRecord<>(record.getTopic(), record.getPartition(),
converter.fromCopycatData(record.getKey()),
converter.fromCopycatData(record.getValue()));
log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue());

View File

@ -40,7 +40,7 @@ public class FileConfigStorage implements ConfigStorage {
public static final String FILE_DEFAULT = "configs.db";
private String filename;
private Map<String, Properties> connectorConfig = new HashMap<String, Properties>();
private Map<String, Properties> connectorConfig = new HashMap<>();
@Override
public void configure(Properties props) {
@ -101,9 +101,7 @@ public class FileConfigStorage implements ConfigStorage {
connectorConfig = (Map<String, Properties>) is.readObject();
} catch (FileNotFoundException e) {
// Expected on first run
} catch (IOException e) {
throw new CopycatRuntimeException("Failed to load config data", e);
} catch (ClassNotFoundException e) {
} catch (IOException | ClassNotFoundException e) {
throw new CopycatRuntimeException("Failed to load config data", e);
}
}

View File

@ -43,7 +43,7 @@ public class StandaloneCoordinator implements Coordinator {
private Worker worker;
private Properties configs;
private ConfigStorage configStorage;
private HashMap<String, ConnectorState> connectors = new HashMap<String, ConnectorState>();
private HashMap<String, ConnectorState> connectors = new HashMap<>();
public StandaloneCoordinator(Worker worker, Properties props) {
this.worker = worker;
@ -196,7 +196,7 @@ public class StandaloneCoordinator implements Coordinator {
List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks);
// Generate the final configs, including framework provided settings
Map<ConnectorTaskId, Properties> taskProps = new HashMap<ConnectorTaskId, Properties>();
Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
for (int i = 0; i < taskConfigs.size(); i++) {
ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
Properties config = taskConfigs.get(i);
@ -297,7 +297,7 @@ public class StandaloneCoordinator implements Coordinator {
this.connector = connector;
this.maxTasks = maxTasks;
this.inputTopics = inputTopics;
this.tasks = new HashSet<ConnectorTaskId>();
this.tasks = new HashSet<>();
}
}
}

View File

@ -67,9 +67,9 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
HashMap<String, Map<byte[], byte[]>> raw
= (HashMap<String, Map<byte[], byte[]>>) is.readObject();
data = new HashMap<String, Map<ByteBuffer, ByteBuffer>>();
data = new HashMap<>();
for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<ByteBuffer, ByteBuffer>();
HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();
for (Map.Entry<byte[], byte[]> mapEntry : entry.getValue().entrySet()) {
ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) :
@ -79,13 +79,10 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
data.put(entry.getKey(), converted);
}
is.close();
} catch (FileNotFoundException e) {
// Ignore, may be new
} catch (EOFException e) {
// Ignore, this means the file was missing or corrupt
} catch (IOException e) {
throw new CopycatRuntimeException(e);
} catch (ClassNotFoundException e) {
} catch (FileNotFoundException | EOFException e) {
// FileNotFoundException: Ignore, may be new.
// EOFException: Ignore, this means the file was missing or corrupt
} catch (IOException | ClassNotFoundException e) {
throw new CopycatRuntimeException(e);
}
}
@ -93,9 +90,9 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
protected void save() {
try {
ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file));
HashMap<String, Map<byte[], byte[]>> raw = new HashMap<String, Map<byte[], byte[]>>();
HashMap<String, Map<byte[], byte[]>> raw = new HashMap<>();
for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> entry : data.entrySet()) {
HashMap<byte[], byte[]> converted = new HashMap<byte[], byte[]>();
HashMap<byte[], byte[]> converted = new HashMap<>();
for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : entry.getValue().entrySet()) {
byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;

View File

@ -39,7 +39,7 @@ import java.util.concurrent.Future;
public class MemoryOffsetBackingStore implements OffsetBackingStore {
private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);
protected HashMap<String, Map<ByteBuffer, ByteBuffer>> data = new HashMap<String, Map<ByteBuffer, ByteBuffer>>();
protected HashMap<String, Map<ByteBuffer, ByteBuffer>> data = new HashMap<>();
protected ExecutorService executor = Executors.newSingleThreadExecutor();
public MemoryOffsetBackingStore() {
@ -66,7 +66,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
@Override
public Map<ByteBuffer, ByteBuffer> call() throws Exception {
Map<ByteBuffer, ByteBuffer> result = new HashMap<ByteBuffer, ByteBuffer>();
Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
synchronized (MemoryOffsetBackingStore.this) {
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
if (namespaceData == null) {
@ -94,7 +94,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
synchronized (MemoryOffsetBackingStore.this) {
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
if (namespaceData == null) {
namespaceData = new HashMap<ByteBuffer, ByteBuffer>();
namespaceData = new HashMap<>();
data.put(namespace, namespaceData);
}
for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {

View File

@ -62,7 +62,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
@Override
public Map<Object, Object> getOffsets(Collection<Object> streams) {
// Serialize keys so backing store can work with them
Map<ByteBuffer, Object> serializedToOriginal = new HashMap<ByteBuffer, Object>(streams.size());
Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(streams.size());
for (Object key : streams) {
try {
byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));
@ -85,7 +85,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
}
// Deserialize all the values and map back to the original keys
Map<Object, Object> result = new HashMap<Object, Object>(streams.size());
Map<Object, Object> result = new HashMap<>(streams.size());
for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
try {
// Since null could be a valid key, explicitly check whether map contains the key

View File

@ -71,7 +71,7 @@ public class OffsetStorageWriter {
private final Serializer keySerializer;
private final Serializer valueSerializer;
private final String namespace;
private Map<Object, Object> data = new HashMap<Object, Object>();
private Map<Object, Object> data = new HashMap<>();
// Not synchronized, should only be accessed by flush thread
private Map<Object, Object> toFlush = null;
@ -115,7 +115,7 @@ public class OffsetStorageWriter {
assert !flushing();
toFlush = data;
data = new HashMap<Object, Object>();
data = new HashMap<>();
return true;
}
@ -133,7 +133,7 @@ public class OffsetStorageWriter {
// Serialize
Map<ByteBuffer, ByteBuffer> offsetsSerialized;
try {
offsetsSerialized = new HashMap<ByteBuffer, ByteBuffer>();
offsetsSerialized = new HashMap<>();
for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey()));
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;

View File

@ -81,10 +81,7 @@ public abstract class ShutdownableThread extends Thread {
public void run() {
try {
execute();
} catch (Error e) {
log.error("Thread {} exiting with uncaught exception: ", getName(), e);
throw e;
} catch (RuntimeException e) {
} catch (Error | RuntimeException e) {
log.error("Thread {} exiting with uncaught exception: ", getName(), e);
throw e;
} finally {

View File

@ -133,7 +133,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
GenericRecord record = new GenericRecordBuilder(schema).build();
byte[] rawKey = "key".getBytes(), rawValue = "value".getBytes();
ConsumerRecords<Object, Object> records = new ConsumerRecords<Object, Object>(
ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
Collections.singletonMap(
new TopicPartition("topic", 0),
Collections.singletonList(
@ -309,7 +309,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
public ConsumerRecords<Object, Object> answer() throws Throwable {
// "Sleep" so time will progress
time.sleep(pollDelayMs);
ConsumerRecords<Object, Object> records = new ConsumerRecords<Object, Object>(
ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), Arrays.asList(
new ConsumerRecord<Object, Object>(TOPIC, PARTITION,
FIRST_OFFSET + recordsReturned, KEY,

View File

@ -196,7 +196,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
public void testSendRecordsConvertsData() throws Exception {
createWorkerTask();
List<SourceRecord> records = new ArrayList<SourceRecord>();
List<SourceRecord> records = new ArrayList<>();
// Can just use the same record for key and value
records.add(new SourceRecord(STREAM_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD));

View File

@ -85,7 +85,7 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
PowerMock.replayAll();
coordinator.addConnector(connectorProps, createCallback);
FutureCallback<Void> futureCb = new FutureCallback<Void>(new Callback() {
FutureCallback<Void> futureCb = new FutureCallback<>(new Callback() {
@Override
public void onCompletion(Throwable error, Object result) {

View File

@ -40,7 +40,7 @@ public class FileOffsetBackingStoreTest {
Properties props;
File tempFile;
private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<ByteBuffer, ByteBuffer>();
private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
static {
firstSet.put(buffer("key"), buffer("value"));