diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java index 985e05f3540..a62fcc28853 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java @@ -96,7 +96,7 @@ public class ConfigValue { @Override public String toString() { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); sb.append("[") .append(name) .append(",") diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java index 78c93e88fa0..12064f51f52 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java @@ -27,7 +27,7 @@ import org.apache.kafka.common.metrics.stats.Histogram.LinearBinScheme; */ public class Percentiles extends SampledStat implements CompoundStat { - public static enum BucketSizing { + public enum BucketSizing { CONSTANT, LINEAR } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Mode.java b/clients/src/main/java/org/apache/kafka/common/network/Mode.java index 67de44d7117..4d8ef3b601a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Mode.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Mode.java @@ -16,4 +16,4 @@ */ package org.apache.kafka.common.network; -public enum Mode { CLIENT, SERVER }; +public enum Mode { CLIENT, SERVER } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index f32399da65e..3905c827169 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -182,7 +182,7 @@ public enum Errors { private final short code; private final ApiException exception; - private Errors(int code, ApiException exception) { + Errors(int code, ApiException exception) { this.code = (short) code; this.exception = exception; } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java index d5fbed71d22..fe48832b49b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java @@ -66,7 +66,7 @@ public enum SecurityProtocol { /* Whether this security protocol is for testing/debugging */ private final boolean isTesting; - private SecurityProtocol(int id, String name, boolean isTesting) { + SecurityProtocol(int id, String name, boolean isTesting) { this.id = (short) id; this.name = name; this.isTesting = isTesting; diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java index 92718d896bd..a408580271b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java @@ -199,9 +199,8 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream { if (finished) { return -1; } - int value = buffer[bufferOffset++] & 0xFF; - return value; + return buffer[bufferOffset++] & 0xFF; } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java index c2ace3296c3..24adb361740 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java @@ -45,8 +45,8 @@ public class ControlledShutdownRequest extends AbstractRequest { public AbstractResponse getErrorResponse(int versionId, Throwable e) { switch (versionId) { case 0: - throw new IllegalArgumentException(String.format("Version 0 is not supported. It is only supported by " + - "the Scala request class for controlled shutdown")); + throw new IllegalArgumentException("Version 0 is not supported. It is only supported by " + + "the Scala request class for controlled shutdown"); case 1: return new ControlledShutdownResponse(Errors.forException(e).code(), Collections.emptySet()); default: diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java index 2135eb2cf30..659a29f41eb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java @@ -54,9 +54,8 @@ public class ProducerInterceptorsTest { if (throwExceptionOnSend) throw new KafkaException("Injected exception in AppendProducerInterceptor.onSend"); - ProducerRecord newRecord = new ProducerRecord<>( + return new ProducerRecord<>( record.topic(), record.partition(), record.key(), record.value().concat(appendStr)); - return newRecord; } @Override diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java index 9c4721bdc75..22a588d7377 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java +++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java @@ -63,9 +63,8 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce @Override public ProducerRecord onSend(ProducerRecord record) { ONSEND_COUNT.incrementAndGet(); - ProducerRecord newRecord = new ProducerRecord<>( + return new ProducerRecord<>( record.topic(), record.partition(), record.key(), record.value().concat(appendStr)); - return newRecord; } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java index 1b74a905361..e307eed751b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java @@ -43,13 +43,13 @@ public interface OffsetBackingStore { /** * Start this offset store. */ - public void start(); + void start(); /** * Stop the backing store. Implementations should attempt to shutdown gracefully, but not block * indefinitely. */ - public void stop(); + void stop(); /** * Get the values for the specified keys @@ -57,7 +57,7 @@ public interface OffsetBackingStore { * @param callback callback to invoke on completion * @return future for the resulting map from key to value */ - public Future> get( + Future> get( Collection keys, Callback> callback); @@ -67,12 +67,12 @@ public interface OffsetBackingStore { * @param callback callback to invoke on completion * @return void future for the operation */ - public Future set(Map values, + Future set(Map values, Callback callback); /** * Configure class with the given key-value pairs * @param config can be DistributedConfig or StandaloneConfig */ - public void configure(WorkerConfig config); + void configure(WorkerConfig config); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 010c0b254cc..e05aa41299b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -84,7 +84,7 @@ public class StandaloneHerderTest { private enum SourceSink { SOURCE, SINK - }; + } private StandaloneHerder herder; diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index e3156789540..5d219ff32ad 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -25,7 +25,7 @@ import kafka.api._ import kafka.network._ import kafka.utils._ import kafka.common.{ErrorMapping, TopicAndPartition} -import org.apache.kafka.common.network.{NetworkReceive, Receive} +import org.apache.kafka.common.network.{NetworkReceive} import org.apache.kafka.common.utils.Utils._ /** diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index 5759105b383..ed96e7b6e5a 100644 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -254,9 +254,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton { if (syncSend) { try { response.get(); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } catch (ExecutionException ex) { + } catch (InterruptedException | ExecutionException ex) { throw new RuntimeException(ex); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java index 68de638a1b9..b5504846457 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java +++ b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java @@ -42,10 +42,11 @@ public class ThroughputThrottler { private static final long NS_PER_SEC = 1000 * NS_PER_MS; private static final long MIN_SLEEP_NS = 2 * NS_PER_MS; - long sleepTimeNs; - long sleepDeficitNs = 0; - long targetThroughput = -1; - long startMs; + private final long startMs; + private final long sleepTimeNs; + private final long targetThroughput; + + private long sleepDeficitNs = 0; private boolean wakeup = false; /** diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 8db442e3f91..cd172176b51 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -46,8 +46,8 @@ import java.io.Closeable; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -215,7 +215,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons public void run() { try { - consumer.subscribe(Arrays.asList(topic), this); + consumer.subscribe(Collections.singletonList(topic), this); while (true) { ConsumerRecords records = consumer.poll(Long.MAX_VALUE); diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java index ffbf7dc1a73..daf569cd771 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java @@ -163,13 +163,8 @@ public class VerifiableLog4jAppender { */ public static Properties loadProps(String filename) throws IOException, FileNotFoundException { Properties props = new Properties(); - InputStream propStream = null; - try { - propStream = new FileInputStream(filename); + try (InputStream propStream = new FileInputStream(filename)) { props.load(propStream); - } finally { - if (propStream != null) - propStream.close(); } return props; } diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index 30f08e8880f..54d53f10bea 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -165,13 +165,8 @@ public class VerifiableProducer { */ public static Properties loadProps(String filename) throws IOException, FileNotFoundException { Properties props = new Properties(); - InputStream propStream = null; - try { - propStream = new FileInputStream(filename); + try (InputStream propStream = new FileInputStream(filename)) { props.load(propStream); - } finally { - if (propStream != null) - propStream.close(); } return props; }