mirror of https://github.com/apache/kafka.git
Style cleanup
This commit is contained in:
parent
6ba87debad
commit
122423eef7
|
@ -5,9 +5,9 @@
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
* (the "License"); you may not use this file except in compliance with
|
* (the "License"); you may not use this file except in compliance with
|
||||||
* the License. You may obtain a copy of the License at
|
* the License. You may obtain a copy of the License at
|
||||||
*
|
* <p/>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p/>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
@ -70,27 +70,21 @@ public abstract class CopycatRecord {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (this == o) {
|
if (this == o)
|
||||||
return true;
|
return true;
|
||||||
}
|
if (o == null || getClass() != o.getClass())
|
||||||
if (o == null || getClass() != o.getClass()) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
CopycatRecord that = (CopycatRecord) o;
|
CopycatRecord that = (CopycatRecord) o;
|
||||||
|
|
||||||
if (key != null ? !key.equals(that.key) : that.key != null) {
|
if (key != null ? !key.equals(that.key) : that.key != null)
|
||||||
return false;
|
return false;
|
||||||
}
|
if (partition != null ? !partition.equals(that.partition) : that.partition != null)
|
||||||
if (partition != null ? !partition.equals(that.partition) : that.partition != null) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
if (topic != null ? !topic.equals(that.topic) : that.topic != null)
|
||||||
if (topic != null ? !topic.equals(that.topic) : that.topic != null) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
if (value != null ? !value.equals(that.value) : that.value != null)
|
||||||
if (value != null ? !value.equals(that.value) : that.value != null) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,9 +5,9 @@
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
* (the "License"); you may not use this file except in compliance with
|
* (the "License"); you may not use this file except in compliance with
|
||||||
* the License. You may obtain a copy of the License at
|
* the License. You may obtain a copy of the License at
|
||||||
*
|
* <p/>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p/>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
@ -38,21 +38,17 @@ public class SinkRecord extends CopycatRecord {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (this == o) {
|
if (this == o)
|
||||||
return true;
|
return true;
|
||||||
}
|
if (o == null || getClass() != o.getClass())
|
||||||
if (o == null || getClass() != o.getClass()) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
if (!super.equals(o))
|
||||||
if (!super.equals(o)) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
SinkRecord that = (SinkRecord) o;
|
SinkRecord that = (SinkRecord) o;
|
||||||
|
|
||||||
if (offset != that.offset) {
|
if (offset != that.offset)
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,9 +5,9 @@
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
* (the "License"); you may not use this file except in compliance with
|
* (the "License"); you may not use this file except in compliance with
|
||||||
* the License. You may obtain a copy of the License at
|
* the License. You may obtain a copy of the License at
|
||||||
*
|
* <p/>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p/>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
@ -66,24 +66,19 @@ public class SourceRecord extends CopycatRecord {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (this == o) {
|
if (this == o)
|
||||||
return true;
|
return true;
|
||||||
}
|
if (o == null || getClass() != o.getClass())
|
||||||
if (o == null || getClass() != o.getClass()) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
if (!super.equals(o))
|
||||||
if (!super.equals(o)) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
SourceRecord that = (SourceRecord) o;
|
SourceRecord that = (SourceRecord) o;
|
||||||
|
|
||||||
if (offset != null ? !offset.equals(that.offset) : that.offset != null) {
|
if (offset != null ? !offset.equals(that.offset) : that.offset != null)
|
||||||
return false;
|
return false;
|
||||||
}
|
if (stream != null ? !stream.equals(that.stream) : that.stream != null)
|
||||||
if (stream != null ? !stream.equals(that.stream) : that.stream != null) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,9 +38,8 @@ public class ConnectorUtils {
|
||||||
* @param numGroups the number of output groups to generate.
|
* @param numGroups the number of output groups to generate.
|
||||||
*/
|
*/
|
||||||
public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
|
public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
|
||||||
if (numGroups <= 0) {
|
if (numGroups <= 0)
|
||||||
throw new IllegalArgumentException("Number of groups must be positive.");
|
throw new IllegalArgumentException("Number of groups must be positive.");
|
||||||
}
|
|
||||||
|
|
||||||
List<List<T>> result = new ArrayList<>(numGroups);
|
List<List<T>> result = new ArrayList<>(numGroups);
|
||||||
|
|
||||||
|
|
|
@ -69,9 +69,8 @@ public class ConnectorTest {
|
||||||
@Override
|
@Override
|
||||||
public void stop() throws CopycatException {
|
public void stop() throws CopycatException {
|
||||||
stopOrder = order++;
|
stopOrder = order++;
|
||||||
if (stopException) {
|
if (stopException)
|
||||||
throw new CopycatException("error");
|
throw new CopycatException("error");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
|
@ -49,9 +49,8 @@ public class FileStreamSinkConnector extends SinkConnector {
|
||||||
ArrayList<Properties> configs = new ArrayList<>();
|
ArrayList<Properties> configs = new ArrayList<>();
|
||||||
for (int i = 0; i < maxTasks; i++) {
|
for (int i = 0; i < maxTasks; i++) {
|
||||||
Properties config = new Properties();
|
Properties config = new Properties();
|
||||||
if (filename != null) {
|
if (filename != null)
|
||||||
config.setProperty(FILE_CONFIG, filename);
|
config.setProperty(FILE_CONFIG, filename);
|
||||||
}
|
|
||||||
configs.add(config);
|
configs.add(config);
|
||||||
}
|
}
|
||||||
return configs;
|
return configs;
|
||||||
|
|
|
@ -40,13 +40,10 @@ public class FileStreamSourceConnector extends SourceConnector {
|
||||||
public void start(Properties props) throws CopycatException {
|
public void start(Properties props) throws CopycatException {
|
||||||
filename = props.getProperty(FILE_CONFIG);
|
filename = props.getProperty(FILE_CONFIG);
|
||||||
topic = props.getProperty(TOPIC_CONFIG);
|
topic = props.getProperty(TOPIC_CONFIG);
|
||||||
if (topic == null || topic.isEmpty()) {
|
if (topic == null || topic.isEmpty())
|
||||||
throw new CopycatException("ConsoleConnector configuration must include 'topic' setting");
|
throw new CopycatException("ConsoleConnector configuration must include 'topic' setting");
|
||||||
}
|
if (topic.contains(","))
|
||||||
if (topic.contains(",")) {
|
throw new CopycatException("ConsoleConnector should only have a single topic when used as a source.");
|
||||||
throw new CopycatException("ConsoleConnector should only have a single topic when used as a"
|
|
||||||
+ " source.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -59,9 +56,8 @@ public class FileStreamSourceConnector extends SourceConnector {
|
||||||
ArrayList<Properties> configs = new ArrayList<>();
|
ArrayList<Properties> configs = new ArrayList<>();
|
||||||
// Only one input stream makes sense.
|
// Only one input stream makes sense.
|
||||||
Properties config = new Properties();
|
Properties config = new Properties();
|
||||||
if (filename != null) {
|
if (filename != null)
|
||||||
config.setProperty(FILE_CONFIG, filename);
|
config.setProperty(FILE_CONFIG, filename);
|
||||||
}
|
|
||||||
config.setProperty(TOPIC_CONFIG, topic);
|
config.setProperty(TOPIC_CONFIG, topic);
|
||||||
configs.add(config);
|
configs.add(config);
|
||||||
return configs;
|
return configs;
|
||||||
|
|
|
@ -74,9 +74,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
|
topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
|
||||||
if (topic == null) {
|
if (topic == null)
|
||||||
throw new CopycatRuntimeException("ConsoleSourceTask config missing topic setting");
|
throw new CopycatRuntimeException("ConsoleSourceTask config missing topic setting");
|
||||||
}
|
|
||||||
reader = new BufferedReader(new InputStreamReader(stream));
|
reader = new BufferedReader(new InputStreamReader(stream));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,9 +89,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
readerCopy = reader;
|
readerCopy = reader;
|
||||||
}
|
}
|
||||||
if (readerCopy == null) {
|
if (readerCopy == null)
|
||||||
return null;
|
return null;
|
||||||
}
|
|
||||||
|
|
||||||
ArrayList<SourceRecord> records = null;
|
ArrayList<SourceRecord> records = null;
|
||||||
|
|
||||||
|
@ -112,9 +110,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
||||||
do {
|
do {
|
||||||
line = extractLine();
|
line = extractLine();
|
||||||
if (line != null) {
|
if (line != null) {
|
||||||
if (records == null) {
|
if (records == null)
|
||||||
records = new ArrayList<>();
|
records = new ArrayList<>();
|
||||||
}
|
|
||||||
records.add(new SourceRecord(null, streamOffset, topic, line));
|
records.add(new SourceRecord(null, streamOffset, topic, line));
|
||||||
}
|
}
|
||||||
new ArrayList<SourceRecord>();
|
new ArrayList<SourceRecord>();
|
||||||
|
@ -122,9 +119,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nread <= 0) {
|
if (nread <= 0)
|
||||||
Thread.sleep(1);
|
Thread.sleep(1);
|
||||||
}
|
|
||||||
|
|
||||||
return records;
|
return records;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -143,9 +139,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
||||||
break;
|
break;
|
||||||
} else if (buffer[i] == '\r') {
|
} else if (buffer[i] == '\r') {
|
||||||
// We need to check for \r\n, so we must skip this if we can't check the next char
|
// We need to check for \r\n, so we must skip this if we can't check the next char
|
||||||
if (i + 1 >= offset) {
|
if (i + 1 >= offset)
|
||||||
return null;
|
return null;
|
||||||
}
|
|
||||||
|
|
||||||
until = i;
|
until = i;
|
||||||
newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
|
newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
|
||||||
|
@ -157,9 +152,8 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
||||||
String result = new String(buffer, 0, until);
|
String result = new String(buffer, 0, until);
|
||||||
System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
|
System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
|
||||||
offset = offset - newStart;
|
offset = offset - newStart;
|
||||||
if (streamOffset != null) {
|
if (streamOffset != null)
|
||||||
streamOffset += newStart;
|
streamOffset += newStart;
|
||||||
}
|
|
||||||
return result;
|
return result;
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -62,10 +62,9 @@ public class FileStreamSourceTaskTest {
|
||||||
public void teardown() {
|
public void teardown() {
|
||||||
tempFile.delete();
|
tempFile.delete();
|
||||||
|
|
||||||
if (verifyMocks) {
|
if (verifyMocks)
|
||||||
PowerMock.verifyAll();
|
PowerMock.verifyAll();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private void replay() {
|
private void replay() {
|
||||||
PowerMock.replayAll();
|
PowerMock.replayAll();
|
||||||
|
|
|
@ -107,14 +107,12 @@ public class JsonConverter implements Converter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object toCopycatData(Object value) {
|
public Object toCopycatData(Object value) {
|
||||||
if (!(value instanceof JsonNode)) {
|
if (!(value instanceof JsonNode))
|
||||||
throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects.");
|
throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects.");
|
||||||
}
|
|
||||||
|
|
||||||
JsonNode data = (JsonNode) value;
|
JsonNode data = (JsonNode) value;
|
||||||
if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) {
|
if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
|
||||||
throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema");
|
throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema");
|
||||||
}
|
|
||||||
|
|
||||||
return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
|
return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
|
||||||
}
|
}
|
||||||
|
@ -155,14 +153,12 @@ public class JsonConverter implements Converter {
|
||||||
|
|
||||||
|
|
||||||
private static Schema asCopycatSchema(JsonNode jsonSchema) {
|
private static Schema asCopycatSchema(JsonNode jsonSchema) {
|
||||||
if (jsonSchema.isNull()) {
|
if (jsonSchema.isNull())
|
||||||
return null;
|
return null;
|
||||||
}
|
|
||||||
|
|
||||||
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
|
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
|
||||||
if (schemaTypeNode == null || !schemaTypeNode.isTextual()) {
|
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
|
||||||
throw new CopycatRuntimeException("Schema must contain 'type' field");
|
throw new CopycatRuntimeException("Schema must contain 'type' field");
|
||||||
}
|
|
||||||
|
|
||||||
switch (schemaTypeNode.textValue()) {
|
switch (schemaTypeNode.textValue()) {
|
||||||
case JsonSchema.BOOLEAN_TYPE_NAME:
|
case JsonSchema.BOOLEAN_TYPE_NAME:
|
||||||
|
@ -238,10 +234,9 @@ public class JsonConverter implements Converter {
|
||||||
itemSchema = fieldSchemaAndValue.schema;
|
itemSchema = fieldSchemaAndValue.schema;
|
||||||
schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema);
|
schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema);
|
||||||
} else {
|
} else {
|
||||||
if (!itemSchema.equals(fieldSchemaAndValue.schema)) {
|
if (!itemSchema.equals(fieldSchemaAndValue.schema))
|
||||||
throw new CopycatRuntimeException("Mismatching schemas found in a list.");
|
throw new CopycatRuntimeException("Mismatching schemas found in a list.");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
list.add(fieldSchemaAndValue.payload);
|
list.add(fieldSchemaAndValue.payload);
|
||||||
}
|
}
|
||||||
|
@ -253,19 +248,16 @@ public class JsonConverter implements Converter {
|
||||||
|
|
||||||
|
|
||||||
private static Object convertToCopycat(JsonNode jsonSchema, JsonNode jsonValue) {
|
private static Object convertToCopycat(JsonNode jsonSchema, JsonNode jsonValue) {
|
||||||
if (jsonSchema.isNull()) {
|
if (jsonSchema.isNull())
|
||||||
return null;
|
return null;
|
||||||
}
|
|
||||||
|
|
||||||
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
|
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
|
||||||
if (schemaTypeNode == null || !schemaTypeNode.isTextual()) {
|
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
|
||||||
throw new CopycatRuntimeException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
|
throw new CopycatRuntimeException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
|
||||||
}
|
|
||||||
|
|
||||||
JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue());
|
JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue());
|
||||||
if (typeConverter != null) {
|
if (typeConverter != null)
|
||||||
return typeConverter.convert(jsonSchema, jsonValue);
|
return typeConverter.convert(jsonSchema, jsonValue);
|
||||||
}
|
|
||||||
|
|
||||||
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode);
|
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,9 +84,8 @@ public class CopycatConfig extends AbstractConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for -- prefix on key
|
// Check for -- prefix on key
|
||||||
if (key.startsWith("--")) {
|
if (key.startsWith("--"))
|
||||||
key = key.substring(2);
|
key = key.substring(2);
|
||||||
}
|
|
||||||
|
|
||||||
props.setProperty(key, value);
|
props.setProperty(key, value);
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,9 +154,8 @@ public class Worker {
|
||||||
for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
|
for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
|
||||||
WorkerTask task = entry.getValue();
|
WorkerTask task = entry.getValue();
|
||||||
log.debug("Waiting for task {} to finish shutting down", task);
|
log.debug("Waiting for task {} to finish shutting down", task);
|
||||||
if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0))) {
|
if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
|
||||||
log.error("Graceful shutdown of task {} failed.", task);
|
log.error("Graceful shutdown of task {} failed.", task);
|
||||||
}
|
|
||||||
task.close();
|
task.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,9 +220,8 @@ public class Worker {
|
||||||
public void stopTask(ConnectorTaskId id) throws CopycatException {
|
public void stopTask(ConnectorTaskId id) throws CopycatException {
|
||||||
WorkerTask task = getTask(id);
|
WorkerTask task = getTask(id);
|
||||||
task.stop();
|
task.stop();
|
||||||
if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG))) {
|
if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
|
||||||
log.error("Graceful stop of task {} failed.", task);
|
log.error("Graceful stop of task {} failed.", task);
|
||||||
}
|
|
||||||
task.close();
|
task.close();
|
||||||
tasks.remove(id);
|
tasks.remove(id);
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,9 +73,8 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
public void stop() throws CopycatException {
|
public void stop() throws CopycatException {
|
||||||
// Offset commit is handled upon exit in work thread
|
// Offset commit is handled upon exit in work thread
|
||||||
task.stop();
|
task.stop();
|
||||||
if (workThread != null) {
|
if (workThread != null)
|
||||||
workThread.startGracefulShutdown();
|
workThread.startGracefulShutdown();
|
||||||
}
|
|
||||||
consumer.wakeup();
|
consumer.wakeup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,9 +83,8 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
if (workThread != null) {
|
if (workThread != null) {
|
||||||
try {
|
try {
|
||||||
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
|
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
|
||||||
if (!success) {
|
if (!success)
|
||||||
workThread.forceShutdown();
|
workThread.forceShutdown();
|
||||||
}
|
|
||||||
return success;
|
return success;
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -99,10 +97,9 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
public void close() {
|
public void close() {
|
||||||
// FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
|
// FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
|
||||||
// passed in
|
// passed in
|
||||||
if (consumer != null) {
|
if (consumer != null)
|
||||||
consumer.close();
|
consumer.close();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
|
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
|
||||||
public void poll(long timeoutMs) {
|
public void poll(long timeoutMs) {
|
||||||
|
@ -156,9 +153,8 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
|
|
||||||
private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
|
private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
|
||||||
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
|
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
|
||||||
if (topicsStr == null || topicsStr.isEmpty()) {
|
if (topicsStr == null || topicsStr.isEmpty())
|
||||||
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
|
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
|
||||||
}
|
|
||||||
String[] topics = topicsStr.split(",");
|
String[] topics = topicsStr.split(",");
|
||||||
|
|
||||||
// Include any unknown worker configs so consumer configs can be set globally on the worker
|
// Include any unknown worker configs so consumer configs can be set globally on the worker
|
||||||
|
|
|
@ -99,19 +99,17 @@ public class WorkerSourceTask implements WorkerTask {
|
||||||
public void stop() throws CopycatException {
|
public void stop() throws CopycatException {
|
||||||
task.stop();
|
task.stop();
|
||||||
commitOffsets();
|
commitOffsets();
|
||||||
if (workThread != null) {
|
if (workThread != null)
|
||||||
workThread.startGracefulShutdown();
|
workThread.startGracefulShutdown();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean awaitStop(long timeoutMs) {
|
public boolean awaitStop(long timeoutMs) {
|
||||||
if (workThread != null) {
|
if (workThread != null) {
|
||||||
try {
|
try {
|
||||||
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
|
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
|
||||||
if (!success) {
|
if (!success)
|
||||||
workThread.forceShutdown();
|
workThread.forceShutdown();
|
||||||
}
|
|
||||||
return success;
|
return success;
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -165,9 +163,8 @@ public class WorkerSourceTask implements WorkerTask {
|
||||||
private synchronized void recordSent(final ProducerRecord<Object, Object> record) {
|
private synchronized void recordSent(final ProducerRecord<Object, Object> record) {
|
||||||
ProducerRecord<Object, Object> removed = outstandingMessages.remove(record);
|
ProducerRecord<Object, Object> removed = outstandingMessages.remove(record);
|
||||||
// While flushing, we may also see callbacks for items in the backlog
|
// While flushing, we may also see callbacks for items in the backlog
|
||||||
if (removed == null && flushing) {
|
if (removed == null && flushing)
|
||||||
removed = outstandingMessagesBacklog.remove(record);
|
removed = outstandingMessagesBacklog.remove(record);
|
||||||
}
|
|
||||||
// But if neither one had it, something is very wrong
|
// But if neither one had it, something is very wrong
|
||||||
if (removed == null) {
|
if (removed == null) {
|
||||||
log.error("Saw callback for record that was not present in the outstanding message set: "
|
log.error("Saw callback for record that was not present in the outstanding message set: "
|
||||||
|
@ -294,9 +291,8 @@ public class WorkerSourceTask implements WorkerTask {
|
||||||
try {
|
try {
|
||||||
while (getRunning()) {
|
while (getRunning()) {
|
||||||
List<SourceRecord> records = task.poll();
|
List<SourceRecord> records = task.poll();
|
||||||
if (records == null) {
|
if (records == null)
|
||||||
continue;
|
continue;
|
||||||
}
|
|
||||||
sendRecords(records);
|
sendRecords(records);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
|
@ -57,11 +57,10 @@ public class FileConfigStorage implements ConfigStorage {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void putConnectorConfig(String connector, Properties properties) {
|
public void putConnectorConfig(String connector, Properties properties) {
|
||||||
if (properties == null) {
|
if (properties == null)
|
||||||
connectorConfig.remove(connector);
|
connectorConfig.remove(connector);
|
||||||
} else {
|
else
|
||||||
connectorConfig.put(connector, properties);
|
connectorConfig.put(connector, properties);
|
||||||
}
|
|
||||||
save();
|
save();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -91,31 +91,27 @@ public class StandaloneCoordinator implements Coordinator {
|
||||||
Callback<String> callback) {
|
Callback<String> callback) {
|
||||||
try {
|
try {
|
||||||
ConnectorState connState = createConnector(connectorProps);
|
ConnectorState connState = createConnector(connectorProps);
|
||||||
if (callback != null) {
|
if (callback != null)
|
||||||
callback.onCompletion(null, connState.name);
|
callback.onCompletion(null, connState.name);
|
||||||
}
|
|
||||||
// This should always be a new job, create jobs from scratch
|
// This should always be a new job, create jobs from scratch
|
||||||
createConnectorTasks(connState);
|
createConnectorTasks(connState);
|
||||||
} catch (CopycatRuntimeException e) {
|
} catch (CopycatRuntimeException e) {
|
||||||
if (callback != null) {
|
if (callback != null)
|
||||||
callback.onCompletion(e, null);
|
callback.onCompletion(e, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void deleteConnector(String name, Callback<Void> callback) {
|
public synchronized void deleteConnector(String name, Callback<Void> callback) {
|
||||||
try {
|
try {
|
||||||
destroyConnector(name);
|
destroyConnector(name);
|
||||||
if (callback != null) {
|
if (callback != null)
|
||||||
callback.onCompletion(null, null);
|
callback.onCompletion(null, null);
|
||||||
}
|
|
||||||
} catch (CopycatRuntimeException e) {
|
} catch (CopycatRuntimeException e) {
|
||||||
if (callback != null) {
|
if (callback != null)
|
||||||
callback.onCompletion(e, null);
|
callback.onCompletion(e, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Creates the and configures the connector. Does not setup any tasks
|
// Creates the and configures the connector. Does not setup any tasks
|
||||||
private ConnectorState createConnector(Properties connectorProps) {
|
private ConnectorState createConnector(Properties connectorProps) {
|
||||||
|
@ -148,9 +144,8 @@ public class StandaloneCoordinator implements Coordinator {
|
||||||
}
|
}
|
||||||
ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
|
ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
|
||||||
connectors.put(connName, state);
|
connectors.put(connName, state);
|
||||||
if (configStorage != null) {
|
if (configStorage != null)
|
||||||
configStorage.putConnectorConfig(connName, connectorProps);
|
configStorage.putConnectorConfig(connName, connectorProps);
|
||||||
}
|
|
||||||
|
|
||||||
log.info("Finished creating connector {}", connName);
|
log.info("Finished creating connector {}", connName);
|
||||||
|
|
||||||
|
@ -171,9 +166,8 @@ public class StandaloneCoordinator implements Coordinator {
|
||||||
|
|
||||||
stopConnector(state);
|
stopConnector(state);
|
||||||
connectors.remove(state.name);
|
connectors.remove(state.name);
|
||||||
if (configStorage != null) {
|
if (configStorage != null)
|
||||||
configStorage.putConnectorConfig(state.name, null);
|
configStorage.putConnectorConfig(state.name, null);
|
||||||
}
|
|
||||||
|
|
||||||
log.info("Finished destroying connector {}", connName);
|
log.info("Finished destroying connector {}", connName);
|
||||||
}
|
}
|
||||||
|
@ -251,9 +245,8 @@ public class StandaloneCoordinator implements Coordinator {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void restoreConnectors() {
|
private void restoreConnectors() {
|
||||||
if (configStorage == null) {
|
if (configStorage == null)
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
|
|
||||||
Collection<String> connNames = configStorage.getConnectors();
|
Collection<String> connNames = configStorage.getConnectors();
|
||||||
for (String connName : connNames) {
|
for (String connName : connNames) {
|
||||||
|
|
|
@ -69,16 +69,14 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
|
||||||
Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
|
Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
|
||||||
synchronized (MemoryOffsetBackingStore.this) {
|
synchronized (MemoryOffsetBackingStore.this) {
|
||||||
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
|
Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
|
||||||
if (namespaceData == null) {
|
if (namespaceData == null)
|
||||||
return result;
|
return result;
|
||||||
}
|
|
||||||
for (ByteBuffer key : keys) {
|
for (ByteBuffer key : keys) {
|
||||||
result.put(key, namespaceData.get(key));
|
result.put(key, namespaceData.get(key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (callback != null) {
|
if (callback != null)
|
||||||
callback.onCompletion(null, result);
|
callback.onCompletion(null, result);
|
||||||
}
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -102,9 +100,8 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
|
||||||
}
|
}
|
||||||
save();
|
save();
|
||||||
}
|
}
|
||||||
if (callback != null) {
|
if (callback != null)
|
||||||
callback.onCompletion(null, null);
|
callback.onCompletion(null, null);
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -109,9 +109,8 @@ public class OffsetStorageWriter {
|
||||||
throw new CopycatRuntimeException("OffsetStorageWriter is already flushing");
|
throw new CopycatRuntimeException("OffsetStorageWriter is already flushing");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data.isEmpty()) {
|
if (data.isEmpty())
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
assert !flushing();
|
assert !flushing();
|
||||||
toFlush = data;
|
toFlush = data;
|
||||||
|
@ -157,10 +156,9 @@ public class OffsetStorageWriter {
|
||||||
@Override
|
@Override
|
||||||
public void onCompletion(Throwable error, Void result) {
|
public void onCompletion(Throwable error, Void result) {
|
||||||
boolean isCurrent = handleFinishWrite(flushId, error, result);
|
boolean isCurrent = handleFinishWrite(flushId, error, result);
|
||||||
if (isCurrent && callback != null) {
|
if (isCurrent && callback != null)
|
||||||
callback.onCompletion(error, result);
|
callback.onCompletion(error, result);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,9 +186,8 @@ public class OffsetStorageWriter {
|
||||||
private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) {
|
private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) {
|
||||||
// Callbacks need to be handled carefully since the flush operation may have already timed
|
// Callbacks need to be handled carefully since the flush operation may have already timed
|
||||||
// out and been cancelled.
|
// out and been cancelled.
|
||||||
if (flushId != currentFlushId) {
|
if (flushId != currentFlushId)
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
cancelFlush();
|
cancelFlush();
|
||||||
|
|
|
@ -42,21 +42,17 @@ public class ConnectorTaskId implements Serializable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (this == o) {
|
if (this == o)
|
||||||
return true;
|
return true;
|
||||||
}
|
if (o == null || getClass() != o.getClass())
|
||||||
if (o == null || getClass() != o.getClass()) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
ConnectorTaskId that = (ConnectorTaskId) o;
|
ConnectorTaskId that = (ConnectorTaskId) o;
|
||||||
|
|
||||||
if (task != that.task) {
|
if (task != that.task)
|
||||||
return false;
|
return false;
|
||||||
}
|
if (connector != null ? !connector.equals(that.connector) : that.connector != null)
|
||||||
if (connector != null ? !connector.equals(that.connector) : that.connector != null) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,10 +59,9 @@ public abstract class ShutdownableThread extends Thread {
|
||||||
public ShutdownableThread(String name, boolean daemon) {
|
public ShutdownableThread(String name, boolean daemon) {
|
||||||
super(name);
|
super(name);
|
||||||
this.setDaemon(daemon);
|
this.setDaemon(daemon);
|
||||||
if (funcaughtExceptionHandler != null) {
|
if (funcaughtExceptionHandler != null)
|
||||||
this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
|
this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementations should override this method with the main body for the thread.
|
* Implementations should override this method with the main body for the thread.
|
||||||
|
@ -98,10 +97,9 @@ public abstract class ShutdownableThread extends Thread {
|
||||||
public void shutdown(long gracefulTimeout, TimeUnit unit)
|
public void shutdown(long gracefulTimeout, TimeUnit unit)
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
boolean success = gracefulShutdown(gracefulTimeout, unit);
|
boolean success = gracefulShutdown(gracefulTimeout, unit);
|
||||||
if (!success) {
|
if (!success)
|
||||||
forceShutdown();
|
forceShutdown();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Attempt graceful shutdown
|
* Attempt graceful shutdown
|
||||||
|
|
|
@ -355,9 +355,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
@Override
|
@Override
|
||||||
public Object answer() throws Throwable {
|
public Object answer() throws Throwable {
|
||||||
time.sleep(consumerCommitDelayMs);
|
time.sleep(consumerCommitDelayMs);
|
||||||
if (invokeCallback) {
|
if (invokeCallback)
|
||||||
capturedCallback.getValue().onComplete(offsets, consumerCommitError);
|
capturedCallback.getValue().onComplete(offsets, consumerCommitError);
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -207,9 +207,8 @@ public class OffsetStorageWriterTest {
|
||||||
return service.submit(new Callable<Void>() {
|
return service.submit(new Callable<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception {
|
public Void call() throws Exception {
|
||||||
if (waitForCompletion != null) {
|
if (waitForCompletion != null)
|
||||||
assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
|
assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
|
||||||
}
|
|
||||||
|
|
||||||
if (fail) {
|
if (fail) {
|
||||||
storeCallback.getValue().onCompletion(exception, null);
|
storeCallback.getValue().onCompletion(exception, null);
|
||||||
|
|
|
@ -26,14 +26,12 @@ public class TestBackgroundThreadExceptionHandler implements Thread.UncaughtExce
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void uncaughtException(Thread t, Throwable e) {
|
public void uncaughtException(Thread t, Throwable e) {
|
||||||
if (this.firstException == null) {
|
if (this.firstException == null)
|
||||||
this.firstException = e;
|
this.firstException = e;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
public void verifyNoExceptions() {
|
public void verifyNoExceptions() {
|
||||||
if (this.firstException != null) {
|
if (this.firstException != null)
|
||||||
throw new AssertionError(this.firstException);
|
throw new AssertionError(this.firstException);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue