Style cleanup

This commit is contained in:
Ewen Cheslack-Postava 2015-07-31 00:12:36 -07:00
parent 6ba87debad
commit 122423eef7
23 changed files with 82 additions and 154 deletions

View File

@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* * <p/>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p/>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -70,27 +70,21 @@ public abstract class CopycatRecord {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o)
return true; return true;
} if (o == null || getClass() != o.getClass())
if (o == null || getClass() != o.getClass()) {
return false; return false;
}
CopycatRecord that = (CopycatRecord) o; CopycatRecord that = (CopycatRecord) o;
if (key != null ? !key.equals(that.key) : that.key != null) { if (key != null ? !key.equals(that.key) : that.key != null)
return false; return false;
} if (partition != null ? !partition.equals(that.partition) : that.partition != null)
if (partition != null ? !partition.equals(that.partition) : that.partition != null) {
return false; return false;
} if (topic != null ? !topic.equals(that.topic) : that.topic != null)
if (topic != null ? !topic.equals(that.topic) : that.topic != null) {
return false; return false;
} if (value != null ? !value.equals(that.value) : that.value != null)
if (value != null ? !value.equals(that.value) : that.value != null) {
return false; return false;
}
return true; return true;
} }

View File

@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* * <p/>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p/>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -38,21 +38,17 @@ public class SinkRecord extends CopycatRecord {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o)
return true; return true;
} if (o == null || getClass() != o.getClass())
if (o == null || getClass() != o.getClass()) {
return false; return false;
} if (!super.equals(o))
if (!super.equals(o)) {
return false; return false;
}
SinkRecord that = (SinkRecord) o; SinkRecord that = (SinkRecord) o;
if (offset != that.offset) { if (offset != that.offset)
return false; return false;
}
return true; return true;
} }

View File

@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* * <p/>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p/>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -66,24 +66,19 @@ public class SourceRecord extends CopycatRecord {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o)
return true; return true;
} if (o == null || getClass() != o.getClass())
if (o == null || getClass() != o.getClass()) {
return false; return false;
} if (!super.equals(o))
if (!super.equals(o)) {
return false; return false;
}
SourceRecord that = (SourceRecord) o; SourceRecord that = (SourceRecord) o;
if (offset != null ? !offset.equals(that.offset) : that.offset != null) { if (offset != null ? !offset.equals(that.offset) : that.offset != null)
return false; return false;
} if (stream != null ? !stream.equals(that.stream) : that.stream != null)
if (stream != null ? !stream.equals(that.stream) : that.stream != null) {
return false; return false;
}
return true; return true;
} }

View File

@ -38,9 +38,8 @@ public class ConnectorUtils {
* @param numGroups the number of output groups to generate. * @param numGroups the number of output groups to generate.
*/ */
public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) { public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
if (numGroups <= 0) { if (numGroups <= 0)
throw new IllegalArgumentException("Number of groups must be positive."); throw new IllegalArgumentException("Number of groups must be positive.");
}
List<List<T>> result = new ArrayList<>(numGroups); List<List<T>> result = new ArrayList<>(numGroups);

View File

@ -69,9 +69,8 @@ public class ConnectorTest {
@Override @Override
public void stop() throws CopycatException { public void stop() throws CopycatException {
stopOrder = order++; stopOrder = order++;
if (stopException) { if (stopException)
throw new CopycatException("error"); throw new CopycatException("error");
} }
} }
} }
}

View File

@ -49,9 +49,8 @@ public class FileStreamSinkConnector extends SinkConnector {
ArrayList<Properties> configs = new ArrayList<>(); ArrayList<Properties> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) { for (int i = 0; i < maxTasks; i++) {
Properties config = new Properties(); Properties config = new Properties();
if (filename != null) { if (filename != null)
config.setProperty(FILE_CONFIG, filename); config.setProperty(FILE_CONFIG, filename);
}
configs.add(config); configs.add(config);
} }
return configs; return configs;

View File

@ -40,13 +40,10 @@ public class FileStreamSourceConnector extends SourceConnector {
public void start(Properties props) throws CopycatException { public void start(Properties props) throws CopycatException {
filename = props.getProperty(FILE_CONFIG); filename = props.getProperty(FILE_CONFIG);
topic = props.getProperty(TOPIC_CONFIG); topic = props.getProperty(TOPIC_CONFIG);
if (topic == null || topic.isEmpty()) { if (topic == null || topic.isEmpty())
throw new CopycatException("ConsoleConnector configuration must include 'topic' setting"); throw new CopycatException("ConsoleConnector configuration must include 'topic' setting");
} if (topic.contains(","))
if (topic.contains(",")) { throw new CopycatException("ConsoleConnector should only have a single topic when used as a source.");
throw new CopycatException("ConsoleConnector should only have a single topic when used as a"
+ " source.");
}
} }
@Override @Override
@ -59,9 +56,8 @@ public class FileStreamSourceConnector extends SourceConnector {
ArrayList<Properties> configs = new ArrayList<>(); ArrayList<Properties> configs = new ArrayList<>();
// Only one input stream makes sense. // Only one input stream makes sense.
Properties config = new Properties(); Properties config = new Properties();
if (filename != null) { if (filename != null)
config.setProperty(FILE_CONFIG, filename); config.setProperty(FILE_CONFIG, filename);
}
config.setProperty(TOPIC_CONFIG, topic); config.setProperty(TOPIC_CONFIG, topic);
configs.add(config); configs.add(config);
return configs; return configs;

View File

@ -74,9 +74,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
} }
} }
topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG); topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
if (topic == null) { if (topic == null)
throw new CopycatRuntimeException("ConsoleSourceTask config missing topic setting"); throw new CopycatRuntimeException("ConsoleSourceTask config missing topic setting");
}
reader = new BufferedReader(new InputStreamReader(stream)); reader = new BufferedReader(new InputStreamReader(stream));
} }
@ -90,9 +89,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
synchronized (this) { synchronized (this) {
readerCopy = reader; readerCopy = reader;
} }
if (readerCopy == null) { if (readerCopy == null)
return null; return null;
}
ArrayList<SourceRecord> records = null; ArrayList<SourceRecord> records = null;
@ -112,9 +110,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
do { do {
line = extractLine(); line = extractLine();
if (line != null) { if (line != null) {
if (records == null) { if (records == null)
records = new ArrayList<>(); records = new ArrayList<>();
}
records.add(new SourceRecord(null, streamOffset, topic, line)); records.add(new SourceRecord(null, streamOffset, topic, line));
} }
new ArrayList<SourceRecord>(); new ArrayList<SourceRecord>();
@ -122,9 +119,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
} }
} }
if (nread <= 0) { if (nread <= 0)
Thread.sleep(1); Thread.sleep(1);
}
return records; return records;
} catch (IOException e) { } catch (IOException e) {
@ -143,9 +139,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
break; break;
} else if (buffer[i] == '\r') { } else if (buffer[i] == '\r') {
// We need to check for \r\n, so we must skip this if we can't check the next char // We need to check for \r\n, so we must skip this if we can't check the next char
if (i + 1 >= offset) { if (i + 1 >= offset)
return null; return null;
}
until = i; until = i;
newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1; newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
@ -157,9 +152,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
String result = new String(buffer, 0, until); String result = new String(buffer, 0, until);
System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart); System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
offset = offset - newStart; offset = offset - newStart;
if (streamOffset != null) { if (streamOffset != null)
streamOffset += newStart; streamOffset += newStart;
}
return result; return result;
} else { } else {
return null; return null;

View File

@ -62,10 +62,9 @@ public class FileStreamSourceTaskTest {
public void teardown() { public void teardown() {
tempFile.delete(); tempFile.delete();
if (verifyMocks) { if (verifyMocks)
PowerMock.verifyAll(); PowerMock.verifyAll();
} }
}
private void replay() { private void replay() {
PowerMock.replayAll(); PowerMock.replayAll();

View File

@ -107,14 +107,12 @@ public class JsonConverter implements Converter {
@Override @Override
public Object toCopycatData(Object value) { public Object toCopycatData(Object value) {
if (!(value instanceof JsonNode)) { if (!(value instanceof JsonNode))
throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects."); throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects.");
}
JsonNode data = (JsonNode) value; JsonNode data = (JsonNode) value;
if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) { if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema"); throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema");
}
return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
} }
@ -155,14 +153,12 @@ public class JsonConverter implements Converter {
private static Schema asCopycatSchema(JsonNode jsonSchema) { private static Schema asCopycatSchema(JsonNode jsonSchema) {
if (jsonSchema.isNull()) { if (jsonSchema.isNull())
return null; return null;
}
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME); JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
if (schemaTypeNode == null || !schemaTypeNode.isTextual()) { if (schemaTypeNode == null || !schemaTypeNode.isTextual())
throw new CopycatRuntimeException("Schema must contain 'type' field"); throw new CopycatRuntimeException("Schema must contain 'type' field");
}
switch (schemaTypeNode.textValue()) { switch (schemaTypeNode.textValue()) {
case JsonSchema.BOOLEAN_TYPE_NAME: case JsonSchema.BOOLEAN_TYPE_NAME:
@ -238,10 +234,9 @@ public class JsonConverter implements Converter {
itemSchema = fieldSchemaAndValue.schema; itemSchema = fieldSchemaAndValue.schema;
schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema); schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema);
} else { } else {
if (!itemSchema.equals(fieldSchemaAndValue.schema)) { if (!itemSchema.equals(fieldSchemaAndValue.schema))
throw new CopycatRuntimeException("Mismatching schemas found in a list."); throw new CopycatRuntimeException("Mismatching schemas found in a list.");
} }
}
list.add(fieldSchemaAndValue.payload); list.add(fieldSchemaAndValue.payload);
} }
@ -253,19 +248,16 @@ public class JsonConverter implements Converter {
private static Object convertToCopycat(JsonNode jsonSchema, JsonNode jsonValue) { private static Object convertToCopycat(JsonNode jsonSchema, JsonNode jsonValue) {
if (jsonSchema.isNull()) { if (jsonSchema.isNull())
return null; return null;
}
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME); JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
if (schemaTypeNode == null || !schemaTypeNode.isTextual()) { if (schemaTypeNode == null || !schemaTypeNode.isTextual())
throw new CopycatRuntimeException("Schema must contain 'type' field. Schema: " + jsonSchema.toString()); throw new CopycatRuntimeException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
}
JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue()); JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue());
if (typeConverter != null) { if (typeConverter != null)
return typeConverter.convert(jsonSchema, jsonValue); return typeConverter.convert(jsonSchema, jsonValue);
}
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode); throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode);
} }

View File

@ -84,9 +84,8 @@ public class CopycatConfig extends AbstractConfig {
} }
// Check for -- prefix on key // Check for -- prefix on key
if (key.startsWith("--")) { if (key.startsWith("--"))
key = key.substring(2); key = key.substring(2);
}
props.setProperty(key, value); props.setProperty(key, value);
} }

View File

@ -154,9 +154,8 @@ public class Worker {
for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) { for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
WorkerTask task = entry.getValue(); WorkerTask task = entry.getValue();
log.debug("Waiting for task {} to finish shutting down", task); log.debug("Waiting for task {} to finish shutting down", task);
if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0))) { if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
log.error("Graceful shutdown of task {} failed.", task); log.error("Graceful shutdown of task {} failed.", task);
}
task.close(); task.close();
} }
@ -221,9 +220,8 @@ public class Worker {
public void stopTask(ConnectorTaskId id) throws CopycatException { public void stopTask(ConnectorTaskId id) throws CopycatException {
WorkerTask task = getTask(id); WorkerTask task = getTask(id);
task.stop(); task.stop();
if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG))) { if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
log.error("Graceful stop of task {} failed.", task); log.error("Graceful stop of task {} failed.", task);
}
task.close(); task.close();
tasks.remove(id); tasks.remove(id);
} }

View File

@ -73,9 +73,8 @@ public class WorkerSinkTask implements WorkerTask {
public void stop() throws CopycatException { public void stop() throws CopycatException {
// Offset commit is handled upon exit in work thread // Offset commit is handled upon exit in work thread
task.stop(); task.stop();
if (workThread != null) { if (workThread != null)
workThread.startGracefulShutdown(); workThread.startGracefulShutdown();
}
consumer.wakeup(); consumer.wakeup();
} }
@ -84,9 +83,8 @@ public class WorkerSinkTask implements WorkerTask {
if (workThread != null) { if (workThread != null) {
try { try {
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
if (!success) { if (!success)
workThread.forceShutdown(); workThread.forceShutdown();
}
return success; return success;
} catch (InterruptedException e) { } catch (InterruptedException e) {
return false; return false;
@ -99,10 +97,9 @@ public class WorkerSinkTask implements WorkerTask {
public void close() { public void close() {
// FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
// passed in // passed in
if (consumer != null) { if (consumer != null)
consumer.close(); consumer.close();
} }
}
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
public void poll(long timeoutMs) { public void poll(long timeoutMs) {
@ -156,9 +153,8 @@ public class WorkerSinkTask implements WorkerTask {
private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) { private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG); String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty()) { if (topicsStr == null || topicsStr.isEmpty())
throw new CopycatRuntimeException("Sink tasks require a list of topics."); throw new CopycatRuntimeException("Sink tasks require a list of topics.");
}
String[] topics = topicsStr.split(","); String[] topics = topicsStr.split(",");
// Include any unknown worker configs so consumer configs can be set globally on the worker // Include any unknown worker configs so consumer configs can be set globally on the worker

View File

@ -99,19 +99,17 @@ public class WorkerSourceTask implements WorkerTask {
public void stop() throws CopycatException { public void stop() throws CopycatException {
task.stop(); task.stop();
commitOffsets(); commitOffsets();
if (workThread != null) { if (workThread != null)
workThread.startGracefulShutdown(); workThread.startGracefulShutdown();
} }
}
@Override @Override
public boolean awaitStop(long timeoutMs) { public boolean awaitStop(long timeoutMs) {
if (workThread != null) { if (workThread != null) {
try { try {
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
if (!success) { if (!success)
workThread.forceShutdown(); workThread.forceShutdown();
}
return success; return success;
} catch (InterruptedException e) { } catch (InterruptedException e) {
return false; return false;
@ -165,9 +163,8 @@ public class WorkerSourceTask implements WorkerTask {
private synchronized void recordSent(final ProducerRecord<Object, Object> record) { private synchronized void recordSent(final ProducerRecord<Object, Object> record) {
ProducerRecord<Object, Object> removed = outstandingMessages.remove(record); ProducerRecord<Object, Object> removed = outstandingMessages.remove(record);
// While flushing, we may also see callbacks for items in the backlog // While flushing, we may also see callbacks for items in the backlog
if (removed == null && flushing) { if (removed == null && flushing)
removed = outstandingMessagesBacklog.remove(record); removed = outstandingMessagesBacklog.remove(record);
}
// But if neither one had it, something is very wrong // But if neither one had it, something is very wrong
if (removed == null) { if (removed == null) {
log.error("Saw callback for record that was not present in the outstanding message set: " log.error("Saw callback for record that was not present in the outstanding message set: "
@ -294,9 +291,8 @@ public class WorkerSourceTask implements WorkerTask {
try { try {
while (getRunning()) { while (getRunning()) {
List<SourceRecord> records = task.poll(); List<SourceRecord> records = task.poll();
if (records == null) { if (records == null)
continue; continue;
}
sendRecords(records); sendRecords(records);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -57,11 +57,10 @@ public class FileConfigStorage implements ConfigStorage {
@Override @Override
public void putConnectorConfig(String connector, Properties properties) { public void putConnectorConfig(String connector, Properties properties) {
if (properties == null) { if (properties == null)
connectorConfig.remove(connector); connectorConfig.remove(connector);
} else { else
connectorConfig.put(connector, properties); connectorConfig.put(connector, properties);
}
save(); save();
} }

View File

@ -91,31 +91,27 @@ public class StandaloneCoordinator implements Coordinator {
Callback<String> callback) { Callback<String> callback) {
try { try {
ConnectorState connState = createConnector(connectorProps); ConnectorState connState = createConnector(connectorProps);
if (callback != null) { if (callback != null)
callback.onCompletion(null, connState.name); callback.onCompletion(null, connState.name);
}
// This should always be a new job, create jobs from scratch // This should always be a new job, create jobs from scratch
createConnectorTasks(connState); createConnectorTasks(connState);
} catch (CopycatRuntimeException e) { } catch (CopycatRuntimeException e) {
if (callback != null) { if (callback != null)
callback.onCompletion(e, null); callback.onCompletion(e, null);
} }
} }
}
@Override @Override
public synchronized void deleteConnector(String name, Callback<Void> callback) { public synchronized void deleteConnector(String name, Callback<Void> callback) {
try { try {
destroyConnector(name); destroyConnector(name);
if (callback != null) { if (callback != null)
callback.onCompletion(null, null); callback.onCompletion(null, null);
}
} catch (CopycatRuntimeException e) { } catch (CopycatRuntimeException e) {
if (callback != null) { if (callback != null)
callback.onCompletion(e, null); callback.onCompletion(e, null);
} }
} }
}
// Creates the and configures the connector. Does not setup any tasks // Creates the and configures the connector. Does not setup any tasks
private ConnectorState createConnector(Properties connectorProps) { private ConnectorState createConnector(Properties connectorProps) {
@ -148,9 +144,8 @@ public class StandaloneCoordinator implements Coordinator {
} }
ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics); ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
connectors.put(connName, state); connectors.put(connName, state);
if (configStorage != null) { if (configStorage != null)
configStorage.putConnectorConfig(connName, connectorProps); configStorage.putConnectorConfig(connName, connectorProps);
}
log.info("Finished creating connector {}", connName); log.info("Finished creating connector {}", connName);
@ -171,9 +166,8 @@ public class StandaloneCoordinator implements Coordinator {
stopConnector(state); stopConnector(state);
connectors.remove(state.name); connectors.remove(state.name);
if (configStorage != null) { if (configStorage != null)
configStorage.putConnectorConfig(state.name, null); configStorage.putConnectorConfig(state.name, null);
}
log.info("Finished destroying connector {}", connName); log.info("Finished destroying connector {}", connName);
} }
@ -251,9 +245,8 @@ public class StandaloneCoordinator implements Coordinator {
} }
private void restoreConnectors() { private void restoreConnectors() {
if (configStorage == null) { if (configStorage == null)
return; return;
}
Collection<String> connNames = configStorage.getConnectors(); Collection<String> connNames = configStorage.getConnectors();
for (String connName : connNames) { for (String connName : connNames) {

View File

@ -69,16 +69,14 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
Map<ByteBuffer, ByteBuffer> result = new HashMap<>(); Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
synchronized (MemoryOffsetBackingStore.this) { synchronized (MemoryOffsetBackingStore.this) {
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace); Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
if (namespaceData == null) { if (namespaceData == null)
return result; return result;
}
for (ByteBuffer key : keys) { for (ByteBuffer key : keys) {
result.put(key, namespaceData.get(key)); result.put(key, namespaceData.get(key));
} }
} }
if (callback != null) { if (callback != null)
callback.onCompletion(null, result); callback.onCompletion(null, result);
}
return result; return result;
} }
}); });
@ -102,9 +100,8 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
} }
save(); save();
} }
if (callback != null) { if (callback != null)
callback.onCompletion(null, null); callback.onCompletion(null, null);
}
return null; return null;
} }
}); });

View File

@ -109,9 +109,8 @@ public class OffsetStorageWriter {
throw new CopycatRuntimeException("OffsetStorageWriter is already flushing"); throw new CopycatRuntimeException("OffsetStorageWriter is already flushing");
} }
if (data.isEmpty()) { if (data.isEmpty())
return false; return false;
}
assert !flushing(); assert !flushing();
toFlush = data; toFlush = data;
@ -157,10 +156,9 @@ public class OffsetStorageWriter {
@Override @Override
public void onCompletion(Throwable error, Void result) { public void onCompletion(Throwable error, Void result) {
boolean isCurrent = handleFinishWrite(flushId, error, result); boolean isCurrent = handleFinishWrite(flushId, error, result);
if (isCurrent && callback != null) { if (isCurrent && callback != null)
callback.onCompletion(error, result); callback.onCompletion(error, result);
} }
}
}); });
} }
@ -188,9 +186,8 @@ public class OffsetStorageWriter {
private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) { private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) {
// Callbacks need to be handled carefully since the flush operation may have already timed // Callbacks need to be handled carefully since the flush operation may have already timed
// out and been cancelled. // out and been cancelled.
if (flushId != currentFlushId) { if (flushId != currentFlushId)
return false; return false;
}
if (error != null) { if (error != null) {
cancelFlush(); cancelFlush();

View File

@ -42,21 +42,17 @@ public class ConnectorTaskId implements Serializable {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o)
return true; return true;
} if (o == null || getClass() != o.getClass())
if (o == null || getClass() != o.getClass()) {
return false; return false;
}
ConnectorTaskId that = (ConnectorTaskId) o; ConnectorTaskId that = (ConnectorTaskId) o;
if (task != that.task) { if (task != that.task)
return false; return false;
} if (connector != null ? !connector.equals(that.connector) : that.connector != null)
if (connector != null ? !connector.equals(that.connector) : that.connector != null) {
return false; return false;
}
return true; return true;
} }

View File

@ -59,10 +59,9 @@ public abstract class ShutdownableThread extends Thread {
public ShutdownableThread(String name, boolean daemon) { public ShutdownableThread(String name, boolean daemon) {
super(name); super(name);
this.setDaemon(daemon); this.setDaemon(daemon);
if (funcaughtExceptionHandler != null) { if (funcaughtExceptionHandler != null)
this.setUncaughtExceptionHandler(funcaughtExceptionHandler); this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
} }
}
/** /**
* Implementations should override this method with the main body for the thread. * Implementations should override this method with the main body for the thread.
@ -98,10 +97,9 @@ public abstract class ShutdownableThread extends Thread {
public void shutdown(long gracefulTimeout, TimeUnit unit) public void shutdown(long gracefulTimeout, TimeUnit unit)
throws InterruptedException { throws InterruptedException {
boolean success = gracefulShutdown(gracefulTimeout, unit); boolean success = gracefulShutdown(gracefulTimeout, unit);
if (!success) { if (!success)
forceShutdown(); forceShutdown();
} }
}
/** /**
* Attempt graceful shutdown * Attempt graceful shutdown

View File

@ -355,9 +355,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
@Override @Override
public Object answer() throws Throwable { public Object answer() throws Throwable {
time.sleep(consumerCommitDelayMs); time.sleep(consumerCommitDelayMs);
if (invokeCallback) { if (invokeCallback)
capturedCallback.getValue().onComplete(offsets, consumerCommitError); capturedCallback.getValue().onComplete(offsets, consumerCommitError);
}
return null; return null;
} }
}); });

View File

@ -207,9 +207,8 @@ public class OffsetStorageWriterTest {
return service.submit(new Callable<Void>() { return service.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
if (waitForCompletion != null) { if (waitForCompletion != null)
assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS)); assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
}
if (fail) { if (fail) {
storeCallback.getValue().onCompletion(exception, null); storeCallback.getValue().onCompletion(exception, null);

View File

@ -26,14 +26,12 @@ public class TestBackgroundThreadExceptionHandler implements Thread.UncaughtExce
@Override @Override
public void uncaughtException(Thread t, Throwable e) { public void uncaughtException(Thread t, Throwable e) {
if (this.firstException == null) { if (this.firstException == null)
this.firstException = e; this.firstException = e;
} }
}
public void verifyNoExceptions() { public void verifyNoExceptions() {
if (this.firstException != null) { if (this.firstException != null)
throw new AssertionError(this.firstException); throw new AssertionError(this.firstException);
} }
} }
}