KAFKA-4977: Fix findbugs issues in connect/runtime

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2763 from cmccabe/KAFKA-4977
This commit is contained in:
Colin P. Mccabe 2017-04-03 17:57:12 -07:00 committed by Ewen Cheslack-Postava
parent ca2979f847
commit f812a8fd93
6 changed files with 71 additions and 8 deletions

View File

@ -102,9 +102,9 @@ public class Worker {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
// worker, but this may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
producerProps.put(ProducerConfig.RETRIES_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString());
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// User-specified overrides

View File

@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
@ -1116,6 +1117,20 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
final int cmp = Long.compare(at, o.at);
return cmp == 0 ? Long.compare(seq, o.seq) : cmp;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof HerderRequest))
return false;
HerderRequest other = (HerderRequest) o;
return compareTo(other) == 0;
}
@Override
public int hashCode() {
return Objects.hash(at, seq);
}
}
private static final Callback<Void> forwardErrorCallback(final Callback<?> callback) {

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
public class ConnectorStateInfo {
@ -103,6 +104,21 @@ public class ConnectorStateInfo {
public int compareTo(TaskState that) {
return Integer.compare(this.id, that.id);
}
@Override
public boolean equals(Object o) {
if (o == this)
return true;
if (!(o instanceof TaskState))
return false;
TaskState other = (TaskState) o;
return compareTo(other) == 0;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}
}

View File

@ -159,9 +159,7 @@ public class SchemaSourceTask extends SourceTask {
count++;
return result;
} else {
synchronized (this) {
this.wait();
}
throttler.throttle();
return new ArrayList<>();
}
}
@ -170,4 +168,4 @@ public class SchemaSourceTask extends SourceTask {
public void stop() {
throttler.wakeup();
}
}
}

View File

@ -79,6 +79,6 @@ public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId
int connectorCmp = connector.compareTo(o.connector);
if (connectorCmp != 0)
return connectorCmp;
return ((Integer) task).compareTo(o.task);
return Integer.compare(task, o.task);
}
}

View File

@ -61,4 +61,38 @@
benchmarking. -->
<Package name="org.apache.kafka.jmh.cache.generated"/>
</Match>
<Match>
<!-- Suppress warnings about comparing a config string to
ConfigDef.NO_DEFAULT_VALUE using object equality. This is intentional. -->
<Class name="org.apache.kafka.connect.runtime.AbstractHerder"/>
<Method name="convertConfigKey"/>
<Bug pattern="ES_COMPARING_STRINGS_WITH_EQ"/>
</Match>
<Match>
<!-- Suppress warnings about ignoring the return value of await.
This is done intentionally because we use other clues to determine
if the wait was cut short. -->
<Class name="org.apache.kafka.connect.runtime.WorkerSourceTask"/>
<Method name="execute"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED"/>
</Match>
<Match>
<!-- Suppress some warnings about intentional switch statement fallthrough. -->
<Class name="org.apache.kafka.connect.runtime.WorkerConnector"/>
<Or>
<Method name="doStart"/>
<Method name="pause"/>
</Or>
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>
<Match>
<!-- Suppress some inconsistent synchronization warnings. TODO: fix these. See
KAFKA-4994. -->
<Class name="org.apache.kafka.connect.storage.OffsetStorageWriter"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
</FindBugsFilter>