Style cleanup

This commit is contained in:
Ewen Cheslack-Postava 2015-07-31 00:12:36 -07:00
parent 6ba87debad
commit 122423eef7
23 changed files with 82 additions and 154 deletions

View File

@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -70,27 +70,21 @@ public abstract class CopycatRecord {
@Override
public boolean equals(Object o) {
if (this == o) {
if (this == o)
return true;
}
if (o == null || getClass() != o.getClass()) {
if (o == null || getClass() != o.getClass())
return false;
}
CopycatRecord that = (CopycatRecord) o;
if (key != null ? !key.equals(that.key) : that.key != null) {
if (key != null ? !key.equals(that.key) : that.key != null)
return false;
}
if (partition != null ? !partition.equals(that.partition) : that.partition != null) {
if (partition != null ? !partition.equals(that.partition) : that.partition != null)
return false;
}
if (topic != null ? !topic.equals(that.topic) : that.topic != null) {
if (topic != null ? !topic.equals(that.topic) : that.topic != null)
return false;
}
if (value != null ? !value.equals(that.value) : that.value != null) {
if (value != null ? !value.equals(that.value) : that.value != null)
return false;
}
return true;
}

View File

@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -38,21 +38,17 @@ public class SinkRecord extends CopycatRecord {
@Override
public boolean equals(Object o) {
if (this == o) {
if (this == o)
return true;
}
if (o == null || getClass() != o.getClass()) {
if (o == null || getClass() != o.getClass())
return false;
}
if (!super.equals(o)) {
if (!super.equals(o))
return false;
}
SinkRecord that = (SinkRecord) o;
if (offset != that.offset) {
if (offset != that.offset)
return false;
}
return true;
}

View File

@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -66,24 +66,19 @@ public class SourceRecord extends CopycatRecord {
@Override
public boolean equals(Object o) {
if (this == o) {
if (this == o)
return true;
}
if (o == null || getClass() != o.getClass()) {
if (o == null || getClass() != o.getClass())
return false;
}
if (!super.equals(o)) {
if (!super.equals(o))
return false;
}
SourceRecord that = (SourceRecord) o;
if (offset != null ? !offset.equals(that.offset) : that.offset != null) {
if (offset != null ? !offset.equals(that.offset) : that.offset != null)
return false;
}
if (stream != null ? !stream.equals(that.stream) : that.stream != null) {
if (stream != null ? !stream.equals(that.stream) : that.stream != null)
return false;
}
return true;
}

View File

@ -38,9 +38,8 @@ public class ConnectorUtils {
* @param numGroups the number of output groups to generate.
*/
public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
if (numGroups <= 0) {
if (numGroups <= 0)
throw new IllegalArgumentException("Number of groups must be positive.");
}
List<List<T>> result = new ArrayList<>(numGroups);

View File

@ -69,9 +69,8 @@ public class ConnectorTest {
@Override
public void stop() throws CopycatException {
stopOrder = order++;
if (stopException) {
if (stopException)
throw new CopycatException("error");
}
}
}
}

View File

@ -49,9 +49,8 @@ public class FileStreamSinkConnector extends SinkConnector {
ArrayList<Properties> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Properties config = new Properties();
if (filename != null) {
if (filename != null)
config.setProperty(FILE_CONFIG, filename);
}
configs.add(config);
}
return configs;

View File

@ -40,13 +40,10 @@ public class FileStreamSourceConnector extends SourceConnector {
public void start(Properties props) throws CopycatException {
filename = props.getProperty(FILE_CONFIG);
topic = props.getProperty(TOPIC_CONFIG);
if (topic == null || topic.isEmpty()) {
if (topic == null || topic.isEmpty())
throw new CopycatException("ConsoleConnector configuration must include 'topic' setting");
}
if (topic.contains(",")) {
throw new CopycatException("ConsoleConnector should only have a single topic when used as a"
+ " source.");
}
if (topic.contains(","))
throw new CopycatException("ConsoleConnector should only have a single topic when used as a source.");
}
@Override
@ -59,9 +56,8 @@ public class FileStreamSourceConnector extends SourceConnector {
ArrayList<Properties> configs = new ArrayList<>();
// Only one input stream makes sense.
Properties config = new Properties();
if (filename != null) {
if (filename != null)
config.setProperty(FILE_CONFIG, filename);
}
config.setProperty(TOPIC_CONFIG, topic);
configs.add(config);
return configs;

View File

@ -74,9 +74,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
}
}
topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
if (topic == null) {
if (topic == null)
throw new CopycatRuntimeException("ConsoleSourceTask config missing topic setting");
}
reader = new BufferedReader(new InputStreamReader(stream));
}
@ -90,9 +89,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
synchronized (this) {
readerCopy = reader;
}
if (readerCopy == null) {
if (readerCopy == null)
return null;
}
ArrayList<SourceRecord> records = null;
@ -112,9 +110,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
do {
line = extractLine();
if (line != null) {
if (records == null) {
if (records == null)
records = new ArrayList<>();
}
records.add(new SourceRecord(null, streamOffset, topic, line));
}
new ArrayList<SourceRecord>();
@ -122,9 +119,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
}
}
if (nread <= 0) {
if (nread <= 0)
Thread.sleep(1);
}
return records;
} catch (IOException e) {
@ -143,9 +139,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
break;
} else if (buffer[i] == '\r') {
// We need to check for \r\n, so we must skip this if we can't check the next char
if (i + 1 >= offset) {
if (i + 1 >= offset)
return null;
}
until = i;
newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
@ -157,9 +152,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
String result = new String(buffer, 0, until);
System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
offset = offset - newStart;
if (streamOffset != null) {
if (streamOffset != null)
streamOffset += newStart;
}
return result;
} else {
return null;

View File

@ -62,9 +62,8 @@ public class FileStreamSourceTaskTest {
public void teardown() {
tempFile.delete();
if (verifyMocks) {
if (verifyMocks)
PowerMock.verifyAll();
}
}
private void replay() {

View File

@ -107,14 +107,12 @@ public class JsonConverter implements Converter {
@Override
public Object toCopycatData(Object value) {
if (!(value instanceof JsonNode)) {
if (!(value instanceof JsonNode))
throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects.");
}
JsonNode data = (JsonNode) value;
if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) {
if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema");
}
return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
@ -155,14 +153,12 @@ public class JsonConverter implements Converter {
private static Schema asCopycatSchema(JsonNode jsonSchema) {
if (jsonSchema.isNull()) {
if (jsonSchema.isNull())
return null;
}
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
if (schemaTypeNode == null || !schemaTypeNode.isTextual()) {
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
throw new CopycatRuntimeException("Schema must contain 'type' field");
}
switch (schemaTypeNode.textValue()) {
case JsonSchema.BOOLEAN_TYPE_NAME:
@ -238,9 +234,8 @@ public class JsonConverter implements Converter {
itemSchema = fieldSchemaAndValue.schema;
schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema);
} else {
if (!itemSchema.equals(fieldSchemaAndValue.schema)) {
if (!itemSchema.equals(fieldSchemaAndValue.schema))
throw new CopycatRuntimeException("Mismatching schemas found in a list.");
}
}
list.add(fieldSchemaAndValue.payload);
@ -253,19 +248,16 @@ public class JsonConverter implements Converter {
private static Object convertToCopycat(JsonNode jsonSchema, JsonNode jsonValue) {
if (jsonSchema.isNull()) {
if (jsonSchema.isNull())
return null;
}
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
if (schemaTypeNode == null || !schemaTypeNode.isTextual()) {
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
throw new CopycatRuntimeException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
}
JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue());
if (typeConverter != null) {
if (typeConverter != null)
return typeConverter.convert(jsonSchema, jsonValue);
}
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode);
}

View File

@ -84,9 +84,8 @@ public class CopycatConfig extends AbstractConfig {
}
// Check for -- prefix on key
if (key.startsWith("--")) {
if (key.startsWith("--"))
key = key.substring(2);
}
props.setProperty(key, value);
}

View File

@ -154,9 +154,8 @@ public class Worker {
for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
WorkerTask task = entry.getValue();
log.debug("Waiting for task {} to finish shutting down", task);
if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0))) {
if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
log.error("Graceful shutdown of task {} failed.", task);
}
task.close();
}
@ -221,9 +220,8 @@ public class Worker {
public void stopTask(ConnectorTaskId id) throws CopycatException {
WorkerTask task = getTask(id);
task.stop();
if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG))) {
if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
log.error("Graceful stop of task {} failed.", task);
}
task.close();
tasks.remove(id);
}

View File

@ -73,9 +73,8 @@ public class WorkerSinkTask implements WorkerTask {
public void stop() throws CopycatException {
// Offset commit is handled upon exit in work thread
task.stop();
if (workThread != null) {
if (workThread != null)
workThread.startGracefulShutdown();
}
consumer.wakeup();
}
@ -84,9 +83,8 @@ public class WorkerSinkTask implements WorkerTask {
if (workThread != null) {
try {
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
if (!success) {
if (!success)
workThread.forceShutdown();
}
return success;
} catch (InterruptedException e) {
return false;
@ -99,9 +97,8 @@ public class WorkerSinkTask implements WorkerTask {
public void close() {
// FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
// passed in
if (consumer != null) {
if (consumer != null)
consumer.close();
}
}
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
@ -156,9 +153,8 @@ public class WorkerSinkTask implements WorkerTask {
private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty()) {
if (topicsStr == null || topicsStr.isEmpty())
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
}
String[] topics = topicsStr.split(",");
// Include any unknown worker configs so consumer configs can be set globally on the worker

View File

@ -99,9 +99,8 @@ public class WorkerSourceTask implements WorkerTask {
public void stop() throws CopycatException {
task.stop();
commitOffsets();
if (workThread != null) {
if (workThread != null)
workThread.startGracefulShutdown();
}
}
@Override
@ -109,9 +108,8 @@ public class WorkerSourceTask implements WorkerTask {
if (workThread != null) {
try {
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
if (!success) {
if (!success)
workThread.forceShutdown();
}
return success;
} catch (InterruptedException e) {
return false;
@ -165,9 +163,8 @@ public class WorkerSourceTask implements WorkerTask {
private synchronized void recordSent(final ProducerRecord<Object, Object> record) {
ProducerRecord<Object, Object> removed = outstandingMessages.remove(record);
// While flushing, we may also see callbacks for items in the backlog
if (removed == null && flushing) {
if (removed == null && flushing)
removed = outstandingMessagesBacklog.remove(record);
}
// But if neither one had it, something is very wrong
if (removed == null) {
log.error("Saw callback for record that was not present in the outstanding message set: "
@ -294,9 +291,8 @@ public class WorkerSourceTask implements WorkerTask {
try {
while (getRunning()) {
List<SourceRecord> records = task.poll();
if (records == null) {
if (records == null)
continue;
}
sendRecords(records);
}
} catch (InterruptedException e) {

View File

@ -57,11 +57,10 @@ public class FileConfigStorage implements ConfigStorage {
@Override
public void putConnectorConfig(String connector, Properties properties) {
if (properties == null) {
if (properties == null)
connectorConfig.remove(connector);
} else {
else
connectorConfig.put(connector, properties);
}
save();
}

View File

@ -91,15 +91,13 @@ public class StandaloneCoordinator implements Coordinator {
Callback<String> callback) {
try {
ConnectorState connState = createConnector(connectorProps);
if (callback != null) {
if (callback != null)
callback.onCompletion(null, connState.name);
}
// This should always be a new job, create jobs from scratch
createConnectorTasks(connState);
} catch (CopycatRuntimeException e) {
if (callback != null) {
if (callback != null)
callback.onCompletion(e, null);
}
}
}
@ -107,13 +105,11 @@ public class StandaloneCoordinator implements Coordinator {
public synchronized void deleteConnector(String name, Callback<Void> callback) {
try {
destroyConnector(name);
if (callback != null) {
if (callback != null)
callback.onCompletion(null, null);
}
} catch (CopycatRuntimeException e) {
if (callback != null) {
if (callback != null)
callback.onCompletion(e, null);
}
}
}
@ -148,9 +144,8 @@ public class StandaloneCoordinator implements Coordinator {
}
ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
connectors.put(connName, state);
if (configStorage != null) {
if (configStorage != null)
configStorage.putConnectorConfig(connName, connectorProps);
}
log.info("Finished creating connector {}", connName);
@ -171,9 +166,8 @@ public class StandaloneCoordinator implements Coordinator {
stopConnector(state);
connectors.remove(state.name);
if (configStorage != null) {
if (configStorage != null)
configStorage.putConnectorConfig(state.name, null);
}
log.info("Finished destroying connector {}", connName);
}
@ -251,9 +245,8 @@ public class StandaloneCoordinator implements Coordinator {
}
private void restoreConnectors() {
if (configStorage == null) {
if (configStorage == null)
return;
}
Collection<String> connNames = configStorage.getConnectors();
for (String connName : connNames) {

View File

@ -69,16 +69,14 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
synchronized (MemoryOffsetBackingStore.this) {
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
if (namespaceData == null) {
if (namespaceData == null)
return result;
}
for (ByteBuffer key : keys) {
result.put(key, namespaceData.get(key));
}
}
if (callback != null) {
if (callback != null)
callback.onCompletion(null, result);
}
return result;
}
});
@ -102,9 +100,8 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
}
save();
}
if (callback != null) {
if (callback != null)
callback.onCompletion(null, null);
}
return null;
}
});

View File

@ -109,9 +109,8 @@ public class OffsetStorageWriter {
throw new CopycatRuntimeException("OffsetStorageWriter is already flushing");
}
if (data.isEmpty()) {
if (data.isEmpty())
return false;
}
assert !flushing();
toFlush = data;
@ -157,9 +156,8 @@ public class OffsetStorageWriter {
@Override
public void onCompletion(Throwable error, Void result) {
boolean isCurrent = handleFinishWrite(flushId, error, result);
if (isCurrent && callback != null) {
if (isCurrent && callback != null)
callback.onCompletion(error, result);
}
}
});
}
@ -188,9 +186,8 @@ public class OffsetStorageWriter {
private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) {
// Callbacks need to be handled carefully since the flush operation may have already timed
// out and been cancelled.
if (flushId != currentFlushId) {
if (flushId != currentFlushId)
return false;
}
if (error != null) {
cancelFlush();

View File

@ -42,21 +42,17 @@ public class ConnectorTaskId implements Serializable {
@Override
public boolean equals(Object o) {
if (this == o) {
if (this == o)
return true;
}
if (o == null || getClass() != o.getClass()) {
if (o == null || getClass() != o.getClass())
return false;
}
ConnectorTaskId that = (ConnectorTaskId) o;
if (task != that.task) {
if (task != that.task)
return false;
}
if (connector != null ? !connector.equals(that.connector) : that.connector != null) {
if (connector != null ? !connector.equals(that.connector) : that.connector != null)
return false;
}
return true;
}

View File

@ -59,9 +59,8 @@ public abstract class ShutdownableThread extends Thread {
public ShutdownableThread(String name, boolean daemon) {
super(name);
this.setDaemon(daemon);
if (funcaughtExceptionHandler != null) {
if (funcaughtExceptionHandler != null)
this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
}
}
/**
@ -98,9 +97,8 @@ public abstract class ShutdownableThread extends Thread {
public void shutdown(long gracefulTimeout, TimeUnit unit)
throws InterruptedException {
boolean success = gracefulShutdown(gracefulTimeout, unit);
if (!success) {
if (!success)
forceShutdown();
}
}
/**

View File

@ -355,9 +355,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
@Override
public Object answer() throws Throwable {
time.sleep(consumerCommitDelayMs);
if (invokeCallback) {
if (invokeCallback)
capturedCallback.getValue().onComplete(offsets, consumerCommitError);
}
return null;
}
});

View File

@ -207,9 +207,8 @@ public class OffsetStorageWriterTest {
return service.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
if (waitForCompletion != null) {
if (waitForCompletion != null)
assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
}
if (fail) {
storeCallback.getValue().onCompletion(exception, null);

View File

@ -26,14 +26,12 @@ public class TestBackgroundThreadExceptionHandler implements Thread.UncaughtExce
@Override
public void uncaughtException(Thread t, Throwable e) {
if (this.firstException == null) {
if (this.firstException == null)
this.firstException = e;
}
}
public void verifyNoExceptions() {
if (this.firstException != null) {
if (this.firstException != null)
throw new AssertionError(this.firstException);
}
}
}