mirror of https://github.com/apache/kafka.git
Style cleanup
This commit is contained in:
parent
6ba87debad
commit
122423eef7
|
@ -5,9 +5,9 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -70,27 +70,21 @@ public abstract class CopycatRecord {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
if (this == o)
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
}
|
||||
|
||||
CopycatRecord that = (CopycatRecord) o;
|
||||
|
||||
if (key != null ? !key.equals(that.key) : that.key != null) {
|
||||
if (key != null ? !key.equals(that.key) : that.key != null)
|
||||
return false;
|
||||
}
|
||||
if (partition != null ? !partition.equals(that.partition) : that.partition != null) {
|
||||
if (partition != null ? !partition.equals(that.partition) : that.partition != null)
|
||||
return false;
|
||||
}
|
||||
if (topic != null ? !topic.equals(that.topic) : that.topic != null) {
|
||||
if (topic != null ? !topic.equals(that.topic) : that.topic != null)
|
||||
return false;
|
||||
}
|
||||
if (value != null ? !value.equals(that.value) : that.value != null) {
|
||||
if (value != null ? !value.equals(that.value) : that.value != null)
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -5,9 +5,9 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -38,21 +38,17 @@ public class SinkRecord extends CopycatRecord {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
if (this == o)
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
}
|
||||
if (!super.equals(o)) {
|
||||
if (!super.equals(o))
|
||||
return false;
|
||||
}
|
||||
|
||||
SinkRecord that = (SinkRecord) o;
|
||||
|
||||
if (offset != that.offset) {
|
||||
if (offset != that.offset)
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -5,9 +5,9 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -66,24 +66,19 @@ public class SourceRecord extends CopycatRecord {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
if (this == o)
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
}
|
||||
if (!super.equals(o)) {
|
||||
if (!super.equals(o))
|
||||
return false;
|
||||
}
|
||||
|
||||
SourceRecord that = (SourceRecord) o;
|
||||
|
||||
if (offset != null ? !offset.equals(that.offset) : that.offset != null) {
|
||||
if (offset != null ? !offset.equals(that.offset) : that.offset != null)
|
||||
return false;
|
||||
}
|
||||
if (stream != null ? !stream.equals(that.stream) : that.stream != null) {
|
||||
if (stream != null ? !stream.equals(that.stream) : that.stream != null)
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -38,9 +38,8 @@ public class ConnectorUtils {
|
|||
* @param numGroups the number of output groups to generate.
|
||||
*/
|
||||
public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
|
||||
if (numGroups <= 0) {
|
||||
if (numGroups <= 0)
|
||||
throw new IllegalArgumentException("Number of groups must be positive.");
|
||||
}
|
||||
|
||||
List<List<T>> result = new ArrayList<>(numGroups);
|
||||
|
||||
|
|
|
@ -69,9 +69,8 @@ public class ConnectorTest {
|
|||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
stopOrder = order++;
|
||||
if (stopException) {
|
||||
if (stopException)
|
||||
throw new CopycatException("error");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -49,9 +49,8 @@ public class FileStreamSinkConnector extends SinkConnector {
|
|||
ArrayList<Properties> configs = new ArrayList<>();
|
||||
for (int i = 0; i < maxTasks; i++) {
|
||||
Properties config = new Properties();
|
||||
if (filename != null) {
|
||||
if (filename != null)
|
||||
config.setProperty(FILE_CONFIG, filename);
|
||||
}
|
||||
configs.add(config);
|
||||
}
|
||||
return configs;
|
||||
|
|
|
@ -40,13 +40,10 @@ public class FileStreamSourceConnector extends SourceConnector {
|
|||
public void start(Properties props) throws CopycatException {
|
||||
filename = props.getProperty(FILE_CONFIG);
|
||||
topic = props.getProperty(TOPIC_CONFIG);
|
||||
if (topic == null || topic.isEmpty()) {
|
||||
if (topic == null || topic.isEmpty())
|
||||
throw new CopycatException("ConsoleConnector configuration must include 'topic' setting");
|
||||
}
|
||||
if (topic.contains(",")) {
|
||||
throw new CopycatException("ConsoleConnector should only have a single topic when used as a"
|
||||
+ " source.");
|
||||
}
|
||||
if (topic.contains(","))
|
||||
throw new CopycatException("ConsoleConnector should only have a single topic when used as a source.");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,9 +56,8 @@ public class FileStreamSourceConnector extends SourceConnector {
|
|||
ArrayList<Properties> configs = new ArrayList<>();
|
||||
// Only one input stream makes sense.
|
||||
Properties config = new Properties();
|
||||
if (filename != null) {
|
||||
if (filename != null)
|
||||
config.setProperty(FILE_CONFIG, filename);
|
||||
}
|
||||
config.setProperty(TOPIC_CONFIG, topic);
|
||||
configs.add(config);
|
||||
return configs;
|
||||
|
|
|
@ -74,9 +74,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
|||
}
|
||||
}
|
||||
topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
|
||||
if (topic == null) {
|
||||
if (topic == null)
|
||||
throw new CopycatRuntimeException("ConsoleSourceTask config missing topic setting");
|
||||
}
|
||||
reader = new BufferedReader(new InputStreamReader(stream));
|
||||
}
|
||||
|
||||
|
@ -90,9 +89,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
|||
synchronized (this) {
|
||||
readerCopy = reader;
|
||||
}
|
||||
if (readerCopy == null) {
|
||||
if (readerCopy == null)
|
||||
return null;
|
||||
}
|
||||
|
||||
ArrayList<SourceRecord> records = null;
|
||||
|
||||
|
@ -112,9 +110,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
|||
do {
|
||||
line = extractLine();
|
||||
if (line != null) {
|
||||
if (records == null) {
|
||||
if (records == null)
|
||||
records = new ArrayList<>();
|
||||
}
|
||||
records.add(new SourceRecord(null, streamOffset, topic, line));
|
||||
}
|
||||
new ArrayList<SourceRecord>();
|
||||
|
@ -122,9 +119,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
|||
}
|
||||
}
|
||||
|
||||
if (nread <= 0) {
|
||||
if (nread <= 0)
|
||||
Thread.sleep(1);
|
||||
}
|
||||
|
||||
return records;
|
||||
} catch (IOException e) {
|
||||
|
@ -143,9 +139,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
|||
break;
|
||||
} else if (buffer[i] == '\r') {
|
||||
// We need to check for \r\n, so we must skip this if we can't check the next char
|
||||
if (i + 1 >= offset) {
|
||||
if (i + 1 >= offset)
|
||||
return null;
|
||||
}
|
||||
|
||||
until = i;
|
||||
newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
|
||||
|
@ -157,9 +152,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
|||
String result = new String(buffer, 0, until);
|
||||
System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
|
||||
offset = offset - newStart;
|
||||
if (streamOffset != null) {
|
||||
if (streamOffset != null)
|
||||
streamOffset += newStart;
|
||||
}
|
||||
return result;
|
||||
} else {
|
||||
return null;
|
||||
|
|
|
@ -62,9 +62,8 @@ public class FileStreamSourceTaskTest {
|
|||
public void teardown() {
|
||||
tempFile.delete();
|
||||
|
||||
if (verifyMocks) {
|
||||
if (verifyMocks)
|
||||
PowerMock.verifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
private void replay() {
|
||||
|
|
|
@ -107,14 +107,12 @@ public class JsonConverter implements Converter {
|
|||
|
||||
@Override
|
||||
public Object toCopycatData(Object value) {
|
||||
if (!(value instanceof JsonNode)) {
|
||||
if (!(value instanceof JsonNode))
|
||||
throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects.");
|
||||
}
|
||||
|
||||
JsonNode data = (JsonNode) value;
|
||||
if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) {
|
||||
if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
|
||||
throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema");
|
||||
}
|
||||
|
||||
return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
|
||||
}
|
||||
|
@ -155,14 +153,12 @@ public class JsonConverter implements Converter {
|
|||
|
||||
|
||||
private static Schema asCopycatSchema(JsonNode jsonSchema) {
|
||||
if (jsonSchema.isNull()) {
|
||||
if (jsonSchema.isNull())
|
||||
return null;
|
||||
}
|
||||
|
||||
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
|
||||
if (schemaTypeNode == null || !schemaTypeNode.isTextual()) {
|
||||
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
|
||||
throw new CopycatRuntimeException("Schema must contain 'type' field");
|
||||
}
|
||||
|
||||
switch (schemaTypeNode.textValue()) {
|
||||
case JsonSchema.BOOLEAN_TYPE_NAME:
|
||||
|
@ -238,9 +234,8 @@ public class JsonConverter implements Converter {
|
|||
itemSchema = fieldSchemaAndValue.schema;
|
||||
schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema);
|
||||
} else {
|
||||
if (!itemSchema.equals(fieldSchemaAndValue.schema)) {
|
||||
if (!itemSchema.equals(fieldSchemaAndValue.schema))
|
||||
throw new CopycatRuntimeException("Mismatching schemas found in a list.");
|
||||
}
|
||||
}
|
||||
|
||||
list.add(fieldSchemaAndValue.payload);
|
||||
|
@ -253,19 +248,16 @@ public class JsonConverter implements Converter {
|
|||
|
||||
|
||||
private static Object convertToCopycat(JsonNode jsonSchema, JsonNode jsonValue) {
|
||||
if (jsonSchema.isNull()) {
|
||||
if (jsonSchema.isNull())
|
||||
return null;
|
||||
}
|
||||
|
||||
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
|
||||
if (schemaTypeNode == null || !schemaTypeNode.isTextual()) {
|
||||
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
|
||||
throw new CopycatRuntimeException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
|
||||
}
|
||||
|
||||
JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue());
|
||||
if (typeConverter != null) {
|
||||
if (typeConverter != null)
|
||||
return typeConverter.convert(jsonSchema, jsonValue);
|
||||
}
|
||||
|
||||
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode);
|
||||
}
|
||||
|
|
|
@ -84,9 +84,8 @@ public class CopycatConfig extends AbstractConfig {
|
|||
}
|
||||
|
||||
// Check for -- prefix on key
|
||||
if (key.startsWith("--")) {
|
||||
if (key.startsWith("--"))
|
||||
key = key.substring(2);
|
||||
}
|
||||
|
||||
props.setProperty(key, value);
|
||||
}
|
||||
|
|
|
@ -154,9 +154,8 @@ public class Worker {
|
|||
for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
|
||||
WorkerTask task = entry.getValue();
|
||||
log.debug("Waiting for task {} to finish shutting down", task);
|
||||
if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0))) {
|
||||
if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
|
||||
log.error("Graceful shutdown of task {} failed.", task);
|
||||
}
|
||||
task.close();
|
||||
}
|
||||
|
||||
|
@ -221,9 +220,8 @@ public class Worker {
|
|||
public void stopTask(ConnectorTaskId id) throws CopycatException {
|
||||
WorkerTask task = getTask(id);
|
||||
task.stop();
|
||||
if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG))) {
|
||||
if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
|
||||
log.error("Graceful stop of task {} failed.", task);
|
||||
}
|
||||
task.close();
|
||||
tasks.remove(id);
|
||||
}
|
||||
|
|
|
@ -73,9 +73,8 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
public void stop() throws CopycatException {
|
||||
// Offset commit is handled upon exit in work thread
|
||||
task.stop();
|
||||
if (workThread != null) {
|
||||
if (workThread != null)
|
||||
workThread.startGracefulShutdown();
|
||||
}
|
||||
consumer.wakeup();
|
||||
}
|
||||
|
||||
|
@ -84,9 +83,8 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
if (workThread != null) {
|
||||
try {
|
||||
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
if (!success) {
|
||||
if (!success)
|
||||
workThread.forceShutdown();
|
||||
}
|
||||
return success;
|
||||
} catch (InterruptedException e) {
|
||||
return false;
|
||||
|
@ -99,9 +97,8 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
public void close() {
|
||||
// FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
|
||||
// passed in
|
||||
if (consumer != null) {
|
||||
if (consumer != null)
|
||||
consumer.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
|
||||
|
@ -156,9 +153,8 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
|
||||
private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
|
||||
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
|
||||
if (topicsStr == null || topicsStr.isEmpty()) {
|
||||
if (topicsStr == null || topicsStr.isEmpty())
|
||||
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
|
||||
}
|
||||
String[] topics = topicsStr.split(",");
|
||||
|
||||
// Include any unknown worker configs so consumer configs can be set globally on the worker
|
||||
|
|
|
@ -99,9 +99,8 @@ public class WorkerSourceTask implements WorkerTask {
|
|||
public void stop() throws CopycatException {
|
||||
task.stop();
|
||||
commitOffsets();
|
||||
if (workThread != null) {
|
||||
if (workThread != null)
|
||||
workThread.startGracefulShutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -109,9 +108,8 @@ public class WorkerSourceTask implements WorkerTask {
|
|||
if (workThread != null) {
|
||||
try {
|
||||
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
if (!success) {
|
||||
if (!success)
|
||||
workThread.forceShutdown();
|
||||
}
|
||||
return success;
|
||||
} catch (InterruptedException e) {
|
||||
return false;
|
||||
|
@ -165,9 +163,8 @@ public class WorkerSourceTask implements WorkerTask {
|
|||
private synchronized void recordSent(final ProducerRecord<Object, Object> record) {
|
||||
ProducerRecord<Object, Object> removed = outstandingMessages.remove(record);
|
||||
// While flushing, we may also see callbacks for items in the backlog
|
||||
if (removed == null && flushing) {
|
||||
if (removed == null && flushing)
|
||||
removed = outstandingMessagesBacklog.remove(record);
|
||||
}
|
||||
// But if neither one had it, something is very wrong
|
||||
if (removed == null) {
|
||||
log.error("Saw callback for record that was not present in the outstanding message set: "
|
||||
|
@ -294,9 +291,8 @@ public class WorkerSourceTask implements WorkerTask {
|
|||
try {
|
||||
while (getRunning()) {
|
||||
List<SourceRecord> records = task.poll();
|
||||
if (records == null) {
|
||||
if (records == null)
|
||||
continue;
|
||||
}
|
||||
sendRecords(records);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -57,11 +57,10 @@ public class FileConfigStorage implements ConfigStorage {
|
|||
|
||||
@Override
|
||||
public void putConnectorConfig(String connector, Properties properties) {
|
||||
if (properties == null) {
|
||||
if (properties == null)
|
||||
connectorConfig.remove(connector);
|
||||
} else {
|
||||
else
|
||||
connectorConfig.put(connector, properties);
|
||||
}
|
||||
save();
|
||||
}
|
||||
|
||||
|
|
|
@ -91,15 +91,13 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
Callback<String> callback) {
|
||||
try {
|
||||
ConnectorState connState = createConnector(connectorProps);
|
||||
if (callback != null) {
|
||||
if (callback != null)
|
||||
callback.onCompletion(null, connState.name);
|
||||
}
|
||||
// This should always be a new job, create jobs from scratch
|
||||
createConnectorTasks(connState);
|
||||
} catch (CopycatRuntimeException e) {
|
||||
if (callback != null) {
|
||||
if (callback != null)
|
||||
callback.onCompletion(e, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,13 +105,11 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
public synchronized void deleteConnector(String name, Callback<Void> callback) {
|
||||
try {
|
||||
destroyConnector(name);
|
||||
if (callback != null) {
|
||||
if (callback != null)
|
||||
callback.onCompletion(null, null);
|
||||
}
|
||||
} catch (CopycatRuntimeException e) {
|
||||
if (callback != null) {
|
||||
if (callback != null)
|
||||
callback.onCompletion(e, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,9 +144,8 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
}
|
||||
ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
|
||||
connectors.put(connName, state);
|
||||
if (configStorage != null) {
|
||||
if (configStorage != null)
|
||||
configStorage.putConnectorConfig(connName, connectorProps);
|
||||
}
|
||||
|
||||
log.info("Finished creating connector {}", connName);
|
||||
|
||||
|
@ -171,9 +166,8 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
|
||||
stopConnector(state);
|
||||
connectors.remove(state.name);
|
||||
if (configStorage != null) {
|
||||
if (configStorage != null)
|
||||
configStorage.putConnectorConfig(state.name, null);
|
||||
}
|
||||
|
||||
log.info("Finished destroying connector {}", connName);
|
||||
}
|
||||
|
@ -251,9 +245,8 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
}
|
||||
|
||||
private void restoreConnectors() {
|
||||
if (configStorage == null) {
|
||||
if (configStorage == null)
|
||||
return;
|
||||
}
|
||||
|
||||
Collection<String> connNames = configStorage.getConnectors();
|
||||
for (String connName : connNames) {
|
||||
|
|
|
@ -69,16 +69,14 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
|
|||
Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
|
||||
synchronized (MemoryOffsetBackingStore.this) {
|
||||
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
|
||||
if (namespaceData == null) {
|
||||
if (namespaceData == null)
|
||||
return result;
|
||||
}
|
||||
for (ByteBuffer key : keys) {
|
||||
result.put(key, namespaceData.get(key));
|
||||
}
|
||||
}
|
||||
if (callback != null) {
|
||||
if (callback != null)
|
||||
callback.onCompletion(null, result);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
});
|
||||
|
@ -102,9 +100,8 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
|
|||
}
|
||||
save();
|
||||
}
|
||||
if (callback != null) {
|
||||
if (callback != null)
|
||||
callback.onCompletion(null, null);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
|
|
@ -109,9 +109,8 @@ public class OffsetStorageWriter {
|
|||
throw new CopycatRuntimeException("OffsetStorageWriter is already flushing");
|
||||
}
|
||||
|
||||
if (data.isEmpty()) {
|
||||
if (data.isEmpty())
|
||||
return false;
|
||||
}
|
||||
|
||||
assert !flushing();
|
||||
toFlush = data;
|
||||
|
@ -157,9 +156,8 @@ public class OffsetStorageWriter {
|
|||
@Override
|
||||
public void onCompletion(Throwable error, Void result) {
|
||||
boolean isCurrent = handleFinishWrite(flushId, error, result);
|
||||
if (isCurrent && callback != null) {
|
||||
if (isCurrent && callback != null)
|
||||
callback.onCompletion(error, result);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -188,9 +186,8 @@ public class OffsetStorageWriter {
|
|||
private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) {
|
||||
// Callbacks need to be handled carefully since the flush operation may have already timed
|
||||
// out and been cancelled.
|
||||
if (flushId != currentFlushId) {
|
||||
if (flushId != currentFlushId)
|
||||
return false;
|
||||
}
|
||||
|
||||
if (error != null) {
|
||||
cancelFlush();
|
||||
|
|
|
@ -42,21 +42,17 @@ public class ConnectorTaskId implements Serializable {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
if (this == o)
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
}
|
||||
|
||||
ConnectorTaskId that = (ConnectorTaskId) o;
|
||||
|
||||
if (task != that.task) {
|
||||
if (task != that.task)
|
||||
return false;
|
||||
}
|
||||
if (connector != null ? !connector.equals(that.connector) : that.connector != null) {
|
||||
if (connector != null ? !connector.equals(that.connector) : that.connector != null)
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -59,9 +59,8 @@ public abstract class ShutdownableThread extends Thread {
|
|||
public ShutdownableThread(String name, boolean daemon) {
|
||||
super(name);
|
||||
this.setDaemon(daemon);
|
||||
if (funcaughtExceptionHandler != null) {
|
||||
if (funcaughtExceptionHandler != null)
|
||||
this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -98,9 +97,8 @@ public abstract class ShutdownableThread extends Thread {
|
|||
public void shutdown(long gracefulTimeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
boolean success = gracefulShutdown(gracefulTimeout, unit);
|
||||
if (!success) {
|
||||
if (!success)
|
||||
forceShutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -355,9 +355,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
|||
@Override
|
||||
public Object answer() throws Throwable {
|
||||
time.sleep(consumerCommitDelayMs);
|
||||
if (invokeCallback) {
|
||||
if (invokeCallback)
|
||||
capturedCallback.getValue().onComplete(offsets, consumerCommitError);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
|
|
@ -207,9 +207,8 @@ public class OffsetStorageWriterTest {
|
|||
return service.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
if (waitForCompletion != null) {
|
||||
if (waitForCompletion != null)
|
||||
assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
if (fail) {
|
||||
storeCallback.getValue().onCompletion(exception, null);
|
||||
|
|
|
@ -26,14 +26,12 @@ public class TestBackgroundThreadExceptionHandler implements Thread.UncaughtExce
|
|||
|
||||
@Override
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
if (this.firstException == null) {
|
||||
if (this.firstException == null)
|
||||
this.firstException = e;
|
||||
}
|
||||
}
|
||||
|
||||
public void verifyNoExceptions() {
|
||||
if (this.firstException != null) {
|
||||
if (this.firstException != null)
|
||||
throw new AssertionError(this.firstException);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue