diff --git a/build.gradle b/build.gradle
index 48f3f2f9e24..f9862c6a0a7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -82,7 +82,7 @@ ext {
maxPermSizeArgs = []
if (!JavaVersion.current().isJava8Compatible())
- maxPermSizeArgs = ['-XX:MaxPermSize=512m']
+ maxPermSizeArgs += '-XX:MaxPermSize=512m'
userMaxForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : null
@@ -137,14 +137,24 @@ subprojects {
apply plugin: 'maven'
apply plugin: 'signing'
apply plugin: 'checkstyle'
- apply plugin: 'findbugs'
+
+ if (!JavaVersion.current().isJava9Compatible())
+ apply plugin: 'findbugs'
sourceCompatibility = 1.7
+ targetCompatibility = 1.7
compileJava {
options.encoding = 'UTF-8'
- // Add unchecked once we drop support for Java 7 as @SuppressWarnings("unchecked") is too buggy in Java 7
options.compilerArgs << "-Xlint:deprecation"
+ // -Xlint:unchecked is too buggy in Java 7, so we only enable it for Java 8 or higher
+ if (JavaVersion.current().isJava8Compatible())
+ options.compilerArgs << "-Xlint:unchecked"
+ // --release is the recommended way to select the target release, but it's only supported from Java 9, so we also
+ // set --source and --target via `sourceCompatibility` and `targetCompatibility`. If/when Gradle supports `--release`
+ // natively (https://github.com/gradle/gradle/issues/2510), we should switch to that.
+ if (JavaVersion.current().isJava9Compatible())
+ options.compilerArgs << "--release" << "7"
}
uploadArchives {
@@ -349,17 +359,19 @@ subprojects {
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
- findbugs {
- toolVersion = "3.0.1"
- excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml")
- ignoreFailures = false
- }
- test.dependsOn('findbugsMain')
+ if (!JavaVersion.current().isJava9Compatible()) {
+ findbugs {
+ toolVersion = "3.0.1"
+ excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml")
+ ignoreFailures = false
+ }
+ test.dependsOn('findbugsMain')
- tasks.withType(FindBugs) {
- reports {
- xml.enabled (project.hasProperty('xmlFindBugsReport'))
- html.enabled (!project.hasProperty('xmlFindBugsReport'))
+ tasks.withType(FindBugs) {
+ reports {
+ xml.enabled(project.hasProperty('xmlFindBugsReport'))
+ html.enabled(!project.hasProperty('xmlFindBugsReport'))
+ }
}
}
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 4bd907bc93a..8c3e3ae628e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -98,7 +98,6 @@
-    <allow pkg="javax.xml.bind" />
@@ -247,7 +246,6 @@
-    <allow pkg="javax.xml.bind" />
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java
index 51998a9ebc4..ae1424417f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java
@@ -25,9 +25,10 @@ public class PlainSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;
+ @SuppressWarnings("deprecation")
protected PlainSaslServerProvider() {
super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN Server Provider for Kafka");
- super.put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName());
+ put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName());
}
public static void initialize() {
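A note on the `@SuppressWarnings("deprecation")` annotations and the `super.put` → `put` changes in the SASL providers: the `Provider(String, double, String)` constructor is deprecated as of Java 9 (a `Provider(String, String, String)` overload replaces it), and going through `put` rather than `super.put` uses `Provider`'s own override, which the Java 9 provider machinery appears to rely on for service lookup. A minimal sketch of the resulting pattern, using hypothetical names (`ExampleSaslProvider`, `com.example.ExampleSaslServerFactory`):

    import java.security.Provider;

    public class ExampleSaslProvider extends Provider {
        private static final long serialVersionUID = 1L;

        @SuppressWarnings("deprecation")
        protected ExampleSaslProvider() {
            // The (String, double, String) constructor is deprecated since Java 9.
            super("Example SASL Provider", 1.0, "Example SASL Provider for illustration");
            // put(...) goes through Provider's override, so the mapping stays visible
            // to service lookup on Java 9; super.put(...) bypasses it.
            put("SaslServerFactory.EXAMPLE", "com.example.ExampleSaslServerFactory");
        }
    }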
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
index 8120c156b4d..55b0651e8f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
@@ -19,9 +19,8 @@ package org.apache.kafka.common.security.scram;
import java.util.Collection;
import java.util.Properties;
-import javax.xml.bind.DatatypeConverter;
-
import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.utils.Base64;
/**
* SCRAM Credential persistence utility functions. Implements format conversion used
@@ -41,11 +40,11 @@ public class ScramCredentialUtils {
public static String credentialToString(ScramCredential credential) {
return String.format("%s=%s,%s=%s,%s=%s,%s=%d",
SALT,
- DatatypeConverter.printBase64Binary(credential.salt()),
+ Base64.encoder().encodeToString(credential.salt()),
STORED_KEY,
- DatatypeConverter.printBase64Binary(credential.storedKey()),
+ Base64.encoder().encodeToString(credential.storedKey()),
SERVER_KEY,
- DatatypeConverter.printBase64Binary(credential.serverKey()),
+ Base64.encoder().encodeToString(credential.serverKey()),
ITERATIONS,
credential.iterations());
}
@@ -56,9 +55,9 @@ public class ScramCredentialUtils {
!props.containsKey(SERVER_KEY) || !props.containsKey(ITERATIONS)) {
throw new IllegalArgumentException("Credentials not valid: " + str);
}
- byte[] salt = DatatypeConverter.parseBase64Binary(props.getProperty(SALT));
- byte[] storedKey = DatatypeConverter.parseBase64Binary(props.getProperty(STORED_KEY));
- byte[] serverKey = DatatypeConverter.parseBase64Binary(props.getProperty(SERVER_KEY));
+ byte[] salt = Base64.decoder().decode(props.getProperty(SALT));
+ byte[] storedKey = Base64.decoder().decode(props.getProperty(STORED_KEY));
+ byte[] serverKey = Base64.decoder().decode(props.getProperty(SERVER_KEY));
int iterations = Integer.parseInt(props.getProperty(ITERATIONS));
return new ScramCredential(salt, storedKey, serverKey, iterations);
}
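The persistence format above is a flat list of key=value pairs with the binary fields base64-encoded. A small sketch of what a serialized credential looks like, assuming the `SALT`, `STORED_KEY`, `SERVER_KEY` and `ITERATIONS` constants (not visible in this hunk) hold the names salt, stored_key, server_key and iterations, and that `ScramCredential` is constructible from the calling code:

    import org.apache.kafka.common.security.scram.ScramCredential;
    import org.apache.kafka.common.security.scram.ScramCredentialUtils;
    import org.apache.kafka.common.utils.Base64;

    public class ScramCredentialFormatSketch {
        public static void main(String[] args) {
            byte[] salt = Base64.decoder().decode("W22ZaJ0SNY7soEsUEjb6gQ==");
            byte[] storedKey = new byte[32]; // placeholder bytes, for illustration only
            byte[] serverKey = new byte[32];
            ScramCredential credential = new ScramCredential(salt, storedKey, serverKey, 4096);
            // Prints something like: salt=...,stored_key=...,server_key=...,iterations=4096
            System.out.println(ScramCredentialUtils.credentialToString(credential));
        }
    }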
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
index 6fd117dcee3..1ad7266e7bf 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
@@ -16,12 +16,13 @@
*/
package org.apache.kafka.common.security.scram;
+import org.apache.kafka.common.utils.Base64;
+
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.security.sasl.SaslException;
-import javax.xml.bind.DatatypeConverter;
/**
* SCRAM request/response message creation and parsing based on
@@ -140,7 +141,7 @@ public class ScramMessages {
}
this.nonce = matcher.group("nonce");
String salt = matcher.group("salt");
- this.salt = DatatypeConverter.parseBase64Binary(salt);
+ this.salt = Base64.decoder().decode(salt);
}
public ServerFirstMessage(String clientNonce, String serverNonce, byte[] salt, int iterations) {
this.nonce = clientNonce + serverNonce;
@@ -157,7 +158,7 @@ public class ScramMessages {
return iterations;
}
String toMessage() {
- return String.format("r=%s,s=%s,i=%d", nonce, DatatypeConverter.printBase64Binary(salt), iterations);
+ return String.format("r=%s,s=%s,i=%d", nonce, Base64.encoder().encodeToString(salt), iterations);
}
}
/**
@@ -184,9 +185,9 @@ public class ScramMessages {
if (!matcher.matches())
throw new SaslException("Invalid SCRAM client final message format: " + message);
- this.channelBinding = DatatypeConverter.parseBase64Binary(matcher.group("channel"));
+ this.channelBinding = Base64.decoder().decode(matcher.group("channel"));
this.nonce = matcher.group("nonce");
- this.proof = DatatypeConverter.parseBase64Binary(matcher.group("proof"));
+ this.proof = Base64.decoder().decode(matcher.group("proof"));
}
public ClientFinalMessage(byte[] channelBinding, String nonce) {
this.channelBinding = channelBinding;
@@ -206,13 +207,13 @@ public class ScramMessages {
}
public String clientFinalMessageWithoutProof() {
return String.format("c=%s,r=%s",
- DatatypeConverter.printBase64Binary(channelBinding),
+ Base64.encoder().encodeToString(channelBinding),
nonce);
}
String toMessage() {
return String.format("%s,p=%s",
clientFinalMessageWithoutProof(),
- DatatypeConverter.printBase64Binary(proof));
+ Base64.encoder().encodeToString(proof));
}
}
/**
@@ -243,7 +244,7 @@ public class ScramMessages {
// ignore
}
if (error == null) {
- this.serverSignature = DatatypeConverter.parseBase64Binary(matcher.group("signature"));
+ this.serverSignature = Base64.decoder().decode(matcher.group("signature"));
this.error = null;
} else {
this.serverSignature = null;
@@ -264,7 +265,7 @@ public class ScramMessages {
if (error != null)
return "e=" + error;
else
- return "v=" + DatatypeConverter.printBase64Binary(serverSignature);
+ return "v=" + Base64.encoder().encodeToString(serverSignature);
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java
index fac673e6dd3..d389f044a99 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java
@@ -25,10 +25,11 @@ public class ScramSaslClientProvider extends Provider {
private static final long serialVersionUID = 1L;
+ @SuppressWarnings("deprecation")
protected ScramSaslClientProvider() {
super("SASL/SCRAM Client Provider", 1.0, "SASL/SCRAM Client Provider for Kafka");
for (ScramMechanism mechanism : ScramMechanism.values())
- super.put("SaslClientFactory." + mechanism.mechanismName(), ScramSaslClientFactory.class.getName());
+ put("SaslClientFactory." + mechanism.mechanismName(), ScramSaslClientFactory.class.getName());
}
public static void initialize() {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java
index 2f768a35fe7..9f2a6b3d256 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java
@@ -25,10 +25,11 @@ public class ScramSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;
+ @SuppressWarnings("deprecation")
protected ScramSaslServerProvider() {
super("SASL/SCRAM Server Provider", 1.0, "SASL/SCRAM Server Provider for Kafka");
for (ScramMechanism mechanism : ScramMechanism.values())
- super.put("SaslServerFactory." + mechanism.mechanismName(), ScramSaslServerFactory.class.getName());
+ put("SaslServerFactory." + mechanism.mechanismName(), ScramSaslServerFactory.class.getName());
}
public static void initialize() {
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Base64.java b/clients/src/main/java/org/apache/kafka/common/utils/Base64.java
new file mode 100644
index 00000000000..e06e1eed5a0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Base64.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+
+/**
+ * Temporary class in order to support Java 7 and Java 9. `DatatypeConverter` is not in the base module of Java 9
+ * and `java.util.Base64` was only introduced in Java 8.
+ */
+public final class Base64 {
+
+ private static final Factory FACTORY;
+
+ static {
+ if (Java.IS_JAVA8_COMPATIBLE)
+ FACTORY = new Java8Factory();
+ else
+ FACTORY = new Java7Factory();
+ }
+
+ private Base64() {}
+
+ public static Encoder encoder() {
+ return FACTORY.encoder();
+ }
+
+ public static Encoder urlEncoderNoPadding() {
+ return FACTORY.urlEncoderNoPadding();
+ }
+
+ public static Decoder decoder() {
+ return FACTORY.decoder();
+ }
+
+ /* Contains a subset of methods from java.util.Base64.Encoder (introduced in Java 8) */
+ public interface Encoder {
+ String encodeToString(byte[] bytes);
+ }
+
+ /* Contains a subset of methods from java.util.Base64.Decoder (introduced in Java 8) */
+ public interface Decoder {
+ byte[] decode(String string);
+ }
+
+ private interface Factory {
+ Encoder urlEncoderNoPadding();
+ Encoder encoder();
+ Decoder decoder();
+ }
+
+ private static class Java8Factory implements Factory {
+
+ // Static final MethodHandles are optimised better by HotSpot
+ private static final MethodHandle URL_ENCODE_NO_PADDING;
+ private static final MethodHandle ENCODE;
+ private static final MethodHandle DECODE;
+
+ private static final Encoder URL_ENCODER_NO_PADDING;
+ private static final Encoder ENCODER;
+ private static final Decoder DECODER;
+
+ static {
+ try {
+ Class<?> base64Class = Class.forName("java.util.Base64");
+
+ MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+
+ Class<?> juEncoderClass = Class.forName("java.util.Base64$Encoder");
+
+ MethodHandle getEncoder = lookup.findStatic(base64Class, "getEncoder",
+ MethodType.methodType(juEncoderClass));
+ Object juEncoder;
+ try {
+ juEncoder = getEncoder.invoke();
+ } catch (Throwable throwable) {
+ // Invoked method doesn't throw checked exceptions, so safe to cast
+ throw (RuntimeException) throwable;
+ }
+ MethodHandle encode = lookup.findVirtual(juEncoderClass, "encodeToString",
+ MethodType.methodType(String.class, byte[].class));
+ ENCODE = encode.bindTo(juEncoder);
+
+
+ MethodHandle getUrlEncoder = lookup.findStatic(base64Class, "getUrlEncoder",
+ MethodType.methodType(juEncoderClass));
+ Object juUrlEncoderNoPadding;
+ try {
+ juUrlEncoderNoPadding = lookup.findVirtual(juEncoderClass, "withoutPadding",
+ MethodType.methodType(juEncoderClass)).invoke(getUrlEncoder.invoke());
+ } catch (Throwable throwable) {
+ // Invoked method doesn't throw checked exceptions, so safe to cast
+ throw (RuntimeException) throwable;
+ }
+ URL_ENCODE_NO_PADDING = encode.bindTo(juUrlEncoderNoPadding);
+
+ Class<?> juDecoderClass = Class.forName("java.util.Base64$Decoder");
+ MethodHandle getDecoder = lookup.findStatic(base64Class, "getDecoder",
+ MethodType.methodType(juDecoderClass));
+ MethodHandle decode = lookup.findVirtual(juDecoderClass, "decode",
+ MethodType.methodType(byte[].class, String.class));
+ try {
+ DECODE = decode.bindTo(getDecoder.invoke());
+ } catch (Throwable throwable) {
+ // Invoked method doesn't throw checked exceptions, so safe to cast
+ throw (RuntimeException) throwable;
+ }
+
+ URL_ENCODER_NO_PADDING = new Encoder() {
+ @Override
+ public String encodeToString(byte[] bytes) {
+ try {
+ return (String) URL_ENCODE_NO_PADDING.invokeExact(bytes);
+ } catch (Throwable throwable) {
+ // Invoked method doesn't throw checked exceptions, so safe to cast
+ throw (RuntimeException) throwable;
+ }
+ }
+ };
+
+ ENCODER = new Encoder() {
+ @Override
+ public String encodeToString(byte[] bytes) {
+ try {
+ return (String) ENCODE.invokeExact(bytes);
+ } catch (Throwable throwable) {
+ // Invoked method doesn't throw checked exceptions, so safe to cast
+ throw (RuntimeException) throwable;
+ }
+ }
+ };
+
+ DECODER = new Decoder() {
+ @Override
+ public byte[] decode(String string) {
+ try {
+ return (byte[]) DECODE.invokeExact(string);
+ } catch (Throwable throwable) {
+ // Invoked method doesn't throw checked exceptions, so safe to cast
+ throw (RuntimeException) throwable;
+ }
+ }
+ };
+
+ } catch (ReflectiveOperationException e) {
+ // Should never happen
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Encoder urlEncoderNoPadding() {
+ return URL_ENCODER_NO_PADDING;
+ }
+
+ @Override
+ public Encoder encoder() {
+ return ENCODER;
+ }
+
+ @Override
+ public Decoder decoder() {
+ return DECODER;
+ }
+ }
+
+ private static class Java7Factory implements Factory {
+
+ // Static final MethodHandles are optimised better by HotSpot
+ private static final MethodHandle PRINT;
+ private static final MethodHandle PARSE;
+
+ static {
+ try {
+ Class<?> cls = Class.forName("javax.xml.bind.DatatypeConverter");
+ MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+ PRINT = lookup.findStatic(cls, "printBase64Binary", MethodType.methodType(String.class,
+ byte[].class));
+ PARSE = lookup.findStatic(cls, "parseBase64Binary", MethodType.methodType(byte[].class,
+ String.class));
+ } catch (ReflectiveOperationException e) {
+ // Should never happen
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static final Encoder URL_ENCODER_NO_PADDING = new Encoder() {
+
+ @Override
+ public String encodeToString(byte[] bytes) {
+ String base64Encoded = Java7Factory.encodeToString(bytes);
+ // Convert to the URL-safe variant by replacing + and / with - and _ respectively.
+ String urlSafeBase64Encoded = base64Encoded.replace("+", "-")
+ .replace("/", "_");
+ // Strip the trailing '=' padding (if any) so the output matches the unpadded Java 8 URL encoder.
+ return urlSafeBase64Encoded.replace("=", "");
+ }
+
+ };
+
+ public static final Encoder ENCODER = new Encoder() {
+ @Override
+ public String encodeToString(byte[] bytes) {
+ return Java7Factory.encodeToString(bytes);
+ }
+ };
+
+ public static final Decoder DECODER = new Decoder() {
+ @Override
+ public byte[] decode(String string) {
+ try {
+ return (byte[]) PARSE.invokeExact(string);
+ } catch (Throwable throwable) {
+ // Invoked method doesn't throw checked exceptions, so safe to cast
+ throw (RuntimeException) throwable;
+ }
+ }
+ };
+
+ private static String encodeToString(byte[] bytes) {
+ try {
+ return (String) PRINT.invokeExact(bytes);
+ } catch (Throwable throwable) {
+ // Invoked method doesn't throw checked exceptions, so safe to cast
+ throw (RuntimeException) throwable;
+ }
+ }
+
+ @Override
+ public Encoder urlEncoderNoPadding() {
+ return URL_ENCODER_NO_PADDING;
+ }
+
+ @Override
+ public Encoder encoder() {
+ return ENCODER;
+ }
+
+ @Override
+ public Decoder decoder() {
+ return DECODER;
+ }
+ }
+}
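Callers never see which factory is in use, since both code paths expose the same Encoder/Decoder interfaces; a quick sketch of a round trip through the new utility:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.utils.Base64;

    public class Base64Sketch {
        public static void main(String[] args) {
            byte[] input = "kafka".getBytes(StandardCharsets.UTF_8);

            // Standard base64: java.util.Base64 on Java 8+, DatatypeConverter on Java 7.
            String encoded = Base64.encoder().encodeToString(input);
            byte[] decoded = Base64.decoder().decode(encoded);
            System.out.println(encoded + " -> " + new String(decoded, StandardCharsets.UTF_8));

            // URL-safe variant without padding.
            System.out.println(Base64.urlEncoderNoPadding().encodeToString(input));
        }
    }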
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java
index b547beb1f81..dfe22e8b351 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java
@@ -32,7 +32,7 @@ import java.util.zip.Checksum;
*
* NOTE: This class is intended for INTERNAL usage only within Kafka.
*/
-public class Crc32C {
+public final class Crc32C {
private static final ChecksumFactory CHECKSUM_FACTORY;
@@ -43,6 +43,8 @@ public class Crc32C {
CHECKSUM_FACTORY = new PureJavaChecksumFactory();
}
+ private Crc32C() {}
+
/**
* Compute the CRC32C (Castagnoli) of the segment of the byte array given by the specified size and offset
*
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Java.java b/clients/src/main/java/org/apache/kafka/common/utils/Java.java
index b374c245c69..38d95413812 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Java.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Java.java
@@ -41,6 +41,9 @@ public final class Java {
public static final boolean IS_JAVA9_COMPATIBLE = JVM_MAJOR_VERSION > 1 ||
(JVM_MAJOR_VERSION == 1 && JVM_MINOR_VERSION >= 9);
+ public static final boolean IS_JAVA8_COMPATIBLE = JVM_MAJOR_VERSION > 1 ||
+ (JVM_MAJOR_VERSION == 1 && JVM_MINOR_VERSION >= 8);
+
public static boolean isIBMJdk() {
return System.getProperty("java.vendor").contains("IBM");
}
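The major/minor parsing behind JVM_MAJOR_VERSION and JVM_MINOR_VERSION is not part of this hunk; the stand-alone sketch below only illustrates how version strings would need to map for the two flags above to behave as intended ("1.7.0_80" → neither, "1.8.0_152" → Java 8 but not Java 9, "9.0.1" → both), and is an assumption rather than the class's actual code:

    // Hypothetical illustration of the version check, not the Java.java implementation.
    public class JavaVersionSketch {
        public static void main(String[] args) {
            for (String version : new String[] {"1.7.0_80", "1.8.0_152", "9.0.1"}) {
                String[] tokens = version.split("\\.");
                int major = Integer.parseInt(tokens[0]);
                int minor = tokens.length > 1 ? Integer.parseInt(tokens[1]) : 0;
                boolean java8Compatible = major > 1 || (major == 1 && minor >= 8);
                boolean java9Compatible = major > 1 || (major == 1 && minor >= 9);
                System.out.println(version + ": java8=" + java8Compatible + ", java9=" + java9Compatible);
            }
        }
    }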
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 75f8cf70169..ee82f9a350c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -284,14 +284,14 @@ public class Utils {
* Instantiate the class
*/
public static <T> T newInstance(Class<T> c) {
+ if (c == null)
+ throw new KafkaException("class cannot be null");
try {
- return c.newInstance();
- } catch (IllegalAccessException e) {
+ return c.getDeclaredConstructor().newInstance();
+ } catch (NoSuchMethodException e) {
+ throw new KafkaException("Could not find a public no-argument constructor for " + c.getName(), e);
+ } catch (ReflectiveOperationException | RuntimeException e) {
throw new KafkaException("Could not instantiate class " + c.getName(), e);
- } catch (InstantiationException e) {
- throw new KafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
- } catch (NullPointerException e) {
- throw new KafkaException("Requested class was null", e);
}
}
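Class.newInstance() is deprecated as of Java 9, and it also rethrows any exception the constructor throws (checked ones included) without wrapping; getDeclaredConstructor().newInstance() wraps constructor failures and allows the dedicated "no public no-argument constructor" message above. A small stand-alone sketch of the same pattern, with a hypothetical Widget class:

    public class NewInstanceSketch {
        // Hypothetical payload class with a public no-argument constructor.
        public static class Widget {
        }

        public static void main(String[] args) {
            try {
                // Same reflective pattern as Utils.newInstance above.
                Widget w = Widget.class.getDeclaredConstructor().newInstance();
                System.out.println("Created " + w);
            } catch (NoSuchMethodException e) {
                System.err.println("No public no-argument constructor: " + e);
            } catch (ReflectiveOperationException | RuntimeException e) {
                System.err.println("Could not instantiate: " + e);
            }
        }
    }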
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
index 8f9bce5b9d0..f1ef740e5db 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
@@ -105,9 +105,10 @@ public class TestDigestLoginModule extends PlainLoginModule {
private static final long serialVersionUID = 1L;
+ @SuppressWarnings("deprecation")
protected DigestSaslServerProvider() {
super("Test SASL/Digest-MD5 Server Provider", 1.0, "Test SASL/Digest-MD5 Server Provider for Kafka");
- super.put("SaslServerFactory.DIGEST-MD5", TestDigestLoginModule.DigestSaslServerFactory.class.getName());
+ put("SaslServerFactory.DIGEST-MD5", TestDigestLoginModule.DigestSaslServerFactory.class.getName());
}
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java
index 371031bcb0d..a86e0ddb800 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java
@@ -16,10 +16,9 @@
*/
package org.apache.kafka.common.security.scram;
+import org.apache.kafka.common.utils.Base64;
import org.junit.Test;
-import javax.xml.bind.DatatypeConverter;
-
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -55,13 +54,13 @@ public class ScramFormatterTest {
String serverNonce = serverFirst.nonce().substring(clientNonce.length());
assertEquals("%hvYDpWUa2RaTCAfuxFIlj)hNlF$k0", serverNonce);
byte[] salt = serverFirst.salt();
- assertArrayEquals(DatatypeConverter.parseBase64Binary("W22ZaJ0SNY7soEsUEjb6gQ=="), salt);
+ assertArrayEquals(Base64.decoder().decode("W22ZaJ0SNY7soEsUEjb6gQ=="), salt);
int iterations = serverFirst.iterations();
assertEquals(4096, iterations);
byte[] channelBinding = clientFinal.channelBinding();
- assertArrayEquals(DatatypeConverter.parseBase64Binary("biws"), channelBinding);
+ assertArrayEquals(Base64.decoder().decode("biws"), channelBinding);
byte[] serverSignature = serverFinal.serverSignature();
- assertArrayEquals(DatatypeConverter.parseBase64Binary("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature);
+ assertArrayEquals(Base64.decoder().decode("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature);
byte[] saltedPassword = formatter.saltedPassword(password, salt, iterations);
byte[] serverKey = formatter.serverKey(saltedPassword);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
index de97ce2e144..89e626095e8 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
@@ -16,13 +16,13 @@
*/
package org.apache.kafka.common.security.scram;
+import org.apache.kafka.common.utils.Base64;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import javax.security.sasl.SaslException;
-import javax.xml.bind.DatatypeConverter;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -288,11 +288,11 @@ public class ScramMessagesTest {
}
private String randomBytesAsString() {
- return DatatypeConverter.printBase64Binary(formatter.secureRandomBytes());
+ return Base64.encoder().encodeToString(formatter.secureRandomBytes());
}
private byte[] toBytes(String base64Str) {
- return DatatypeConverter.parseBase64Binary(base64Str);
+ return Base64.decoder().decode(base64Str);
};
private void checkClientFirstMessage(ClientFirstMessage message, String saslName, String nonce, String authzid) {
@@ -303,14 +303,14 @@ public class ScramMessagesTest {
private void checkServerFirstMessage(ServerFirstMessage message, String nonce, String salt, int iterations) {
assertEquals(nonce, message.nonce());
- assertArrayEquals(DatatypeConverter.parseBase64Binary(salt), message.salt());
+ assertArrayEquals(Base64.decoder().decode(salt), message.salt());
assertEquals(iterations, message.iterations());
}
private void checkClientFinalMessage(ClientFinalMessage message, String channelBinding, String nonce, String proof) {
- assertArrayEquals(DatatypeConverter.parseBase64Binary(channelBinding), message.channelBinding());
+ assertArrayEquals(Base64.decoder().decode(channelBinding), message.channelBinding());
assertEquals(nonce, message.nonce());
- assertArrayEquals(DatatypeConverter.parseBase64Binary(proof), message.proof());
+ assertArrayEquals(Base64.decoder().decode(proof), message.proof());
}
private void checkServerFinalMessage(ServerFinalMessage message, String error, String serverSignature) {
@@ -318,7 +318,7 @@ public class ScramMessagesTest {
if (serverSignature == null)
assertNull("Unexpected server signature", message.serverSignature());
else
- assertArrayEquals(DatatypeConverter.parseBase64Binary(serverSignature), message.serverSignature());
+ assertArrayEquals(Base64.decoder().decode(serverSignature), message.serverSignature());
}
@SuppressWarnings("unchecked")
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index a27d5cb1b64..47ca82308a8 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -22,11 +22,11 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Base64;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -292,7 +292,7 @@ public class TestUtils {
// Convert into normal variant and add padding at the end.
String originalClusterId = String.format("%s==", clusterId.replace("_", "/").replace("-", "+"));
- byte[] decodedUuid = DatatypeConverter.parseBase64Binary(originalClusterId);
+ byte[] decodedUuid = Base64.decoder().decode(originalClusterId);
// We expect 16 bytes, same as the input UUID.
assertEquals(decodedUuid.length, 16);
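The assertion above reverses the URL-safe, unpadded encoding used for cluster ids: restore the standard alphabet, re-append the "==" padding of a 16-byte value, then decode. A sketch of that round trip using the new utility, assuming cluster ids come from urlEncoderNoPadding() applied to a random 16-byte UUID (the class name ClusterIdSketch is made up for illustration):

    import java.nio.ByteBuffer;
    import java.util.UUID;
    import org.apache.kafka.common.utils.Base64;

    public class ClusterIdSketch {
        public static void main(String[] args) {
            // 16 random bytes, like a cluster id's underlying UUID.
            UUID uuid = UUID.randomUUID();
            ByteBuffer buffer = ByteBuffer.allocate(16);
            buffer.putLong(uuid.getMostSignificantBits());
            buffer.putLong(uuid.getLeastSignificantBits());
            String clusterId = Base64.urlEncoderNoPadding().encodeToString(buffer.array());

            // Reverse it the same way the test does: restore + and /, add back the "==" padding.
            String standard = String.format("%s==", clusterId.replace("_", "/").replace("-", "+"));
            byte[] decoded = Base64.decoder().decode(standard);
            System.out.println(clusterId + " decodes back to " + decoded.length + " bytes");
        }
    }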
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 1c16c9680c6..7c5b420bc42 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -20,6 +20,7 @@ package kafka
import java.util.Properties
import joptsimple.OptionParser
+import kafka.utils.Implicits._
import kafka.server.{KafkaServer, KafkaServerStartable}
import kafka.utils.{CommandLineUtils, Exit, Logging}
import org.apache.kafka.common.utils.Utils
@@ -47,7 +48,7 @@ object Kafka extends Logging {
CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
}
- props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala))
+ props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
}
props
}
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index b18dcc99937..366667b4e4c 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -24,6 +24,7 @@ import kafka.common.InvalidConfigException
import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId}
import kafka.utils.{CommandLineUtils, ZkUtils}
+import kafka.utils.Implicits._
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram._
import org.apache.kafka.common.utils.Utils
@@ -95,7 +96,7 @@ object ConfigCommand extends Config {
if (invalidConfigs.nonEmpty)
throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
- configs.putAll(configsToBeAdded)
+ configs ++= configsToBeAdded
configsToBeDeleted.foreach(configs.remove(_))
utils.changeConfigs(zkUtils, entityType, entityName, configs)
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index d1cd80324b2..1100e87b726 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -25,6 +25,7 @@ import joptsimple.{OptionParser, OptionSpec}
import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
import kafka.client.ClientUtils
import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
+import kafka.utils.Implicits._
import kafka.consumer.SimpleConsumer
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNoNodeException
@@ -514,7 +515,8 @@ object ConsumerGroupCommand extends Logging {
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
- if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))
+ if (opts.options.has(opts.commandConfigOpt))
+ properties ++= Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
new KafkaConsumer(properties)
}
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 882fe2165c0..2d3a76c0482 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -21,6 +21,7 @@ import java.util.Properties
import joptsimple._
import kafka.common.AdminCommandFailedException
+import kafka.utils.Implicits._
import kafka.consumer.Whitelist
import kafka.log.LogConfig
import kafka.server.ConfigType
@@ -130,7 +131,7 @@ object TopicCommand extends Logging {
val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
// compile the final set of configs
- configs.putAll(configsToBeAdded)
+ configs ++= configsToBeAdded
configsToBeDeleted.foreach(config => configs.remove(config))
AdminUtils.changeTopicConfig(zkUtils, topic, configs)
println("Updated config for topic \"%s\".".format(topic))
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index e1d6e020e82..82b7dac047e 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -62,7 +62,7 @@ object ZkSecurityMigrator extends Logging {
+ "authentication.")
def run(args: Array[String]) {
- var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+ val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
val parser = new OptionParser(false)
val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
+ " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index b5ef9126b01..ae2f19c4bcc 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -33,7 +33,7 @@ object FetchResponsePartitionData {
val messageSetSize = buffer.getInt
val messageSetBuffer = buffer.slice()
messageSetBuffer.limit(messageSetSize)
- buffer.position(buffer.position + messageSetSize)
+ buffer.position(buffer.position() + messageSetSize)
new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))
}
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 921e01197e2..9cdb14b6ba2 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -93,7 +93,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
val partitionMessageData = partitionAndData._2
val bytes = partitionMessageData.buffer
buffer.putInt(partition)
- buffer.putInt(bytes.limit)
+ buffer.putInt(bytes.limit())
buffer.put(bytes)
bytes.rewind
})
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 780ae52c94f..4b07751bd4c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -764,7 +764,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
topicDeleted || successful
}.keys
reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
- var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
+ val partitionsToReassign = mutable.Map[TopicAndPartition, ReassignedPartitionsContext]()
partitionsToReassign ++= partitionsBeingReassigned
partitionsToReassign --= reassignedPartitions
controllerContext.partitionsBeingReassigned ++= partitionsToReassign
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index c9ec48a2463..051445ceba5 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,7 +17,6 @@
package kafka.javaapi
import kafka.cluster.BrokerEndPoint
-import org.apache.kafka.common.protocol.Errors
import scala.collection.JavaConverters._
private[javaapi] object MetadataListImplicits {
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index e3b2ec18890..467d0a67c9c 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -85,7 +85,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
for ((topic, streams) <- scalaReturn) {
- var javaStreamList = new java.util.ArrayList[KafkaStream[K,V]]
+ val javaStreamList = new java.util.ArrayList[KafkaStream[K,V]]
for (stream <- streams)
javaStreamList.add(stream)
ret.put(topic, javaStreamList)
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index d569ad9def9..2d7cc7e5d20 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -17,7 +17,7 @@
package kafka.log
-import java.io.{File, IOException, RandomAccessFile}
+import java.io.{File, RandomAccessFile}
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel
import java.util.concurrent.locks.{Lock, ReentrantLock}
@@ -69,7 +69,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
idx.position(0)
else
// if this is a pre-existing index, assume it is valid and set position to last entry
- idx.position(roundDownToExactMultiple(idx.limit, entrySize))
+ idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
idx
} finally {
CoreUtils.swallow(raf.close())
@@ -80,11 +80,11 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
* The maximum number of entries this index can hold
*/
@volatile
- private[this] var _maxEntries = mmap.limit / entrySize
+ private[this] var _maxEntries = mmap.limit() / entrySize
/** The number of entries in this index */
@volatile
- protected var _entries = mmap.position / entrySize
+ protected var _entries = mmap.position() / entrySize
/**
* True iff there are no more slots available in this index
@@ -105,7 +105,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
inLock(lock) {
val raf = new RandomAccessFile(file, "rw")
val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
- val position = mmap.position
+ val position = mmap.position()
/* Windows won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS)
@@ -113,7 +113,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
try {
raf.setLength(roundedNewSize)
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
- _maxEntries = mmap.limit / entrySize
+ _maxEntries = mmap.limit() / entrySize
mmap.position(position)
} finally {
CoreUtils.swallow(raf.close())
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 85d6487cfe9..4f53b41df4f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -550,7 +550,7 @@ private[log] class Cleaner(val id: Int,
// if any messages are to be retained, write them out
val outputBuffer = result.output
- if (outputBuffer.position > 0) {
+ if (outputBuffer.position() > 0) {
outputBuffer.flip()
val retained = MemoryRecords.readableRecords(outputBuffer)
dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
@@ -558,11 +558,11 @@ private[log] class Cleaner(val id: Int,
largestTimestamp = result.maxTimestamp,
shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
records = retained)
- throttler.maybeThrottle(outputBuffer.limit)
+ throttler.maybeThrottle(outputBuffer.limit())
}
// if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
- if (readBuffer.limit > 0 && result.messagesRead == 0)
+ if (readBuffer.limit() > 0 && result.messagesRead == 0)
growBuffers(maxLogMessageSize)
}
restoreBuffers()
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 8f82e65a751..c47d2290d24 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import kafka.api.ApiVersion
import kafka.message.{BrokerCompressionCodec, Message}
import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
+import kafka.utils.Implicits._
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig}
import org.apache.kafka.common.record.TimestampType
@@ -269,8 +270,8 @@ object LogConfig {
*/
def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
val props = new Properties()
- props.putAll(defaults)
- props.putAll(overrides)
+ defaults.asScala.foreach { case (k, v) => props.put(k, v) }
+ props ++= overrides
LogConfig(props)
}
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 53c18fe1835..c1569726fe9 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -58,7 +58,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
private[this] var _lastOffset = lastEntry.offset
debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
- .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
+ .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position()))
/**
* The last entry in the index
@@ -144,7 +144,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
mmap.putInt(position)
_entries += 1
_lastOffset = offset
- require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
+ require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
} else {
throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
.format(offset, entries, _lastOffset, file.getAbsolutePath))
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index 8b493c2c6ce..219bed3c6d2 100755
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -149,7 +149,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
this.lookups = 0L
this.probes = 0L
this.lastOffset = -1L
- Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
+ Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit(), 0.toByte)
}
/**
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index ce56a6c57a2..974a50e9ffb 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -338,12 +338,12 @@ object ProducerStateManager {
buffer.flip()
// now fill in the CRC
- val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit - ProducerEntriesOffset)
+ val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit() - ProducerEntriesOffset)
ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
val fos = new FileOutputStream(file)
try {
- fos.write(buffer.array, buffer.arrayOffset, buffer.limit)
+ fos.write(buffer.array, buffer.arrayOffset, buffer.limit())
} finally {
fos.close()
}
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 6c9c32b0d43..aab93005a91 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -126,7 +126,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
mmap.putLong(timestamp)
mmap.putInt((offset - baseOffset).toInt)
_entries += 1
- require(_entries * entrySize == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
+ require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
}
}
}
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index c6fa1ce7318..62e2125fabd 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -174,7 +174,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
/**
* The total number of bytes in this message set, including any partial trailing messages
*/
- def sizeInBytes: Int = buffer.limit
+ def sizeInBytes: Int = buffer.limit()
/**
* The total number of bytes in this message set not including any partial, trailing messages
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 35309290145..a46990160b6 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -222,7 +222,7 @@ class Message(val buffer: ByteBuffer,
* Compute the checksum of the message from the message contents
*/
def computeChecksum: Long =
- Crc32.crc32(buffer, MagicOffset, buffer.limit - MagicOffset)
+ Crc32.crc32(buffer, MagicOffset, buffer.limit() - MagicOffset)
/**
* Retrieve the previously computed CRC for this message
@@ -245,7 +245,7 @@ class Message(val buffer: ByteBuffer,
/**
* The complete serialized size of this message in bytes (including crc, header attributes, etc)
*/
- def size: Int = buffer.limit
+ def size: Int = buffer.limit()
/**
* The position where the key size is stored.
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index 60cef6397f7..6d4e4b7ccab 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -23,6 +23,7 @@ import kafka.api.TopicMetadata
import kafka.cluster.BrokerEndPoint
import kafka.common.UnavailableProducerException
import kafka.utils.Logging
+import kafka.utils.Implicits._
import scala.collection.mutable.HashMap
@@ -35,7 +36,7 @@ object ProducerPool {
val props = new Properties()
props.put("host", broker.host)
props.put("port", broker.port.toString)
- props.putAll(config.props.props)
+ props ++= config.props.props
new SyncProducer(new SyncProducerConfig(props))
}
}
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index f02648f53e4..04527c8975b 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -58,7 +58,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
*/
if (logger.isDebugEnabled) {
val buffer = new RequestOrResponseSend("", request).buffer
- trace("verifying sendbuffer of size " + buffer.limit)
+ trace("verifying sendbuffer of size " + buffer.limit())
val requestTypeId = buffer.getShort()
if(requestTypeId == ApiKeys.PRODUCE.id) {
val request = ProducerRequest.readFrom(buffer)
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
index a13345ab332..0fa311a2ad9 100644
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -19,8 +19,6 @@ package kafka.security.auth
import kafka.common.{BaseEnum, KafkaException}
import org.apache.kafka.common.acl.AclOperation
-import scala.util.{Failure, Success, Try}
-
/**
* Different operations a client may perform on kafka resources.
*/
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
index c6033512e68..c75e6f6f19d 100644
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala
@@ -19,8 +19,6 @@ package kafka.security.auth
import kafka.common.{BaseEnum, KafkaException}
import org.apache.kafka.common.acl.AclPermissionType
-import scala.util.{Failure, Success, Try}
-
sealed trait PermissionType extends BaseEnum {
val toJava: AclPermissionType
}
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 03eb9e30fbe..6218a2c4b2f 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -26,7 +26,6 @@ import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
-import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import scala.collection.JavaConverters._
import org.apache.log4j.Logger
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 149a1c176f3..79ffde88686 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -25,6 +25,7 @@ import kafka.log.{LogConfig, LogManager}
import kafka.security.CredentialProvider
import kafka.server.Constants._
import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigDef.Validator
import org.apache.kafka.common.config.ConfigException
@@ -32,7 +33,6 @@ import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.Quota._
import scala.collection.JavaConverters._
-import scala.collection.mutable
/**
* The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
@@ -55,7 +55,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
if (logs.nonEmpty) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties()
- props.putAll(logManager.defaultConfig.originals)
+ props ++= logManager.defaultConfig.originals.asScala
topicConfig.asScala.foreach { case (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 4ae1b138a30..899739559aa 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -349,7 +349,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
def cancel(): List[T] = {
val iter = operations.iterator()
- var cancelled = new ListBuffer[T]()
+ val cancelled = new ListBuffer[T]()
while (iter.hasNext) {
val curr = iter.next()
curr.cancel()
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index 19333396a2b..bfa7fc29ea5 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -17,7 +17,6 @@
package kafka.server
-import kafka.common.TopicAndPartition
import org.apache.kafka.common.TopicPartition
/**
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a900e6ddd0f..89ba641a759 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -26,6 +26,7 @@ import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
import kafka.utils.CoreUtils
+import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.ConfigDef.ValidList
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
@@ -875,8 +876,8 @@ object KafkaConfig {
def fromProps(defaults: Properties, overrides: Properties, doLog: Boolean): KafkaConfig = {
val props = new Properties()
- props.putAll(defaults)
- props.putAll(overrides)
+ props ++= defaults
+ props ++= overrides
fromProps(props, doLog)
}
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index e81dba20c3a..a0818bc0057 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -29,6 +29,7 @@ import kafka.consumer._
import kafka.message._
import kafka.metrics.KafkaMetricsReporter
import kafka.utils._
+import kafka.utils.Implicits._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.record.TimestampType
@@ -173,8 +174,8 @@ object ConsoleConsumer extends Logging {
def getOldConsumerProps(config: ConsumerConfig): Properties = {
val props = new Properties
- props.putAll(config.consumerProps)
- props.putAll(config.extraConsumerProps)
+ props ++= config.consumerProps
+ props ++= config.extraConsumerProps
setAutoOffsetResetValue(config, props)
props.put("zookeeper.connect", config.zkConnectionStr)
@@ -201,8 +202,8 @@ object ConsoleConsumer extends Logging {
def getNewConsumerProps(config: ConsumerConfig): Properties = {
val props = new Properties
- props.putAll(config.consumerProps)
- props.putAll(config.extraConsumerProps)
+ props ++= config.consumerProps
+ props ++= config.extraConsumerProps
setAutoOffsetResetValue(config, props)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 1b221407b57..39bb0ff9671 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -21,6 +21,7 @@ import kafka.common._
import kafka.message._
import kafka.serializer._
import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
+import kafka.utils.Implicits._
import kafka.producer.{NewShinyProducer, OldProducer}
import java.util.Properties
import java.io._
@@ -74,7 +75,7 @@ object ConsoleProducer {
def getReaderProps(config: ProducerConfig): Properties = {
val props = new Properties
props.put("topic",config.topic)
- props.putAll(config.cmdLineProps)
+ props ++= config.cmdLineProps
props
}
@@ -106,7 +107,7 @@ object ConsoleProducer {
if (config.options.has(config.producerConfigOpt))
Utils.loadProps(config.options.valueOf(config.producerConfigOpt))
else new Properties
- props.putAll(config.extraProducerProps)
+ props ++= config.extraProducerProps
props
}
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 7bec15f44e8..025617fc6b3 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -443,14 +443,14 @@ object DumpLogSegments {
}
def recordOutOfOrderIndexTimestamp(file: File, indexTimestamp: Long, prevIndexTimestamp: Long) {
- var outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+ val outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
if (outOfOrderSeq.isEmpty)
outOfOrderTimestamp.put(file.getAbsolutePath, outOfOrderSeq)
outOfOrderSeq += ((indexTimestamp, prevIndexTimestamp))
}
def recordShallowOffsetNotFound(file: File, indexOffset: Long, logOffset: Long) {
- var shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+ val shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
if (shallowOffsetNotFoundSeq.isEmpty)
shallowOffsetNotFound.put(file.getAbsolutePath, shallowOffsetNotFoundSeq)
shallowOffsetNotFoundSeq += ((indexOffset, logOffset))
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 49593c212c5..d8ce9b068ac 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -17,7 +17,7 @@
package kafka.tools
-import java.io.{FileOutputStream, FileWriter, OutputStreamWriter}
+import java.io.{FileOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import joptsimple._
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index f06c4122008..4104dedb9e0 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -71,8 +71,8 @@ object GetOffsetShell {
ToolsUtils.validatePortOrDie(parser, brokerList)
val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
val topic = options.valueOf(topicOpt)
- var partitionList = options.valueOf(partitionOpt)
- var time = options.valueOf(timeOpt).longValue
+ val partitionList = options.valueOf(partitionOpt)
+ val time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index d96569ba721..c345f94dff4 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets
import joptsimple._
import kafka.utils.{CommandLineUtils, Exit, Logging, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.security.JaasUtils
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 32738215b76..c1221414af7 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -28,7 +28,7 @@ import joptsimple.OptionParser
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.math._
-import kafka.utils.{CommandLineUtils, Exit, Logging}
+import kafka.utils.{CommandLineUtils, Exit, Logging}
/**
@@ -177,14 +177,14 @@ object JmxTool extends Logging {
}
def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = {
- var attributes = new mutable.HashMap[String, Any]()
- for(name <- names) {
+ val attributes = new mutable.HashMap[String, Any]()
+ for (name <- names) {
val mbean = mbsc.getMBeanInfo(name)
- for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) {
+ for (attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) {
val attr = attrObj.asInstanceOf[Attribute]
attributesWhitelist match {
case Some(allowedAttributes) =>
- if(allowedAttributes.contains(attr.getName))
+ if (allowedAttributes.contains(attr.getName))
attributes(name + ":" + attr.getName) = attr.getValue
case None => attributes(name + ":" + attr.getName) = attr.getValue
}
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 0f218319ac3..77f560b1a93 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -20,6 +20,7 @@ package kafka.tools
import kafka.metrics.KafkaMetricsReporter
import kafka.producer.{NewShinyProducer, OldProducer}
import kafka.utils.{CommandLineUtils, Exit, Logging, ToolsUtils, VerifiableProperties}
+import kafka.utils.Implicits._
import kafka.message.CompressionCodec
import kafka.serializer._
import java.util.concurrent.{CountDownLatch, Executors}
@@ -205,7 +206,7 @@ object ProducerPerformance extends Logging {
val producer =
if (config.useNewProducer) {
import org.apache.kafka.clients.producer.ProducerConfig
- props.putAll(config.producerProps)
+ props ++= config.producerProps
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
@@ -217,7 +218,7 @@ object ProducerPerformance extends Logging {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
new NewShinyProducer(props)
} else {
- props.putAll(config.producerProps)
+ props ++= config.producerProps
props.put("metadata.broker.list", config.brokerList)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("send.buffer.bytes", (64 * 1024).toString)
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 5d4cc23ad1d..ca9c111163a 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -18,7 +18,7 @@
package kafka.tools
import joptsimple.OptionParser
-import java.util.concurrent.{Executors, CountDownLatch}
+import java.util.concurrent.CountDownLatch
import java.util.Properties
import kafka.consumer._
import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils}
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 0e8855cbc6e..ca753d50081 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.locks.{Lock, ReadWriteLock}
import java.lang.management._
import java.util.{Properties, UUID}
import javax.management._
-import javax.xml.bind.DatatypeConverter
import org.apache.kafka.common.protocol.SecurityProtocol
@@ -32,7 +31,7 @@ import scala.collection._
import scala.collection.mutable
import kafka.cluster.EndPoint
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.utils.{KafkaThread, Utils}
+import org.apache.kafka.common.utils.{Base64, KafkaThread, Utils}
/**
* General helper functions!
@@ -279,7 +278,7 @@ object CoreUtils extends Logging {
def generateUuidAsBase64(): String = {
val uuid = UUID.randomUUID()
- urlSafeBase64EncodeNoPadding(getBytesFromUuid(uuid))
+ Base64.urlEncoderNoPadding.encodeToString(getBytesFromUuid(uuid))
}
def getBytesFromUuid(uuid: UUID): Array[Byte] = {
@@ -290,14 +289,6 @@ object CoreUtils extends Logging {
uuidBytes.array
}
- def urlSafeBase64EncodeNoPadding(data: Array[Byte]): String = {
- val base64EncodedUUID = DatatypeConverter.printBase64Binary(data)
- //Convert to URL safe variant by replacing + and / with - and _ respectively.
- val urlSafeBase64EncodedUUID = base64EncodedUUID.replace("+", "-").replace("/", "_")
- // Remove the "==" padding at the end.
- urlSafeBase64EncodedUUID.substring(0, urlSafeBase64EncodedUUID.length - 2)
- }
-
def propsWith(key: String, value: String): Properties = {
propsWith((key, value))
}
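
For reference, the removed `urlSafeBase64EncodeNoPadding` helper and the new `Base64.urlEncoderNoPadding` call should produce identical output for UUID-sized input. The sketch below is a minimal illustration, not part of the patch; it assumes the Kafka `Base64` wrapper delegates to the JDK 8 URL-safe encoder, and it uses `javax.xml.bind.DatatypeConverter` only to reproduce the old behaviour (that package is no longer resolved by default on Java 9, which is one reason the helper was dropped):

    import java.nio.ByteBuffer
    import java.util.UUID

    object Base64EquivalenceSketch extends App {
      val uuid = UUID.randomUUID()
      val bytes = ByteBuffer.allocate(16)
        .putLong(uuid.getMostSignificantBits)
        .putLong(uuid.getLeastSignificantBits)
        .array

      // Old approach: standard Base64, then manual URL-safe substitution and padding removal.
      val legacy = javax.xml.bind.DatatypeConverter.printBase64Binary(bytes)
        .replace("+", "-").replace("/", "_").dropRight(2)

      // New approach: the JDK URL-safe encoder without padding (Java 8+).
      val viaJdk = java.util.Base64.getUrlEncoder.withoutPadding.encodeToString(bytes)

      assert(legacy == viaJdk) // both yield a 22-character URL-safe id
    }
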
diff --git a/core/src/main/scala/kafka/utils/Implicits.scala b/core/src/main/scala/kafka/utils/Implicits.scala
new file mode 100644
index 00000000000..5196d45ea91
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Implicits.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+/**
+ * In order to have these implicits in scope, add the following import:
+ *
+ * `import kafka.utils.Implicits._`
+ */
+object Implicits {
+
+ /**
+ * The java.util.Properties.putAll override introduced in Java 9 is seen as an overload by the
+ * Scala compiler causing ambiguity errors in some cases. The `++=` methods introduced via
+ * implicits provide a concise alternative.
+ *
+ * See https://github.com/scala/bug/issues/10418 for more details.
+ */
+ implicit class PropertiesOps(properties: Properties) {
+
+ def ++=(props: Properties): Unit =
+ (properties: util.Hashtable[AnyRef, AnyRef]).putAll(props)
+
+ def ++=(map: collection.Map[String, AnyRef]): Unit =
+ (properties: util.Hashtable[AnyRef, AnyRef]).putAll(map.asJava)
+
+ }
+
+}
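
A minimal usage sketch of the new implicit, assuming the `Implicits.scala` above is on the classpath (the object name below is illustrative only): `++=` copies entries from another `Properties` or a Scala `Map` without invoking the `putAll` overload that becomes ambiguous under Java 9:

    import java.util.Properties
    import kafka.utils.Implicits._

    object PropertiesOpsSketch extends App {
      val defaults = new Properties
      defaults.put("acks", "all")

      val overrides = Map[String, AnyRef]("linger.ms" -> "5")

      val merged = new Properties
      merged ++= defaults   // Properties -> Properties, routed through Hashtable.putAll
      merged ++= overrides  // Scala Map -> Properties via asJava
      assert(merged.getProperty("acks") == "all" && merged.getProperty("linger.ms") == "5")
    }
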
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 9533ce9372c..cc080552167 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -18,7 +18,6 @@
package kafka.utils
import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
import kafka.utils.ZkUtils._
import org.apache.kafka.common.TopicPartition
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 1be5a454c2e..50e04f57196 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -16,13 +16,9 @@
*/
package kafka.utils
-import java.util
-import java.util.Comparator
-
import joptsimple.OptionParser
import org.apache.kafka.common.{Metric, MetricName}
-import scala.collection.immutable.ListMap
import scala.collection.mutable
object ToolsUtils {
@@ -32,9 +28,8 @@ object ToolsUtils {
hostPort.split(",")
else
Array(hostPort)
- val validHostPort = hostPorts.filter {
- hostPortData =>
- org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
+ val validHostPort = hostPorts.filter { hostPortData =>
+ org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
}
val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length
if(!isValid)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 012f25440ea..90d7838b541 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -27,12 +27,12 @@ import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig, KafkaServer}
import org.apache.kafka.clients.admin._
import kafka.utils.{Logging, TestUtils, ZkUtils}
+import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException}
-import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Before, Rule, Test}
import org.apache.kafka.common.requests.MetadataResponse
import org.apache.kafka.common.resource.{Resource, ResourceType}
@@ -235,8 +235,8 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString)
val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString)
val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2)
- var describeResult = client.describeConfigs(configResources.asJava)
- var configs = describeResult.all.get
+ val describeResult = client.describeConfigs(configResources.asJava)
+ val configs = describeResult.all.get
assertEquals(4, configs.size)
@@ -378,7 +378,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass")
}
- cfgs.foreach(_.putAll(serverConfig))
+ cfgs.foreach(_ ++= serverConfig)
cfgs.map(KafkaConfig.fromProps)
}
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 3c36bb02d48..e124468b923 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,7 +19,7 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
-import kafka.utils.{Logging, ShutdownableThread, TestUtils}
+import kafka.utils.{ShutdownableThread, TestUtils}
import kafka.server.KafkaConfig
import org.junit.Assert._
import org.junit.{Before, Test}
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 6c61cd96bf7..2cd0df29bde 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -25,6 +25,7 @@ import kafka.common.TopicAndPartition
import kafka.integration.KafkaServerTestHarness
import kafka.server._
import kafka.utils._
+import kafka.utils.Implicits._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, TopicPartition}
@@ -105,7 +106,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
override def generateConfigs = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
- cfgs.foreach(_.putAll(serverConfig))
+ cfgs.foreach(_ ++= serverConfig)
cfgs.map(KafkaConfig.fromProps)
}
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 5e3c7abca39..b8dc57beaf4 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -20,6 +20,7 @@ package kafka.api
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import kafka.utils.TestUtils
+import kafka.utils.Implicits._
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
@@ -54,7 +55,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
}
- cfgs.foreach(_.putAll(serverConfig))
+ cfgs.foreach(_ ++= serverConfig)
cfgs.map(KafkaConfig.fromProps)
}
@@ -65,10 +66,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
super.setUp()
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
- producerConfig.putAll(producerSecurityProps)
+ producerConfig ++= producerSecurityProps
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
- consumerConfig.putAll(consumerSecurityProps)
+ consumerConfig ++= consumerSecurityProps
for (_ <- 0 until producerCount)
producers += createNewProducer
for (_ <- 0 until consumerCount) {
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index aa92f40115a..a11972eec3d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -20,6 +20,7 @@ import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{ShutdownableThread, TestUtils}
+import kafka.utils.Implicits._
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.junit.Assert._
@@ -123,7 +124,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
val producerConfigWithCompression = new Properties()
- producerConfigWithCompression.putAll(producerConfig)
+ producerConfigWithCompression ++= producerConfig
producerConfigWithCompression.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
val producers = List(
TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 4, retries = 10, props = Some(producerConfig)),
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index cc9ee3ee74c..cbe882dabb9 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -19,6 +19,7 @@ package kafka.api
import java.util.Properties
import kafka.utils.TestUtils
+import kafka.utils.Implicits._
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.errors.GroupAuthorizationException
@@ -58,7 +59,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
val consumer1 = consumers.head
val consumer2Config = new Properties
- consumer2Config.putAll(consumerConfig)
+ consumer2Config ++= consumerConfig
// consumer2 retrieves its credentials from the static JAAS configuration, so we test also this path
consumer2Config.remove(SaslConfigs.SASL_JAAS_CONFIG)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 03afc9e5a85..c2b5993e4ce 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -18,7 +18,7 @@ import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, Cluster
import org.apache.kafka.common.protocol.SecurityProtocol
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions, KafkaAdminClient}
+import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceType}
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 4d879d0aa0a..40ec29343ec 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -26,6 +26,7 @@ import kafka.api.SaslSetup
import kafka.coordinator.group.OffsetConfig
import kafka.utils.JaasTestUtils.JaasSection
import kafka.utils.TestUtils
+import kafka.utils.Implicits._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -84,7 +85,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
props.put(KafkaConfig.SaslKerberosServiceNameProp, "kafka")
- props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId"))
+ props ++= TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")
// set listener-specific configs and set an invalid path for the global config to verify that the overrides work
Seq(SecureInternal, SecureExternal).foreach { listenerName =>
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 8a26cac1c45..dd6c951d46a 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -169,13 +169,12 @@ object ReplicationQuotasTestRig {
def logOutput(config: ExperimentDef, replicas: Map[Int, Seq[Int]], newAssignment: Map[TopicAndPartition, Seq[Int]]): Unit = {
val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
- val existing = zkUtils.getReplicaAssignmentForTopics(newAssignment.map(_._1.topic).toSeq)
//Long stats
println("The replicas are " + replicas.toSeq.sortBy(_._1).map("\n" + _))
println("This is the current replica assignment:\n" + actual.toSeq)
println("proposed assignment is: \n" + newAssignment)
- println("This is the assigment we eneded up with" + actual)
+ println("This is the assignment we ended up with" + actual)
//Test Stats
println(s"numBrokers: ${config.brokers}")
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index e05f29d533b..16325ee4ec5 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -184,7 +184,7 @@ object TestLinearWriteSpeed {
def write(): Int = {
buffer.put(content)
content.rewind()
- content.limit
+ content.limit()
}
def close() {
raf.close()
@@ -198,7 +198,7 @@ object TestLinearWriteSpeed {
def write(): Int = {
channel.write(content)
content.rewind()
- content.limit
+ content.limit()
}
def close() {
raf.close()
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 445ee09a832..af77c67b613 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -24,7 +24,7 @@ import kafka.server.{ConfigEntityName, QuotaId}
import kafka.utils.{Logging, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism}
+import org.apache.kafka.common.security.scram.ScramCredentialUtils
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 09b1e7550d5..c36400b27a5 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -17,7 +17,7 @@
package kafka.consumer
-import java.util.{Collections, Properties}
+import java.util.Properties
import org.junit.Assert._
import kafka.common.MessageStreamsExistException
@@ -28,7 +28,6 @@ import kafka.serializer._
import kafka.server._
import kafka.utils.TestUtils._
import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
import org.apache.log4j.{Level, Logger}
import org.junit.{Test, After, Before}
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 9e7fb13886a..e7fbb83ade8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -422,12 +422,12 @@ class TransactionMarkerChannelManagerTest {
@Test
def shouldCreateMetricsOnStarting(): Unit = {
- val metrics = Metrics.defaultRegistry.allMetrics
+ val metrics = Metrics.defaultRegistry.allMetrics.asScala
- assertEquals(1, Metrics.defaultRegistry.allMetrics.asScala
+ assertEquals(1, metrics
.filterKeys(_.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=UnknownDestinationQueueSize")
.size)
- assertEquals(1, Metrics.defaultRegistry.allMetrics.asScala
+ assertEquals(1, metrics
.filterKeys(_.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=LogAppendRetryQueueSize")
.size)
}
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 34baf89ede0..bff27006e28 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -22,6 +22,7 @@ import java.util.Properties
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils.{MockTime, Pool, TestUtils}
+import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.junit.After
@@ -66,7 +67,7 @@ abstract class AbstractLogCleanerIntegrationTest {
props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
props.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
props.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Long)
- props.putAll(propertyOverrides)
+ props ++= propertyOverrides
props
}
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 5e224332758..00f9dc9f396 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -104,7 +104,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
val emptyMessageList : List[Message] = Nil
val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
- val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
+ val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit() + regularMessgeSet.buffer.limit())
buffer.put(emptyMessageSet.buffer)
buffer.put(regularMessgeSet.buffer)
buffer.rewind
@@ -122,7 +122,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
val emptyMessageList : List[Message] = Nil
val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
- val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
+ val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit() + regularMessgeSet.buffer.limit())
buffer.put(emptyMessageSet.buffer)
buffer.put(regularMessgeSet.buffer)
buffer.rewind
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ad641c01fdb..d141a26e2b5 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -38,7 +38,8 @@ import kafka.security.auth.{Acl, Authorizer, Resource}
import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.utils.ZkUtils._
+import ZkUtils._
+import Implicits._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
@@ -252,10 +253,10 @@ object TestUtils extends Logging {
rack.foreach(props.put(KafkaConfig.RackProp, _))
if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
- props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId"))
+ props ++= sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId")
if (protocolAndPorts.exists { case (protocol, _) => usesSaslAuthentication(protocol) })
- props.putAll(JaasTestUtils.saslConfigs(saslProperties))
+ props ++= JaasTestUtils.saslConfigs(saslProperties)
interBrokerSecurityProtocol.foreach { protocol =>
props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name)
@@ -388,9 +389,9 @@ object TestUtils extends Logging {
* Check that the buffer content from buffer.position() to buffer.limit() is equal
*/
def checkEquals(b1: ByteBuffer, b2: ByteBuffer) {
- assertEquals("Buffers should have equal length", b1.limit - b1.position, b2.limit - b2.position)
- for(i <- 0 until b1.limit - b1.position)
- assertEquals("byte " + i + " byte not equal.", b1.get(b1.position + i), b2.get(b1.position + i))
+ assertEquals("Buffers should have equal length", b1.limit() - b1.position(), b2.limit() - b2.position())
+ for(i <- 0 until b1.limit() - b1.position())
+ assertEquals("byte " + i + " byte not equal.", b1.get(b1.position() + i), b2.get(b1.position() + i))
}
/**
@@ -484,8 +485,8 @@ object TestUtils extends Logging {
*/
def hexString(buffer: ByteBuffer): String = {
val builder = new StringBuilder("0x")
- for(i <- 0 until buffer.limit)
- builder.append(String.format("%x", Integer.valueOf(buffer.get(buffer.position + i))))
+ for(i <- 0 until buffer.limit())
+ builder.append(String.format("%x", Integer.valueOf(buffer.get(buffer.position() + i))))
builder.toString
}
@@ -503,7 +504,7 @@ object TestUtils extends Logging {
//override any explicitly specified properties
if (producerProps != null)
- props.putAll(producerProps)
+ props ++= producerProps
props.put("serializer.class", encoder)
props.put("key.serializer.class", keyEncoder)
@@ -518,10 +519,10 @@ object TestUtils extends Logging {
saslProperties: Option[Properties]): Properties = {
val props = new Properties
if (usesSslTransportLayer(securityProtocol))
- props.putAll(sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias))
+ props ++= sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias)
if (usesSaslAuthentication(securityProtocol))
- props.putAll(JaasTestUtils.saslConfigs(saslProperties))
+ props ++= JaasTestUtils.saslConfigs(saslProperties)
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
props
}
@@ -572,7 +573,7 @@ object TestUtils extends Logging {
* SSL client auth fails.
*/
if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
- producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties))
+ producerProps ++= producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
}
@@ -633,7 +634,7 @@ object TestUtils extends Logging {
* SSL client auth fails.
*/
if(!consumerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
- consumerProps.putAll(consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties))
+ consumerProps ++= consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
}
@@ -1208,7 +1209,7 @@ object TestUtils extends Logging {
def copyOf(props: Properties): Properties = {
val copy = new Properties()
- copy.putAll(props)
+ copy ++= props
copy
}
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 22a0a167334..461767aaea4 100755
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -28,7 +28,7 @@ import org.junit.Assert._
import kafka.common.KafkaException
import kafka.utils.CoreUtils.inLock
import org.junit.Test
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Base64, Utils}
class UtilsTest extends JUnitSuite {
@@ -157,13 +157,15 @@ class UtilsTest extends JUnitSuite {
def testUrlSafeBase64EncodeUUID() {
// Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
- val clusterId1 = CoreUtils.urlSafeBase64EncodeNoPadding(CoreUtils.getBytesFromUuid(UUID.fromString("a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
+ val clusterId1 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
+ "a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
assertEquals(clusterId1.length, 22)
assertTrue(clusterIdPattern.matcher(clusterId1).matches())
// Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
- val clusterId2 = CoreUtils.urlSafeBase64EncodeNoPadding(CoreUtils.getBytesFromUuid(UUID.fromString("d418ec02-277e-4853-81e6-afe30259daec")))
+ val clusterId2 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
+ "d418ec02-277e-4853-81e6-afe30259daec")))
assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
assertEquals(clusterId2.length, 22)
assertTrue(clusterIdPattern.matcher(clusterId2).matches())