mirror of https://github.com/apache/kafka.git

Address review comments, clean up some code styling.

This commit is contained in:
parent 25b5739ff0
commit 4674d136e1

build.gradle (57 changed lines)

@@ -28,6 +28,11 @@ buildscript {
}

def slf4jlog4j='org.slf4j:slf4j-log4j12:1.7.6'
def slf4japi="org.slf4j:slf4j-api:1.7.6"
def junit='junit:junit:4.6'
def easymock='org.easymock:easymock:3.3.1'
def powermock='org.powermock:powermock-module-junit4:1.6.2'
def powermock_easymock='org.powermock:powermock-api-easymock:1.6.2'

allprojects {
apply plugin: 'idea'

@@ -59,7 +64,7 @@ rat {
// And some of the files that we have checked in should also be excluded from this check
excludes.addAll([
'**/.git/**',
'build/**',
'**/build/**',
'CONTRIBUTING.md',
'gradlew',
'gradlew.bat',

@@ -244,8 +249,8 @@ project(':core') {
compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4'
}

testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.3.1'
testCompile "$junit"
testCompile "$easymock"
testCompile 'org.objenesis:objenesis:1.2'
testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"

@@ -376,11 +381,11 @@ project(':clients') {
archivesBaseName = "kafka-clients"

dependencies {
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"
compile 'org.xerial.snappy:snappy-java:1.1.1.7'
compile 'net.jpountz.lz4:lz4:1.2.0'

testCompile 'junit:junit:4.6'
testCompile "$junit"
testRuntime "$slf4jlog4j"
}

@@ -428,7 +433,7 @@ project(':tools') {
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
compile "$slf4jlog4j"

testCompile 'junit:junit:4.6'
testCompile "$junit"
testCompile project(path: ':clients', configuration: 'archives')
}

@@ -476,7 +481,7 @@ project(':log4j-appender') {
compile project(':clients')
compile "$slf4jlog4j"

testCompile 'junit:junit:4.6'
testCompile "$junit"
testCompile project(path: ':clients', configuration: 'archives')
}

@@ -508,9 +513,9 @@ project(':copycat-data') {

dependencies {
compile project(':clients')
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"

testCompile 'junit:junit:4.6'
testCompile "$junit"
testRuntime "$slf4jlog4j"
}

@@ -551,9 +556,9 @@ project(':copycat-api') {

dependencies {
compile project(':copycat-data')
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"

testCompile 'junit:junit:4.6'
testCompile "$junit"
testRuntime "$slf4jlog4j"
}

@@ -593,13 +598,13 @@ project(':copycat-json') {

dependencies {
compile project(':copycat-api')
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'

testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.3.1'
testCompile 'org.powermock:powermock-module-junit4:1.6.2'
testCompile 'org.powermock:powermock-api-easymock:1.6.2'
testCompile "$junit"
testCompile "$easymock"
testCompile "$powermock"
testCompile "$powermock_easymock"
testRuntime "$slf4jlog4j"
}

@@ -652,12 +657,12 @@ project(':copycat-runtime') {
dependencies {
compile project(':copycat-api')
compile project(':clients')
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"

testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.3.1'
testCompile 'org.powermock:powermock-module-junit4:1.6.2'
testCompile 'org.powermock:powermock-api-easymock:1.6.2'
testCompile "$junit"
testCompile "$easymock"
testCompile "$powermock"
testCompile "$powermock_easymock"
testRuntime "$slf4jlog4j"
testRuntime project(":copycat-json")
}

@@ -698,12 +703,12 @@ project(':copycat-file') {

dependencies {
compile project(':copycat-api')
compile "org.slf4j:slf4j-api:1.7.6"
compile "$slf4japi"

testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.3.1'
testCompile 'org.powermock:powermock-module-junit4:1.6.2'
testCompile 'org.powermock:powermock-api-easymock:1.6.2'
testCompile "$junit"
testCompile "$easymock"
testCompile "$powermock"
testCompile "$powermock_easymock"
testRuntime "$slf4jlog4j"
}

@@ -66,7 +66,8 @@ public abstract class Connector {
* </p>
*
* @param ctx context object used to interact with the Copycat runtime
* @param taskConfigs
* @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
* churn in partition to task assignments
*/
public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) {
context = ctx;

@@ -26,5 +26,5 @@ public interface ConnectorContext {
* indicate to the runtime that something about the input/output has changed (e.g. streams
* added/removed) and the running Tasks will need to be modified.
*/
public void requestTaskReconfiguration();
void requestTaskReconfiguration();
}

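The change above, repeated below for OffsetStorageReader, drops the redundant public modifier from interface methods. A minimal sketch of why the two declarations are equivalent; the interface and class names here are hypothetical, not part of the commit:

public class InterfaceModifierExample {
    // Members of an interface are implicitly public and abstract, so writing
    // `public void requestTaskReconfiguration();` adds no information.
    interface ReconfigurableContext {
        void requestTaskReconfiguration();    // implicitly public and abstract
    }

    public static void main(String[] args) {
        // Implementations must still declare the method public; call sites are unchanged.
        ReconfigurableContext ctx = new ReconfigurableContext() {
            @Override
            public void requestTaskReconfiguration() {
                System.out.println("task reconfiguration requested");
            }
        };
        ctx.requestTaskReconfiguration();
    }
}
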
@@ -22,15 +22,10 @@ package org.apache.kafka.copycat.connector;
* Base class for records containing data to be copied to/from Kafka. This corresponds closely to
* Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both
* sources and sinks (topic, partition, key, value). Although both implementations include a
* notion of offest, it is not included here because they differ in type.
* </p>
* <p>
* This class uses type parameters for keys and values. These are provided primarily for
* connector developer convenience. Internally, Copycat will handle any primitive types or
* org.apache.kafka.copycat.data types.
* notion of offset, it is not included here because they differ in type.
* </p>
*/
public class CopycatRecord {
public abstract class CopycatRecord {
private final String topic;
private final Integer partition;
private final Object key;

@@ -18,9 +18,6 @@
package org.apache.kafka.copycat.errors;

public class CopycatRuntimeException extends RuntimeException {
public CopycatRuntimeException() {
}

public CopycatRuntimeException(String s) {
super(s);
}

@@ -29,7 +29,7 @@ public abstract class SinkTaskContext {
private Map<TopicPartition, Long> offsets;

public SinkTaskContext() {
offsets = new HashMap<TopicPartition, Long>();
offsets = new HashMap<>();
}

public void resetOffset(Map<TopicPartition, Long> offsets) {

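Most of the remaining churn in this commit replaces explicit generic type arguments with the Java 7 diamond operator, as in the SinkTaskContext hunk above. A minimal sketch of the equivalence, with hypothetical class and variable names:

import java.util.HashMap;
import java.util.Map;

public class DiamondExample {
    public static void main(String[] args) {
        // Before: the type arguments are repeated on the right-hand side.
        Map<String, Long> explicit = new HashMap<String, Long>();
        // After: the compiler infers <String, Long> from the declared type.
        Map<String, Long> inferred = new HashMap<>();

        explicit.put("offset", 42L);
        inferred.put("offset", 42L);
        System.out.println(explicit.equals(inferred));   // prints: true
    }
}
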
@@ -33,7 +33,7 @@ public interface OffsetStorageReader {
* @param stream object uniquely identifying the stream of data
* @return object uniquely identifying the offset in the stream of data
*/
public Object getOffset(Object stream);
Object getOffset(Object stream);

/**
* <p>

@@ -52,5 +52,5 @@ public interface OffsetStorageReader {
* @param streams set of identifiers for streams of data
* @return a map of stream identifiers to decoded offsets
*/
public Map<Object, Object> getOffsets(Collection<Object> streams);
Map<Object, Object> getOffsets(Collection<Object> streams);
}

@@ -26,10 +26,10 @@ import java.util.List;
*/
public class ConnectorUtils {
/**
* Given a set of elements and a target number of groups, generates an set of groups of
* Given a list of elements and a target number of groups, generates list of groups of
* elements to match the target number of groups, spreading them evenly among the groups.
* This generates groups with contiguous elements, which results in intuitive ordering if
* your elements are also ordered (e.g. alphabetical sets of table names if you sort
* your elements are also ordered (e.g. alphabetical lists of table names if you sort
* table names alphabetically to generate the raw partitions) or can result in efficient
* partitioning if elements are sorted according to some criteria that affects performance
* (e.g. topic partitions with the same leader).

@@ -42,7 +42,7 @@ public class ConnectorUtils {
throw new IllegalArgumentException("Number of groups must be positive.");
}

List<List<T>> result = new ArrayList<List<T>>(numGroups);
List<List<T>> result = new ArrayList<>(numGroups);

// Each group has either n+1 or n raw partitions
int perGroup = elements.size() / numGroups;

@@ -51,7 +51,7 @@ public class ConnectorUtils {
int assigned = 0;
for (int group = 0; group < numGroups; group++) {
int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
List<T> groupList = new ArrayList<T>(numThisGroup);
List<T> groupList = new ArrayList<>(numThisGroup);
for (int i = 0; i < numThisGroup; i++) {
groupList.add(elements.get(assigned));
assigned++;

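The ConnectorUtils javadoc and loop shown above describe contiguous, nearly even grouping of elements into a target number of groups. The self-contained sketch below reproduces that logic; the class and method names are illustrative and not necessarily the ones in the Copycat sources:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GroupingSketch {
    // Splits `elements` into `numGroups` contiguous groups whose sizes differ by at most one,
    // mirroring the loop shown in the ConnectorUtils hunks above.
    public static <T> List<List<T>> group(List<T> elements, int numGroups) {
        if (numGroups <= 0)
            throw new IllegalArgumentException("Number of groups must be positive.");

        List<List<T>> result = new ArrayList<>(numGroups);
        int perGroup = elements.size() / numGroups;   // minimum size of each group
        int leftover = elements.size() % numGroups;   // the first `leftover` groups get one extra

        int assigned = 0;
        for (int group = 0; group < numGroups; group++) {
            int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
            List<T> groupList = new ArrayList<>(numThisGroup);
            for (int i = 0; i < numThisGroup; i++) {
                groupList.add(elements.get(assigned));
                assigned++;
            }
            result.add(groupList);
        }
        return result;
    }

    public static void main(String[] args) {
        // Five "table names" into two groups -> [a, b, c] and [d, e]: contiguous and nearly even.
        System.out.println(group(Arrays.asList("a", "b", "c", "d", "e"), 2));
    }
}
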
@@ -19,6 +19,19 @@ package org.apache.kafka.copycat.data;


public class BinaryData {
/**
* Compares regions of two byte arrays, returning a negative integer, zero, or positive integer when the first byte
* array region is less than, equal to, or greater than the second byte array region, respectively.
*
* @param b1 first byte array
* @param s1 start of region in first byte array
* @param l1 length of region in first byte array
* @param b2 second byte array
* @param s2 start of region in second byte array
* @param l2 length of region in second byte array
* @return a negative integer, zero, or a positive integer as the first byte array is less than, equal to, or greater
* than the second byte array
*/
public static int compareBytes(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int end1 = s1 + l1;

@@ -26,9 +39,8 @@ public class BinaryData {
for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
int a = b1[i] & 0xff;
int b = b2[j] & 0xff;
if (a != b) {
if (a != b)
return a - b;
}
}
return l1 - l2;
}

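The new compareBytes javadoc above documents an unsigned, lexicographic comparison of byte-array regions. The sketch below re-implements the loop shown in the hunk and demonstrates why the & 0xff mask matters; the class name is hypothetical and the code is not part of the commit:

import java.nio.charset.StandardCharsets;

public class CompareBytesExample {
    // Mirrors the comparison shown above: bytes are compared as unsigned values (0..255);
    // if one region is a prefix of the other, the shorter region sorts first.
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int end1 = s1 + l1;
        int end2 = s2 + l2;
        for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
            int a = b1[i] & 0xff;
            int b = b2[j] & 0xff;
            if (a != b)
                return a - b;
        }
        return l1 - l2;
    }

    public static void main(String[] args) {
        byte[] x = "abc".getBytes(StandardCharsets.UTF_8);
        byte[] y = "abd".getBytes(StandardCharsets.UTF_8);
        // Negative: 'c' (0x63) is less than 'd' (0x64) at the third position.
        System.out.println(compareBytes(x, 0, x.length, y, 0, y.length));
        // Signed byte comparison would put 0x80 before 0x01; the unsigned mask keeps 0x80 after 0x01.
        System.out.println(compareBytes(new byte[]{(byte) 0x80}, 0, 1, new byte[]{0x01}, 0, 1) > 0);
    }
}
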
@@ -46,7 +46,7 @@ public class FileStreamSinkConnector extends SinkConnector {

@Override
public List<Properties> getTaskConfigs(int maxTasks) {
ArrayList<Properties> configs = new ArrayList<Properties>();
ArrayList<Properties> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Properties config = new Properties();
if (filename != null) {

@@ -56,7 +56,7 @@ public class FileStreamSourceConnector extends SourceConnector {

@Override
public List<Properties> getTaskConfigs(int maxTasks) {
ArrayList<Properties> configs = new ArrayList<Properties>();
ArrayList<Properties> configs = new ArrayList<>();
// Only one input stream makes sense.
Properties config = new Properties();
if (filename != null) {

@@ -113,7 +113,7 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
line = extractLine();
if (line != null) {
if (records == null) {
records = new ArrayList<SourceRecord>();
records = new ArrayList<>();
}
records.add(new SourceRecord(null, streamOffset, topic, line));
}

@@ -27,7 +27,6 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;

import static org.junit.Assert.assertEquals;

@@ -36,19 +35,17 @@ public class FileStreamSinkTaskTest {
private FileStreamSinkTask task;
private ByteArrayOutputStream os;
private PrintStream printStream;
private Properties config;

@Before
public void setup() {
os = new ByteArrayOutputStream();
printStream = new PrintStream(os);
task = new FileStreamSinkTask(printStream);
config = new Properties();
}

@Test
public void testPutFlush() throws CopycatException {
HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
HashMap<TopicPartition, Long> offsets = new HashMap<>();

// We do not call task.start() since it would override the output stream

@@ -38,7 +38,7 @@ public class WorkerConfig extends AbstractConfig {
public static final String CLUSTER_DEFAULT = "copycat";

public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String BOOSTRAP_SERVERS_DOC
public static final String BOOTSTRAP_SERVERS_DOC
= "A list of host/port pairs to use for establishing the initial connection to the Kafka "
+ "cluster. The client will make use of all servers irrespective of which servers are "
+ "specified here for bootstrapping—this list only impacts the initial hosts used "

@@ -100,7 +100,7 @@ public class WorkerConfig extends AbstractConfig {
config = new ConfigDef()
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
Importance.HIGH, BOOSTRAP_SERVERS_DOC)
Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
.define(CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, CONVERTER_CLASS_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,

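The WorkerConfig hunks above correct the BOOSTRAP/BOOTSTRAP typo in the doc-string constant and in its use inside the ConfigDef definition. As a hedged illustration of the ConfigDef pattern those hunks rely on, the sketch below uses hypothetical key names and defaults and assumes the kafka-clients ConfigDef/AbstractConfig API of that era:

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.Properties;

// Illustrative only: key names, defaults, and documentation strings are made up here.
public class ExampleConfig extends AbstractConfig {
    private static final ConfigDef CONFIG = new ConfigDef()
            .define("cluster", Type.STRING, "copycat", Importance.HIGH, "Cluster identifier.")
            .define("bootstrap.servers", Type.LIST, "localhost:9092", Importance.HIGH,
                    "Initial host/port pairs used to contact the Kafka cluster.");

    public ExampleConfig(Properties props) {
        super(CONFIG, props);
    }

    public static void main(String[] args) {
        ExampleConfig config = new ExampleConfig(new Properties());
        System.out.println(config.getString("cluster"));        // prints the default: copycat
        System.out.println(config.getList("bootstrap.servers"));
    }
}
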
@@ -50,8 +50,7 @@ public class SourceTaskOffsetCommitter {
private Time time;
private WorkerConfig config;
private ScheduledExecutorService commitExecutorService = null;
private HashMap<ConnectorTaskId, ScheduledFuture<?>> commiters
= new HashMap<ConnectorTaskId, ScheduledFuture<?>>();
private HashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new HashMap<>();

SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
this.time = time;

@@ -78,11 +77,11 @@ public class SourceTaskOffsetCommitter {
commit(workerTask);
}
}, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
commiters.put(id, commitFuture);
committers.put(id, commitFuture);
}

public void remove(ConnectorTaskId id) {
ScheduledFuture<?> commitFuture = commiters.remove(id);
ScheduledFuture<?> commitFuture = committers.remove(id);
commitFuture.cancel(false);
}

@@ -60,7 +60,7 @@ public class Worker {
private Serializer offsetValueSerializer;
private Deserializer offsetKeyDeserializer;
private Deserializer offsetValueDeserializer;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;

@@ -119,14 +119,14 @@ public class Worker {

Properties unusedConfigs = config.getUnusedProperties();

Map<String, Object> producerProps = new HashMap<String, Object>();
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
for (String propName : unusedConfigs.stringPropertyNames()) {
producerProps.put(propName, unusedConfigs.getProperty(propName));
}
producer = new KafkaProducer<Object, Object>(producerProps);
producer = new KafkaProducer<>(producerProps);

offsetBackingStore.start();
sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config);

@@ -232,7 +232,7 @@ public class Worker {
WorkerTask task = tasks.get(id);
if (task == null) {
log.error("Task not found: " + id);
throw new CopycatRuntimeException();
throw new CopycatRuntimeException("Task not found: " + id);
}
return task;
}

@@ -121,7 +121,7 @@ public class WorkerSinkTask implements WorkerTask {
* the write commit. This should only be invoked by the WorkerSinkTaskThread.
**/
public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
HashMap<TopicPartition, Long> offsets = new HashMap<>();
for (TopicPartition tp : consumer.subscriptions()) {
offsets.put(tp, consumer.position(tp));
}

@@ -176,7 +176,7 @@ public class WorkerSinkTask implements WorkerTask {

KafkaConsumer<Object, Object> newConsumer;
try {
newConsumer = new KafkaConsumer<Object, Object>(props);
newConsumer = new KafkaConsumer<>(props);
} catch (Throwable t) {
throw new CopycatRuntimeException("Failed to create consumer", t);
}

@@ -206,7 +206,7 @@ public class WorkerSinkTask implements WorkerTask {
private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
// Finally, deliver this batch to the sink
if (msgs.count() > 0) {
List<SinkRecord> records = new ArrayList<SinkRecord>();
List<SinkRecord> records = new ArrayList<>();
for (ConsumerRecord<Object, Object> msg : msgs) {
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
records.add(

@@ -82,10 +82,8 @@ public class WorkerSourceTask implements WorkerTask {
this.workerConfig = workerConfig;
this.time = time;

this.outstandingMessages
= new IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>();
this.outstandingMessagesBacklog
= new IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>();
this.outstandingMessages = new IdentityHashMap<>();
this.outstandingMessagesBacklog = new IdentityHashMap<>();
this.flushing = false;
}

@@ -135,7 +133,7 @@ public class WorkerSourceTask implements WorkerTask {
private synchronized void sendRecords(List<SourceRecord> records) {
for (SourceRecord record : records) {
final ProducerRecord<Object, Object> producerRecord
= new ProducerRecord<Object, Object>(record.getTopic(), record.getPartition(),
= new ProducerRecord<>(record.getTopic(), record.getPartition(),
converter.fromCopycatData(record.getKey()),
converter.fromCopycatData(record.getValue()));
log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue());

@@ -40,7 +40,7 @@ public class FileConfigStorage implements ConfigStorage {
public static final String FILE_DEFAULT = "configs.db";

private String filename;
private Map<String, Properties> connectorConfig = new HashMap<String, Properties>();
private Map<String, Properties> connectorConfig = new HashMap<>();

@Override
public void configure(Properties props) {

@@ -101,9 +101,7 @@ public class FileConfigStorage implements ConfigStorage {
connectorConfig = (Map<String, Properties>) is.readObject();
} catch (FileNotFoundException e) {
// Expected on first run
} catch (IOException e) {
throw new CopycatRuntimeException("Failed to load config data", e);
} catch (ClassNotFoundException e) {
} catch (IOException | ClassNotFoundException e) {
throw new CopycatRuntimeException("Failed to load config data", e);
}
}

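Several hunks in this commit (FileConfigStorage above, FileOffsetBackingStore and ShutdownableThread below) collapse duplicated catch blocks into Java 7 multi-catch. A minimal sketch of the pattern, with a hypothetical class and file name rather than the Copycat code:

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;

public class MultiCatchExample {
    static Object load(String path) {
        try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(path))) {
            return is.readObject();
        } catch (FileNotFoundException e) {
            return null;                              // expected on first run: nothing stored yet
        } catch (IOException | ClassNotFoundException e) {
            // One block handles both failure modes; `e` is effectively final and its static
            // type is the closest common supertype of the listed exceptions.
            throw new RuntimeException("Failed to load data from " + path, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(load("does-not-exist.db"));   // prints: null
    }
}
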
@@ -43,7 +43,7 @@ public class StandaloneCoordinator implements Coordinator {
private Worker worker;
private Properties configs;
private ConfigStorage configStorage;
private HashMap<String, ConnectorState> connectors = new HashMap<String, ConnectorState>();
private HashMap<String, ConnectorState> connectors = new HashMap<>();

public StandaloneCoordinator(Worker worker, Properties props) {
this.worker = worker;

@@ -196,7 +196,7 @@ public class StandaloneCoordinator implements Coordinator {
List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks);

// Generate the final configs, including framework provided settings
Map<ConnectorTaskId, Properties> taskProps = new HashMap<ConnectorTaskId, Properties>();
Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
for (int i = 0; i < taskConfigs.size(); i++) {
ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
Properties config = taskConfigs.get(i);

@@ -297,7 +297,7 @@ public class StandaloneCoordinator implements Coordinator {
this.connector = connector;
this.maxTasks = maxTasks;
this.inputTopics = inputTopics;
this.tasks = new HashSet<ConnectorTaskId>();
this.tasks = new HashSet<>();
}
}
}

@@ -67,9 +67,9 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
HashMap<String, Map<byte[], byte[]>> raw
= (HashMap<String, Map<byte[], byte[]>>) is.readObject();
data = new HashMap<String, Map<ByteBuffer, ByteBuffer>>();
data = new HashMap<>();
for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<ByteBuffer, ByteBuffer>();
HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();
for (Map.Entry<byte[], byte[]> mapEntry : entry.getValue().entrySet()) {
ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) :

@@ -79,13 +79,10 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
data.put(entry.getKey(), converted);
}
is.close();
} catch (FileNotFoundException e) {
// Ignore, may be new
} catch (EOFException e) {
// Ignore, this means the file was missing or corrupt
} catch (IOException e) {
throw new CopycatRuntimeException(e);
} catch (ClassNotFoundException e) {
} catch (FileNotFoundException | EOFException e) {
// FileNotFoundException: Ignore, may be new.
// EOFException: Ignore, this means the file was missing or corrupt
} catch (IOException | ClassNotFoundException e) {
throw new CopycatRuntimeException(e);
}
}

@@ -93,9 +90,9 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
protected void save() {
try {
ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file));
HashMap<String, Map<byte[], byte[]>> raw = new HashMap<String, Map<byte[], byte[]>>();
HashMap<String, Map<byte[], byte[]>> raw = new HashMap<>();
for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> entry : data.entrySet()) {
HashMap<byte[], byte[]> converted = new HashMap<byte[], byte[]>();
HashMap<byte[], byte[]> converted = new HashMap<>();
for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : entry.getValue().entrySet()) {
byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;

@@ -39,7 +39,7 @@ import java.util.concurrent.Future;
public class MemoryOffsetBackingStore implements OffsetBackingStore {
private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);

protected HashMap<String, Map<ByteBuffer, ByteBuffer>> data = new HashMap<String, Map<ByteBuffer, ByteBuffer>>();
protected HashMap<String, Map<ByteBuffer, ByteBuffer>> data = new HashMap<>();
protected ExecutorService executor = Executors.newSingleThreadExecutor();

public MemoryOffsetBackingStore() {

@@ -66,7 +66,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
@Override
public Map<ByteBuffer, ByteBuffer> call() throws Exception {
Map<ByteBuffer, ByteBuffer> result = new HashMap<ByteBuffer, ByteBuffer>();
Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
synchronized (MemoryOffsetBackingStore.this) {
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
if (namespaceData == null) {

@@ -94,7 +94,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
synchronized (MemoryOffsetBackingStore.this) {
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
if (namespaceData == null) {
namespaceData = new HashMap<ByteBuffer, ByteBuffer>();
namespaceData = new HashMap<>();
data.put(namespace, namespaceData);
}
for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {

@@ -62,7 +62,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
@Override
public Map<Object, Object> getOffsets(Collection<Object> streams) {
// Serialize keys so backing store can work with them
Map<ByteBuffer, Object> serializedToOriginal = new HashMap<ByteBuffer, Object>(streams.size());
Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(streams.size());
for (Object key : streams) {
try {
byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));

@@ -85,7 +85,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
}

// Deserialize all the values and map back to the original keys
Map<Object, Object> result = new HashMap<Object, Object>(streams.size());
Map<Object, Object> result = new HashMap<>(streams.size());
for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
try {
// Since null could be a valid key, explicitly check whether map contains the key

@@ -71,7 +71,7 @@ public class OffsetStorageWriter {
private final Serializer keySerializer;
private final Serializer valueSerializer;
private final String namespace;
private Map<Object, Object> data = new HashMap<Object, Object>();
private Map<Object, Object> data = new HashMap<>();

// Not synchronized, should only be accessed by flush thread
private Map<Object, Object> toFlush = null;

@@ -115,7 +115,7 @@ public class OffsetStorageWriter {

assert !flushing();
toFlush = data;
data = new HashMap<Object, Object>();
data = new HashMap<>();
return true;
}

@@ -133,7 +133,7 @@ public class OffsetStorageWriter {
// Serialize
Map<ByteBuffer, ByteBuffer> offsetsSerialized;
try {
offsetsSerialized = new HashMap<ByteBuffer, ByteBuffer>();
offsetsSerialized = new HashMap<>();
for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey()));
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;

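The OffsetStorageWriter hunks above show a swap-to-flush pattern: pending offsets accumulate in data, and beginning a flush moves the current map into toFlush so a fresh map can keep collecting writes while the snapshot is serialized. A minimal sketch of that idea; the class and method names are hypothetical, not the OffsetStorageWriter API:

import java.util.HashMap;
import java.util.Map;

public class SwapToFlushBuffer {
    private Map<String, Long> data = new HashMap<>(); // written by the task thread
    private Map<String, Long> toFlush = null;         // read only by the flush thread

    public synchronized void offset(String key, long value) {
        data.put(key, value);
    }

    // Take a snapshot of pending offsets; returns false if a flush is already in progress.
    public synchronized boolean beginFlush() {
        if (toFlush != null)
            return false;
        toFlush = data;
        data = new HashMap<>();                       // new writes go to a fresh map
        return true;
    }

    public synchronized void completeFlush() {
        toFlush = null;                               // snapshot has been persisted
    }

    public static void main(String[] args) {
        SwapToFlushBuffer buffer = new SwapToFlushBuffer();
        buffer.offset("stream-1", 42L);
        System.out.println(buffer.beginFlush());      // true: snapshot of offset 42 taken
        buffer.offset("stream-1", 43L);               // accumulates for the next flush
        buffer.completeFlush();
        System.out.println(buffer.beginFlush());      // true again: snapshot now holds offset 43
    }
}
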
@@ -81,10 +81,7 @@ public abstract class ShutdownableThread extends Thread {
public void run() {
try {
execute();
} catch (Error e) {
log.error("Thread {} exiting with uncaught exception: ", getName(), e);
throw e;
} catch (RuntimeException e) {
} catch (Error | RuntimeException e) {
log.error("Thread {} exiting with uncaught exception: ", getName(), e);
throw e;
} finally {

@@ -133,7 +133,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
GenericRecord record = new GenericRecordBuilder(schema).build();
byte[] rawKey = "key".getBytes(), rawValue = "value".getBytes();

ConsumerRecords<Object, Object> records = new ConsumerRecords<Object, Object>(
ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
Collections.singletonMap(
new TopicPartition("topic", 0),
Collections.singletonList(

@@ -309,7 +309,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
public ConsumerRecords<Object, Object> answer() throws Throwable {
// "Sleep" so time will progress
time.sleep(pollDelayMs);
ConsumerRecords<Object, Object> records = new ConsumerRecords<Object, Object>(
ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), Arrays.asList(
new ConsumerRecord<Object, Object>(TOPIC, PARTITION,
FIRST_OFFSET + recordsReturned, KEY,

@@ -196,7 +196,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
public void testSendRecordsConvertsData() throws Exception {
createWorkerTask();

List<SourceRecord> records = new ArrayList<SourceRecord>();
List<SourceRecord> records = new ArrayList<>();
// Can just use the same record for key and value
records.add(new SourceRecord(STREAM_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD));

@@ -85,7 +85,7 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
PowerMock.replayAll();

coordinator.addConnector(connectorProps, createCallback);
FutureCallback<Void> futureCb = new FutureCallback<Void>(new Callback() {
FutureCallback<Void> futureCb = new FutureCallback<>(new Callback() {
@Override
public void onCompletion(Throwable error, Object result) {

@@ -40,7 +40,7 @@ public class FileOffsetBackingStoreTest {
Properties props;
File tempFile;

private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<ByteBuffer, ByteBuffer>();
private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();

static {
firstSet.put(buffer("key"), buffer("value"));