mirror of https://github.com/apache/kafka.git
KAFKA-4500; Code quality improvements
- Removed redundant modifiers and unneeded String.format() calls
- Removed an unnecessary semicolon and an extra assignment; inlined returns
- Used StringBuilder for consistency across the codebase
- Used try-with-resources

Author: Rekha Joshi <rekhajoshm@gmail.com>
Author: Joshi <rekhajoshm@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2222 from rekhajoshm/KAFKA-4500
parent 135860225e
commit a5c15ba037
@@ -96,7 +96,7 @@ public class ConfigValue {
     @Override
     public String toString() {
-        StringBuffer sb = new StringBuffer();
+        StringBuilder sb = new StringBuilder();
         sb.append("[")
           .append(name)
           .append(",")
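Note: StringBuffer synchronizes every append, while StringBuilder is the unsynchronized drop-in replacement, so for a method-local builder the swap removes lock overhead without changing output. A minimal sketch of the pattern (ConfigValueSketch and its field are illustrative, not the real ConfigValue):

    public class ConfigValueSketch {
        private final String name;

        public ConfigValueSketch(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            // StringBuilder is unsynchronized; safe for a local that never escapes this thread.
            StringBuilder sb = new StringBuilder();
            sb.append("[").append(name).append("]");
            return sb.toString();
        }

        public static void main(String[] args) {
            System.out.println(new ConfigValueSketch("retries")); // prints [retries]
        }
    }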
@@ -27,7 +27,7 @@ import org.apache.kafka.common.metrics.stats.Histogram.LinearBinScheme;
  */
 public class Percentiles extends SampledStat implements CompoundStat {
 
-    public static enum BucketSizing {
+    public enum BucketSizing {
         CONSTANT, LINEAR
     }
 
@@ -16,4 +16,4 @@
  */
 package org.apache.kafka.common.network;
 
-public enum Mode { CLIENT, SERVER };
+public enum Mode { CLIENT, SERVER }
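Note: both enum edits delete tokens the compiler already implies. A member enum is implicitly static, and the semicolon after a type declaration's closing brace is an empty statement. A self-contained sketch (names are illustrative):

    public class EnumModifierSketch {
        // "static" would be redundant here: member enums are implicitly static.
        public enum BucketSizing { CONSTANT, LINEAR }   // no trailing ';' needed

        public static void main(String[] args) {
            // Identical behavior to the old "public static enum ... };" spelling.
            System.out.println(BucketSizing.valueOf("LINEAR")); // prints LINEAR
        }
    }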
@@ -182,7 +182,7 @@ public enum Errors {
     private final short code;
     private final ApiException exception;
 
-    private Errors(int code, ApiException exception) {
+    Errors(int code, ApiException exception) {
         this.code = (short) code;
         this.exception = exception;
     }
@@ -66,7 +66,7 @@ public enum SecurityProtocol {
     /* Whether this security protocol is for testing/debugging */
     private final boolean isTesting;
 
-    private SecurityProtocol(int id, String name, boolean isTesting) {
+    SecurityProtocol(int id, String name, boolean isTesting) {
         this.id = (short) id;
         this.name = name;
         this.isTesting = isTesting;
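Note: enum constructors are implicitly private, and the language forbids any other access level, so the `private` on Errors(...) and SecurityProtocol(...) carried no information. A sketch of the idiom (ErrorsSketch is hypothetical, not the real Errors enum):

    public enum ErrorsSketch {
        NONE(0),
        UNKNOWN(-1);

        private final short code;

        // No modifier needed: enum constructors are implicitly private.
        ErrorsSketch(int code) {
            this.code = (short) code;
        }

        public short code() {
            return code;
        }

        public static void main(String[] args) {
            System.out.println(ErrorsSketch.UNKNOWN.code()); // prints -1
        }
    }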
@@ -199,9 +199,8 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
         if (finished) {
             return -1;
         }
-        int value = buffer[bufferOffset++] & 0xFF;
 
-        return value;
+        return buffer[bufferOffset++] & 0xFF;
     }
 
     @Override
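Note: inlining the single-use local keeps behavior identical; the interesting part is the `& 0xFF` mask, which widens the signed byte to an unsigned int in [0, 255], as InputStream.read() requires. A standalone sketch (hypothetical class with fixed test data, not the LZ4 stream):

    public class UnsignedReadSketch {
        // Stand-in for the decompressed block buffer.
        private final byte[] buffer = {(byte) 0xFF, 0x10};
        private int bufferOffset = 0;

        // Next byte as an unsigned value: & 0xFF widens the signed byte into [0, 255].
        public int read() {
            return buffer[bufferOffset++] & 0xFF;
        }

        public static void main(String[] args) {
            UnsignedReadSketch in = new UnsignedReadSketch();
            System.out.println(in.read()); // prints 255, not -1 (which would mean end of stream)
            System.out.println(in.read()); // prints 16
        }
    }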
@@ -45,8 +45,8 @@ public class ControlledShutdownRequest extends AbstractRequest {
     public AbstractResponse getErrorResponse(int versionId, Throwable e) {
         switch (versionId) {
             case 0:
-                throw new IllegalArgumentException(String.format("Version 0 is not supported. It is only supported by " +
-                        "the Scala request class for controlled shutdown"));
+                throw new IllegalArgumentException("Version 0 is not supported. It is only supported by " +
+                        "the Scala request class for controlled shutdown");
             case 1:
                 return new ControlledShutdownResponse(Errors.forException(e).code(), Collections.<TopicPartition>emptySet());
             default:
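Note: String.format with no % specifiers returns its argument unchanged after a needless formatter pass, and a literal % in the message would make it throw, so plain concatenation is both faster and safer. A quick demonstration (standalone example, java.lang only):

    public class FormatSketch {
        public static void main(String[] args) {
            String msg = "Version 0 is not supported";
            // Same result, but format() re-scans the string for specifiers on every call.
            System.out.println(String.format(msg).equals(msg)); // prints true
            // A literal '%' makes the format() version fail at runtime:
            // String.format("50%!");  // throws UnknownFormatConversionException
        }
    }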
@@ -54,9 +54,8 @@ public class ProducerInterceptorsTest {
         if (throwExceptionOnSend)
             throw new KafkaException("Injected exception in AppendProducerInterceptor.onSend");
 
-        ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(
+        return new ProducerRecord<>(
                 record.topic(), record.partition(), record.key(), record.value().concat(appendStr));
-        return newRecord;
     }
 
     @Override
@@ -63,9 +63,8 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
     @Override
     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
         ONSEND_COUNT.incrementAndGet();
-        ProducerRecord<String, String> newRecord = new ProducerRecord<>(
+        return new ProducerRecord<>(
                 record.topic(), record.partition(), record.key(), record.value().concat(appendStr));
-        return newRecord;
     }
 
     @Override
@@ -43,13 +43,13 @@ public interface OffsetBackingStore {
     /**
      * Start this offset store.
      */
-    public void start();
+    void start();
 
     /**
      * Stop the backing store. Implementations should attempt to shutdown gracefully, but not block
      * indefinitely.
      */
-    public void stop();
+    void stop();
 
     /**
      * Get the values for the specified keys
@@ -57,7 +57,7 @@ public interface OffsetBackingStore {
      * @param callback callback to invoke on completion
      * @return future for the resulting map from key to value
      */
-    public Future<Map<ByteBuffer, ByteBuffer>> get(
+    Future<Map<ByteBuffer, ByteBuffer>> get(
             Collection<ByteBuffer> keys,
             Callback<Map<ByteBuffer, ByteBuffer>> callback);
 
@@ -67,12 +67,12 @@ public interface OffsetBackingStore {
      * @param callback callback to invoke on completion
      * @return void future for the operation
      */
-    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
-                            Callback<Void> callback);
+    Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
+                     Callback<Void> callback);
 
     /**
      * Configure class with the given key-value pairs
      * @param config can be DistributedConfig or StandaloneConfig
      */
-    public void configure(WorkerConfig config);
+    void configure(WorkerConfig config);
 }
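Note: every member of a Java interface is implicitly public (and every method implicitly abstract), so the `public` on OffsetBackingStore's methods carried no information. A compact sketch under that rule (BackingStoreSketch is hypothetical, not the Connect API):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    // All members below are public and abstract without saying so.
    interface BackingStoreSketch {
        void start();
        void stop();
        Future<String> get(String key);
    }

    class InMemoryStore implements BackingStoreSketch {
        @Override
        public void start() { }          // implementations must still spell out "public"
        @Override
        public void stop() { }
        @Override
        public Future<String> get(String key) {
            return CompletableFuture.completedFuture("value-for-" + key);
        }

        public static void main(String[] args) throws Exception {
            BackingStoreSketch store = new InMemoryStore();
            store.start();
            System.out.println(store.get("offsets").get()); // prints value-for-offsets
            store.stop();
        }
    }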
@@ -84,7 +84,7 @@ public class StandaloneHerderTest {
 
     private enum SourceSink {
         SOURCE, SINK
-    };
+    }
 
     private StandaloneHerder herder;
 
@@ -25,7 +25,7 @@ import kafka.api._
 import kafka.network._
 import kafka.utils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
-import org.apache.kafka.common.network.{NetworkReceive, Receive}
+import org.apache.kafka.common.network.{NetworkReceive}
 import org.apache.kafka.common.utils.Utils._
 
 /**
@@ -254,9 +254,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         if (syncSend) {
             try {
                 response.get();
-            } catch (InterruptedException ex) {
-                throw new RuntimeException(ex);
-            } catch (ExecutionException ex) {
+            } catch (InterruptedException | ExecutionException ex) {
                 throw new RuntimeException(ex);
             }
         }
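Note: Java 7 multi-catch collapses the two byte-for-byte identical handlers into one; ex is implicitly final and typed as the nearest common supertype of the alternatives. A runnable sketch of the pattern (the completed future stands in for the producer's async send result):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    public class MultiCatchSketch {
        public static void main(String[] args) {
            Future<String> response = CompletableFuture.completedFuture("ok");
            try {
                System.out.println(response.get()); // prints ok
            } catch (InterruptedException | ExecutionException ex) {
                // One handler replaces two identical catch blocks; ex is implicitly final.
                throw new RuntimeException(ex);
            }
        }
    }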
@@ -42,10 +42,11 @@ public class ThroughputThrottler {
     private static final long NS_PER_SEC = 1000 * NS_PER_MS;
     private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
 
-    long sleepTimeNs;
-    long sleepDeficitNs = 0;
-    long targetThroughput = -1;
-    long startMs;
+    private final long startMs;
+    private final long sleepTimeNs;
+    private final long targetThroughput;
+
+    private long sleepDeficitNs = 0;
     private boolean wakeup = false;
 
     /**
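Note: narrowing the throttler fields to private final records that they are assigned exactly once in the constructor and lets javac enforce it; only the genuinely mutable state stays non-final. A trimmed sketch (simplified shape, not the full ThroughputThrottler):

    public class ThrottlerFieldsSketch {
        private final long startMs;          // assigned once in the constructor
        private final long targetThroughput; // javac rejects any later reassignment

        private long sleepDeficitNs = 0;     // genuinely mutable, so not final

        public ThrottlerFieldsSketch(long targetThroughput, long startMs) {
            this.targetThroughput = targetThroughput;
            this.startMs = startMs;
        }

        public static void main(String[] args) {
            ThrottlerFieldsSketch t = new ThrottlerFieldsSketch(1000, System.currentTimeMillis());
            // t.startMs = 0;  // would not compile: cannot assign a value to a final field
            System.out.println(t.targetThroughput); // prints 1000
        }
    }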
@@ -46,8 +46,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -215,7 +215,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
 
     public void run() {
         try {
-            consumer.subscribe(Arrays.asList(topic), this);
+            consumer.subscribe(Collections.singletonList(topic), this);
 
             while (true) {
                 ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
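Note: Collections.singletonList(topic) builds a dedicated immutable one-element list, whereas Arrays.asList(topic) first allocates a varargs array; for a single-topic subscribe the replacement is cheaper and states the intent. A small comparison (the topic name is made up):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SingletonListSketch {
        public static void main(String[] args) {
            List<String> viaArrays = Arrays.asList("my-topic");                // varargs array + wrapper
            List<String> viaSingleton = Collections.singletonList("my-topic"); // dedicated 1-element list
            System.out.println(viaArrays.equals(viaSingleton)); // prints true: same contents
        }
    }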
@@ -163,13 +163,8 @@ public class VerifiableLog4jAppender {
      */
     public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
         Properties props = new Properties();
-        InputStream propStream = null;
-        try {
-            propStream = new FileInputStream(filename);
+        try (InputStream propStream = new FileInputStream(filename)) {
             props.load(propStream);
-        } finally {
-            if (propStream != null)
-                propStream.close();
         }
         return props;
     }
@@ -165,13 +165,8 @@ public class VerifiableProducer {
      */
     public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
         Properties props = new Properties();
-        InputStream propStream = null;
-        try {
-            propStream = new FileInputStream(filename);
+        try (InputStream propStream = new FileInputStream(filename)) {
             props.load(propStream);
-        } finally {
-            if (propStream != null)
-                propStream.close();
         }
         return props;
     }
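Note: the try-with-resources form in both utilities closes the stream automatically whether load() succeeds or throws, replacing the null-checked finally block. (FileNotFoundException is also a subtype of IOException, so the original throws clause was itself redundant.) A runnable sketch of the same loadProps shape (class name is illustrative):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public class LoadPropsSketch {
        public static Properties loadProps(String filename) throws IOException {
            Properties props = new Properties();
            // Closed automatically when the block exits, normally or via an exception.
            try (InputStream propStream = new FileInputStream(filename)) {
                props.load(propStream);
            }
            return props;
        }

        public static void main(String[] args) throws IOException {
            System.out.println(loadProps(args[0]).stringPropertyNames());
        }
    }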