KAFKA-4501; Java 9 compilation and runtime fixes

Compilation error fixes:
- Avoid ambiguity error when appending to Properties in Scala
code (https://github.com/scala/bug/issues/10418)
- Use position() and limit() to fix ambiguity issue (
https://github.com/scala/bug/issues/10418#issuecomment-316364778)
- Disable findBugs if Java 9 is used (
https://github.com/findbugsproject/findbugs/issues/105)

Compilation warning fixes:
- Avoid deprecated Class.newInstance in Utils.newInstance
- Silence a few Java 9 deprecation warnings
- var -> val and unused fixes

Runtime error fixes:
- Introduce Base64 class that works in Java 7 and Java 9

Also:
- Set --release option if building with Java 9

Note that tests involving EasyMock (https://github.com/easymock/easymock/issues/193)
or PowerMock (https://github.com/powermock/powermock/issues/783)
will fail as neither supports Java 9 currently.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3647 from ijuma/kafka-4501-support-java-9
This commit is contained in:
Ismael Juma 2017-08-19 08:55:29 +01:00
parent 3e22c1c04a
commit ed96523a2c
73 changed files with 505 additions and 182 deletions

View File

@ -82,7 +82,7 @@ ext {
maxPermSizeArgs = []
if (!JavaVersion.current().isJava8Compatible())
maxPermSizeArgs = ['-XX:MaxPermSize=512m']
maxPermSizeArgs += '-XX:MaxPermSize=512m'
userMaxForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : null
@ -137,14 +137,24 @@ subprojects {
apply plugin: 'maven'
apply plugin: 'signing'
apply plugin: 'checkstyle'
apply plugin: 'findbugs'
if (!JavaVersion.current().isJava9Compatible())
apply plugin: 'findbugs'
sourceCompatibility = 1.7
targetCompatibility = 1.7
compileJava {
options.encoding = 'UTF-8'
// Add unchecked once we drop support for Java 7 as @SuppressWarnings("unchecked") is too buggy in Java 7
options.compilerArgs << "-Xlint:deprecation"
// -Xlint:unchecked is too buggy in Java 7, so we only enable for Java 8 or higher
if (JavaVersion.current().isJava8Compatible())
options.compilerArgs << "-Xlint:unchecked"
// --release is the recommended way to select the target release, but it's only supported in Java 9 so we also
// set --source and --target via `sourceCompatibility` and `targetCompatibility`. If/when Gradle supports `--release`
// natively (https://github.com/gradle/gradle/issues/2510), we should switch to that.
if (JavaVersion.current().isJava9Compatible())
options.compilerArgs << "--release" << "7"
}
uploadArchives {
@ -349,17 +359,19 @@ subprojects {
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
findbugs {
toolVersion = "3.0.1"
excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml")
ignoreFailures = false
}
test.dependsOn('findbugsMain')
if (!JavaVersion.current().isJava9Compatible()) {
findbugs {
toolVersion = "3.0.1"
excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml")
ignoreFailures = false
}
test.dependsOn('findbugsMain')
tasks.withType(FindBugs) {
reports {
xml.enabled (project.hasProperty('xmlFindBugsReport'))
html.enabled (!project.hasProperty('xmlFindBugsReport'))
tasks.withType(FindBugs) {
reports {
xml.enabled(project.hasProperty('xmlFindBugsReport'))
html.enabled(!project.hasProperty('xmlFindBugsReport'))
}
}
}

View File

@ -98,7 +98,6 @@
</subpackage>
<subpackage name="scram">
<allow pkg="javax.crypto" />
<allow pkg="javax.xml.bind" />
<allow pkg="org.apache.kafka.common.errors" />
</subpackage>
</subpackage>
@ -247,7 +246,6 @@
<subpackage name="test">
<allow pkg="org.apache.kafka" />
<allow pkg="org.bouncycastle" />
<allow pkg="javax.xml.bind" />
</subpackage>
<subpackage name="connect">

View File

@ -25,9 +25,10 @@ public class PlainSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;
@SuppressWarnings("deprecation")
protected PlainSaslServerProvider() {
super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN Server Provider for Kafka");
super.put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName());
put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName());
}
public static void initialize() {

View File

@ -19,9 +19,8 @@ package org.apache.kafka.common.security.scram;
import java.util.Collection;
import java.util.Properties;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.utils.Base64;
/**
* SCRAM Credential persistence utility functions. Implements format conversion used
@ -41,11 +40,11 @@ public class ScramCredentialUtils {
public static String credentialToString(ScramCredential credential) {
return String.format("%s=%s,%s=%s,%s=%s,%s=%d",
SALT,
DatatypeConverter.printBase64Binary(credential.salt()),
Base64.encoder().encodeToString(credential.salt()),
STORED_KEY,
DatatypeConverter.printBase64Binary(credential.storedKey()),
Base64.encoder().encodeToString(credential.storedKey()),
SERVER_KEY,
DatatypeConverter.printBase64Binary(credential.serverKey()),
Base64.encoder().encodeToString(credential.serverKey()),
ITERATIONS,
credential.iterations());
}
@ -56,9 +55,9 @@ public class ScramCredentialUtils {
!props.containsKey(SERVER_KEY) || !props.containsKey(ITERATIONS)) {
throw new IllegalArgumentException("Credentials not valid: " + str);
}
byte[] salt = DatatypeConverter.parseBase64Binary(props.getProperty(SALT));
byte[] storedKey = DatatypeConverter.parseBase64Binary(props.getProperty(STORED_KEY));
byte[] serverKey = DatatypeConverter.parseBase64Binary(props.getProperty(SERVER_KEY));
byte[] salt = Base64.decoder().decode(props.getProperty(SALT));
byte[] storedKey = Base64.decoder().decode(props.getProperty(STORED_KEY));
byte[] serverKey = Base64.decoder().decode(props.getProperty(SERVER_KEY));
int iterations = Integer.parseInt(props.getProperty(ITERATIONS));
return new ScramCredential(salt, storedKey, serverKey, iterations);
}

View File

@ -16,12 +16,13 @@
*/
package org.apache.kafka.common.security.scram;
import org.apache.kafka.common.utils.Base64;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.security.sasl.SaslException;
import javax.xml.bind.DatatypeConverter;
/**
* SCRAM request/response message creation and parsing based on
@ -140,7 +141,7 @@ public class ScramMessages {
}
this.nonce = matcher.group("nonce");
String salt = matcher.group("salt");
this.salt = DatatypeConverter.parseBase64Binary(salt);
this.salt = Base64.decoder().decode(salt);
}
public ServerFirstMessage(String clientNonce, String serverNonce, byte[] salt, int iterations) {
this.nonce = clientNonce + serverNonce;
@ -157,7 +158,7 @@ public class ScramMessages {
return iterations;
}
String toMessage() {
return String.format("r=%s,s=%s,i=%d", nonce, DatatypeConverter.printBase64Binary(salt), iterations);
return String.format("r=%s,s=%s,i=%d", nonce, Base64.encoder().encodeToString(salt), iterations);
}
}
/**
@ -184,9 +185,9 @@ public class ScramMessages {
if (!matcher.matches())
throw new SaslException("Invalid SCRAM client final message format: " + message);
this.channelBinding = DatatypeConverter.parseBase64Binary(matcher.group("channel"));
this.channelBinding = Base64.decoder().decode(matcher.group("channel"));
this.nonce = matcher.group("nonce");
this.proof = DatatypeConverter.parseBase64Binary(matcher.group("proof"));
this.proof = Base64.decoder().decode(matcher.group("proof"));
}
public ClientFinalMessage(byte[] channelBinding, String nonce) {
this.channelBinding = channelBinding;
@ -206,13 +207,13 @@ public class ScramMessages {
}
public String clientFinalMessageWithoutProof() {
return String.format("c=%s,r=%s",
DatatypeConverter.printBase64Binary(channelBinding),
Base64.encoder().encodeToString(channelBinding),
nonce);
}
String toMessage() {
return String.format("%s,p=%s",
clientFinalMessageWithoutProof(),
DatatypeConverter.printBase64Binary(proof));
Base64.encoder().encodeToString(proof));
}
}
/**
@ -243,7 +244,7 @@ public class ScramMessages {
// ignore
}
if (error == null) {
this.serverSignature = DatatypeConverter.parseBase64Binary(matcher.group("signature"));
this.serverSignature = Base64.decoder().decode(matcher.group("signature"));
this.error = null;
} else {
this.serverSignature = null;
@ -264,7 +265,7 @@ public class ScramMessages {
if (error != null)
return "e=" + error;
else
return "v=" + DatatypeConverter.printBase64Binary(serverSignature);
return "v=" + Base64.encoder().encodeToString(serverSignature);
}
}
}

View File

@ -25,10 +25,11 @@ public class ScramSaslClientProvider extends Provider {
private static final long serialVersionUID = 1L;
@SuppressWarnings("deprecation")
protected ScramSaslClientProvider() {
super("SASL/SCRAM Client Provider", 1.0, "SASL/SCRAM Client Provider for Kafka");
for (ScramMechanism mechanism : ScramMechanism.values())
super.put("SaslClientFactory." + mechanism.mechanismName(), ScramSaslClientFactory.class.getName());
put("SaslClientFactory." + mechanism.mechanismName(), ScramSaslClientFactory.class.getName());
}
public static void initialize() {

View File

@ -25,10 +25,11 @@ public class ScramSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;
@SuppressWarnings("deprecation")
protected ScramSaslServerProvider() {
super("SASL/SCRAM Server Provider", 1.0, "SASL/SCRAM Server Provider for Kafka");
for (ScramMechanism mechanism : ScramMechanism.values())
super.put("SaslServerFactory." + mechanism.mechanismName(), ScramSaslServerFactory.class.getName());
put("SaslServerFactory." + mechanism.mechanismName(), ScramSaslServerFactory.class.getName());
}
public static void initialize() {

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
/**
* Temporary class in order to support Java 7 and Java 9. `DatatypeConverter` is not in the base module of Java 9
* and `java.util.Base64` was only introduced in Java 8.
*/
public final class Base64 {
private static final Factory FACTORY;
static {
if (Java.IS_JAVA8_COMPATIBLE)
FACTORY = new Java8Factory();
else
FACTORY = new Java7Factory();
}
private Base64() {}
public static Encoder encoder() {
return FACTORY.encoder();
}
public static Encoder urlEncoderNoPadding() {
return FACTORY.urlEncoderNoPadding();
}
public static Decoder decoder() {
return FACTORY.decoder();
}
/* Contains a subset of methods from java.util.Base64.Encoder (introduced in Java 8) */
public interface Encoder {
String encodeToString(byte[] bytes);
}
/* Contains a subset of methods from java.util.Base64.Decoder (introduced in Java 8) */
public interface Decoder {
byte[] decode(String string);
}
private interface Factory {
Encoder urlEncoderNoPadding();
Encoder encoder();
Decoder decoder();
}
private static class Java8Factory implements Factory {
// Static final MethodHandles are optimised better by HotSpot
private static final MethodHandle URL_ENCODE_NO_PADDING;
private static final MethodHandle ENCODE;
private static final MethodHandle DECODE;
private static final Encoder URL_ENCODER_NO_PADDING;
private static final Encoder ENCODER;
private static final Decoder DECODER;
static {
try {
Class<?> base64Class = Class.forName("java.util.Base64");
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
Class<?> juEncoderClass = Class.forName("java.util.Base64$Encoder");
MethodHandle getEncoder = lookup.findStatic(base64Class, "getEncoder",
MethodType.methodType(juEncoderClass));
Object juEncoder;
try {
juEncoder = getEncoder.invoke();
} catch (Throwable throwable) {
// Invoked method doesn't throw checked exceptions, so safe to cast
throw (RuntimeException) throwable;
}
MethodHandle encode = lookup.findVirtual(juEncoderClass, "encodeToString",
MethodType.methodType(String.class, byte[].class));
ENCODE = encode.bindTo(juEncoder);
MethodHandle getUrlEncoder = lookup.findStatic(base64Class, "getUrlEncoder",
MethodType.methodType(juEncoderClass));
Object juUrlEncoderNoPassing;
try {
juUrlEncoderNoPassing = lookup.findVirtual(juEncoderClass, "withoutPadding",
MethodType.methodType(juEncoderClass)).invoke(getUrlEncoder.invoke());
} catch (Throwable throwable) {
// Invoked method doesn't throw checked exceptions, so safe to cast
throw (RuntimeException) throwable;
}
URL_ENCODE_NO_PADDING = encode.bindTo(juUrlEncoderNoPassing);
Class<?> juDecoderClass = Class.forName("java.util.Base64$Decoder");
MethodHandle getDecoder = lookup.findStatic(base64Class, "getDecoder",
MethodType.methodType(juDecoderClass));
MethodHandle decode = lookup.findVirtual(juDecoderClass, "decode",
MethodType.methodType(byte[].class, String.class));
try {
DECODE = decode.bindTo(getDecoder.invoke());
} catch (Throwable throwable) {
// Invoked method doesn't throw checked exceptions, so safe to cast
throw (RuntimeException) throwable;
}
URL_ENCODER_NO_PADDING = new Encoder() {
@Override
public String encodeToString(byte[] bytes) {
try {
return (String) URL_ENCODE_NO_PADDING.invokeExact(bytes);
} catch (Throwable throwable) {
// Invoked method doesn't throw checked exceptions, so safe to cast
throw (RuntimeException) throwable;
}
}
};
ENCODER = new Encoder() {
@Override
public String encodeToString(byte[] bytes) {
try {
return (String) ENCODE.invokeExact(bytes);
} catch (Throwable throwable) {
// Invoked method doesn't throw checked exceptions, so safe to cast
throw (RuntimeException) throwable;
}
}
};
DECODER = new Decoder() {
@Override
public byte[] decode(String string) {
try {
return (byte[]) DECODE.invokeExact(string);
} catch (Throwable throwable) {
// Invoked method doesn't throw checked exceptions, so safe to cast
throw (RuntimeException) throwable;
}
}
};
} catch (ReflectiveOperationException e) {
// Should never happen
throw new RuntimeException(e);
}
}
@Override
public Encoder urlEncoderNoPadding() {
return URL_ENCODER_NO_PADDING;
}
@Override
public Encoder encoder() {
return ENCODER;
}
@Override
public Decoder decoder() {
return DECODER;
}
}
private static class Java7Factory implements Factory {
// Static final MethodHandles are optimised better by HotSpot
private static final MethodHandle PRINT;
private static final MethodHandle PARSE;
static {
try {
Class<?> cls = Class.forName("javax.xml.bind.DatatypeConverter");
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
PRINT = lookup.findStatic(cls, "printBase64Binary", MethodType.methodType(String.class,
byte[].class));
PARSE = lookup.findStatic(cls, "parseBase64Binary", MethodType.methodType(byte[].class,
String.class));
} catch (ReflectiveOperationException e) {
// Should never happen
throw new RuntimeException(e);
}
}
public static final Encoder URL_ENCODER_NO_PADDING = new Encoder() {
@Override
public String encodeToString(byte[] bytes) {
String base64EncodedUUID = Java7Factory.encodeToString(bytes);
//Convert to URL safe variant by replacing + and / with - and _ respectively.
String urlSafeBase64EncodedUUID = base64EncodedUUID.replace("+", "-")
.replace("/", "_");
// Remove the "==" padding at the end.
return urlSafeBase64EncodedUUID.substring(0, urlSafeBase64EncodedUUID.length() - 2);
}
};
public static final Encoder ENCODER = new Encoder() {
@Override
public String encodeToString(byte[] bytes) {
return Java7Factory.encodeToString(bytes);
}
};
public static final Decoder DECODER = new Decoder() {
@Override
public byte[] decode(String string) {
try {
return (byte[]) PARSE.invokeExact(string);
} catch (Throwable throwable) {
// Invoked method doesn't throw checked exceptions, so safe to cast
throw (RuntimeException) throwable;
}
}
};
private static String encodeToString(byte[] bytes) {
try {
return (String) PRINT.invokeExact(bytes);
} catch (Throwable throwable) {
// Invoked method doesn't throw checked exceptions, so safe to cast
throw (RuntimeException) throwable;
}
}
@Override
public Encoder urlEncoderNoPadding() {
return URL_ENCODER_NO_PADDING;
}
@Override
public Encoder encoder() {
return ENCODER;
}
@Override
public Decoder decoder() {
return DECODER;
}
}
}

View File

@ -32,7 +32,7 @@ import java.util.zip.Checksum;
*
* NOTE: This class is intended for INTERNAL usage only within Kafka.
*/
public class Crc32C {
public final class Crc32C {
private static final ChecksumFactory CHECKSUM_FACTORY;
@ -43,6 +43,8 @@ public class Crc32C {
CHECKSUM_FACTORY = new PureJavaChecksumFactory();
}
private Crc32C() {}
/**
* Compute the CRC32C (Castagnoli) of the segment of the byte array given by the specified size and offset
*

View File

@ -41,6 +41,9 @@ public final class Java {
public static final boolean IS_JAVA9_COMPATIBLE = JVM_MAJOR_VERSION > 1 ||
(JVM_MAJOR_VERSION == 1 && JVM_MINOR_VERSION >= 9);
public static final boolean IS_JAVA8_COMPATIBLE = JVM_MAJOR_VERSION > 1 ||
(JVM_MAJOR_VERSION == 1 && JVM_MINOR_VERSION >= 8);
public static boolean isIBMJdk() {
return System.getProperty("java.vendor").contains("IBM");
}

View File

@ -284,14 +284,14 @@ public class Utils {
* Instantiate the class
*/
public static <T> T newInstance(Class<T> c) {
if (c == null)
throw new KafkaException("class cannot be null");
try {
return c.newInstance();
} catch (IllegalAccessException e) {
return c.getDeclaredConstructor().newInstance();
} catch (NoSuchMethodException e) {
throw new KafkaException("Could not find a public no-argument constructor for " + c.getName(), e);
} catch (ReflectiveOperationException | RuntimeException e) {
throw new KafkaException("Could not instantiate class " + c.getName(), e);
} catch (InstantiationException e) {
throw new KafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
} catch (NullPointerException e) {
throw new KafkaException("Requested class was null", e);
}
}

View File

@ -105,9 +105,10 @@ public class TestDigestLoginModule extends PlainLoginModule {
private static final long serialVersionUID = 1L;
@SuppressWarnings("deprecation")
protected DigestSaslServerProvider() {
super("Test SASL/Digest-MD5 Server Provider", 1.0, "Test SASL/Digest-MD5 Server Provider for Kafka");
super.put("SaslServerFactory.DIGEST-MD5", TestDigestLoginModule.DigestSaslServerFactory.class.getName());
put("SaslServerFactory.DIGEST-MD5", TestDigestLoginModule.DigestSaslServerFactory.class.getName());
}
}
}

View File

@ -16,10 +16,9 @@
*/
package org.apache.kafka.common.security.scram;
import org.apache.kafka.common.utils.Base64;
import org.junit.Test;
import javax.xml.bind.DatatypeConverter;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -55,13 +54,13 @@ public class ScramFormatterTest {
String serverNonce = serverFirst.nonce().substring(clientNonce.length());
assertEquals("%hvYDpWUa2RaTCAfuxFIlj)hNlF$k0", serverNonce);
byte[] salt = serverFirst.salt();
assertArrayEquals(DatatypeConverter.parseBase64Binary("W22ZaJ0SNY7soEsUEjb6gQ=="), salt);
assertArrayEquals(Base64.decoder().decode("W22ZaJ0SNY7soEsUEjb6gQ=="), salt);
int iterations = serverFirst.iterations();
assertEquals(4096, iterations);
byte[] channelBinding = clientFinal.channelBinding();
assertArrayEquals(DatatypeConverter.parseBase64Binary("biws"), channelBinding);
assertArrayEquals(Base64.decoder().decode("biws"), channelBinding);
byte[] serverSignature = serverFinal.serverSignature();
assertArrayEquals(DatatypeConverter.parseBase64Binary("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature);
assertArrayEquals(Base64.decoder().decode("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature);
byte[] saltedPassword = formatter.saltedPassword(password, salt, iterations);
byte[] serverKey = formatter.serverKey(saltedPassword);

View File

@ -16,13 +16,13 @@
*/
package org.apache.kafka.common.security.scram;
import org.apache.kafka.common.utils.Base64;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import javax.security.sasl.SaslException;
import javax.xml.bind.DatatypeConverter;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -288,11 +288,11 @@ public class ScramMessagesTest {
}
private String randomBytesAsString() {
return DatatypeConverter.printBase64Binary(formatter.secureRandomBytes());
return Base64.encoder().encodeToString(formatter.secureRandomBytes());
}
private byte[] toBytes(String base64Str) {
return DatatypeConverter.parseBase64Binary(base64Str);
return Base64.decoder().decode(base64Str);
};
private void checkClientFirstMessage(ClientFirstMessage message, String saslName, String nonce, String authzid) {
@ -303,14 +303,14 @@ public class ScramMessagesTest {
private void checkServerFirstMessage(ServerFirstMessage message, String nonce, String salt, int iterations) {
assertEquals(nonce, message.nonce());
assertArrayEquals(DatatypeConverter.parseBase64Binary(salt), message.salt());
assertArrayEquals(Base64.decoder().decode(salt), message.salt());
assertEquals(iterations, message.iterations());
}
private void checkClientFinalMessage(ClientFinalMessage message, String channelBinding, String nonce, String proof) {
assertArrayEquals(DatatypeConverter.parseBase64Binary(channelBinding), message.channelBinding());
assertArrayEquals(Base64.decoder().decode(channelBinding), message.channelBinding());
assertEquals(nonce, message.nonce());
assertArrayEquals(DatatypeConverter.parseBase64Binary(proof), message.proof());
assertArrayEquals(Base64.decoder().decode(proof), message.proof());
}
private void checkServerFinalMessage(ServerFinalMessage message, String error, String serverSignature) {
@ -318,7 +318,7 @@ public class ScramMessagesTest {
if (serverSignature == null)
assertNull("Unexpected server signature", message.serverSignature());
else
assertArrayEquals(DatatypeConverter.parseBase64Binary(serverSignature), message.serverSignature());
assertArrayEquals(Base64.decoder().decode(serverSignature), message.serverSignature());
}
@SuppressWarnings("unchecked")

View File

@ -22,11 +22,11 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.Base64;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -292,7 +292,7 @@ public class TestUtils {
// Convert into normal variant and add padding at the end.
String originalClusterId = String.format("%s==", clusterId.replace("_", "/").replace("-", "+"));
byte[] decodedUuid = DatatypeConverter.parseBase64Binary(originalClusterId);
byte[] decodedUuid = Base64.decoder().decode(originalClusterId);
// We expect 16 bytes, same as the input UUID.
assertEquals(decodedUuid.length, 16);

View File

@ -20,6 +20,7 @@ package kafka
import java.util.Properties
import joptsimple.OptionParser
import kafka.utils.Implicits._
import kafka.server.{KafkaServer, KafkaServerStartable}
import kafka.utils.{CommandLineUtils, Exit, Logging}
import org.apache.kafka.common.utils.Utils
@ -47,7 +48,7 @@ object Kafka extends Logging {
CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
}
props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala))
props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
}
props
}

View File

@ -24,6 +24,7 @@ import kafka.common.InvalidConfigException
import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId}
import kafka.utils.{CommandLineUtils, ZkUtils}
import kafka.utils.Implicits._
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram._
import org.apache.kafka.common.utils.Utils
@ -95,7 +96,7 @@ object ConfigCommand extends Config {
if (invalidConfigs.nonEmpty)
throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
configs.putAll(configsToBeAdded)
configs ++= configsToBeAdded
configsToBeDeleted.foreach(configs.remove(_))
utils.changeConfigs(zkUtils, entityType, entityName, configs)

View File

@ -25,6 +25,7 @@ import joptsimple.{OptionParser, OptionSpec}
import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
import kafka.client.ClientUtils
import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
import kafka.utils.Implicits._
import kafka.consumer.SimpleConsumer
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNoNodeException
@ -514,7 +515,8 @@ object ConsumerGroupCommand extends Logging {
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))
if (opts.options.has(opts.commandConfigOpt))
properties ++= Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
new KafkaConsumer(properties)
}

View File

@ -21,6 +21,7 @@ import java.util.Properties
import joptsimple._
import kafka.common.AdminCommandFailedException
import kafka.utils.Implicits._
import kafka.consumer.Whitelist
import kafka.log.LogConfig
import kafka.server.ConfigType
@ -130,7 +131,7 @@ object TopicCommand extends Logging {
val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
// compile the final set of configs
configs.putAll(configsToBeAdded)
configs ++= configsToBeAdded
configsToBeDeleted.foreach(config => configs.remove(config))
AdminUtils.changeTopicConfig(zkUtils, topic, configs)
println("Updated config for topic \"%s\".".format(topic))

View File

@ -62,7 +62,7 @@ object ZkSecurityMigrator extends Logging {
+ "authentication.")
def run(args: Array[String]) {
var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
val parser = new OptionParser(false)
val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
+ " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])

View File

@ -33,7 +33,7 @@ object FetchResponsePartitionData {
val messageSetSize = buffer.getInt
val messageSetBuffer = buffer.slice()
messageSetBuffer.limit(messageSetSize)
buffer.position(buffer.position + messageSetSize)
buffer.position(buffer.position() + messageSetSize)
new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))
}

View File

@ -93,7 +93,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
val partitionMessageData = partitionAndData._2
val bytes = partitionMessageData.buffer
buffer.putInt(partition)
buffer.putInt(bytes.limit)
buffer.putInt(bytes.limit())
buffer.put(bytes)
bytes.rewind
})

View File

@ -764,7 +764,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
topicDeleted || successful
}.keys
reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
val partitionsToReassign = mutable.Map[TopicAndPartition, ReassignedPartitionsContext]()
partitionsToReassign ++= partitionsBeingReassigned
partitionsToReassign --= reassignedPartitions
controllerContext.partitionsBeingReassigned ++= partitionsToReassign

View File

@ -17,7 +17,6 @@
package kafka.javaapi
import kafka.cluster.BrokerEndPoint
import org.apache.kafka.common.protocol.Errors
import scala.collection.JavaConverters._
private[javaapi] object MetadataListImplicits {

View File

@ -85,7 +85,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
for ((topic, streams) <- scalaReturn) {
var javaStreamList = new java.util.ArrayList[KafkaStream[K,V]]
val javaStreamList = new java.util.ArrayList[KafkaStream[K,V]]
for (stream <- streams)
javaStreamList.add(stream)
ret.put(topic, javaStreamList)

View File

@ -17,7 +17,7 @@
package kafka.log
import java.io.{File, IOException, RandomAccessFile}
import java.io.{File, RandomAccessFile}
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel
import java.util.concurrent.locks.{Lock, ReentrantLock}
@ -69,7 +69,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
idx.position(0)
else
// if this is a pre-existing index, assume it is valid and set position to last entry
idx.position(roundDownToExactMultiple(idx.limit, entrySize))
idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
idx
} finally {
CoreUtils.swallow(raf.close())
@ -80,11 +80,11 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
* The maximum number of entries this index can hold
*/
@volatile
private[this] var _maxEntries = mmap.limit / entrySize
private[this] var _maxEntries = mmap.limit() / entrySize
/** The number of entries in this index */
@volatile
protected var _entries = mmap.position / entrySize
protected var _entries = mmap.position() / entrySize
/**
* True iff there are no more slots available in this index
@ -105,7 +105,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
inLock(lock) {
val raf = new RandomAccessFile(file, "rw")
val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
val position = mmap.position
val position = mmap.position()
/* Windows won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS)
@ -113,7 +113,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
try {
raf.setLength(roundedNewSize)
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
_maxEntries = mmap.limit / entrySize
_maxEntries = mmap.limit() / entrySize
mmap.position(position)
} finally {
CoreUtils.swallow(raf.close())

View File

@ -550,7 +550,7 @@ private[log] class Cleaner(val id: Int,
// if any messages are to be retained, write them out
val outputBuffer = result.output
if (outputBuffer.position > 0) {
if (outputBuffer.position() > 0) {
outputBuffer.flip()
val retained = MemoryRecords.readableRecords(outputBuffer)
dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
@ -558,11 +558,11 @@ private[log] class Cleaner(val id: Int,
largestTimestamp = result.maxTimestamp,
shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
records = retained)
throttler.maybeThrottle(outputBuffer.limit)
throttler.maybeThrottle(outputBuffer.limit())
}
// if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
if (readBuffer.limit > 0 && result.messagesRead == 0)
if (readBuffer.limit() > 0 && result.messagesRead == 0)
growBuffers(maxLogMessageSize)
}
restoreBuffers()

View File

@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import kafka.api.ApiVersion
import kafka.message.{BrokerCompressionCodec, Message}
import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
import kafka.utils.Implicits._
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig}
import org.apache.kafka.common.record.TimestampType
@ -269,8 +270,8 @@ object LogConfig {
*/
def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
val props = new Properties()
props.putAll(defaults)
props.putAll(overrides)
defaults.asScala.foreach { case (k, v) => props.put(k, v) }
props ++= overrides
LogConfig(props)
}

View File

@ -58,7 +58,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
private[this] var _lastOffset = lastEntry.offset
debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
.format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
.format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position()))
/**
* The last entry in the index
@ -144,7 +144,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
mmap.putInt(position)
_entries += 1
_lastOffset = offset
require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
} else {
throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
.format(offset, entries, _lastOffset, file.getAbsolutePath))

View File

@ -149,7 +149,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
this.lookups = 0L
this.probes = 0L
this.lastOffset = -1L
Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit(), 0.toByte)
}
/**

View File

@ -338,12 +338,12 @@ object ProducerStateManager {
buffer.flip()
// now fill in the CRC
val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit - ProducerEntriesOffset)
val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit() - ProducerEntriesOffset)
ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
val fos = new FileOutputStream(file)
try {
fos.write(buffer.array, buffer.arrayOffset, buffer.limit)
fos.write(buffer.array, buffer.arrayOffset, buffer.limit())
} finally {
fos.close()
}

View File

@ -126,7 +126,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
mmap.putLong(timestamp)
mmap.putInt((offset - baseOffset).toInt)
_entries += 1
require(_entries * entrySize == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
}
}
}

View File

@ -174,7 +174,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
/**
* The total number of bytes in this message set, including any partial trailing messages
*/
def sizeInBytes: Int = buffer.limit
def sizeInBytes: Int = buffer.limit()
/**
* The total number of bytes in this message set not including any partial, trailing messages

View File

@ -222,7 +222,7 @@ class Message(val buffer: ByteBuffer,
* Compute the checksum of the message from the message contents
*/
def computeChecksum: Long =
Crc32.crc32(buffer, MagicOffset, buffer.limit - MagicOffset)
Crc32.crc32(buffer, MagicOffset, buffer.limit() - MagicOffset)
/**
* Retrieve the previously computed CRC for this message
@ -245,7 +245,7 @@ class Message(val buffer: ByteBuffer,
/**
* The complete serialized size of this message in bytes (including crc, header attributes, etc)
*/
def size: Int = buffer.limit
def size: Int = buffer.limit()
/**
* The position where the key size is stored.

View File

@ -23,6 +23,7 @@ import kafka.api.TopicMetadata
import kafka.cluster.BrokerEndPoint
import kafka.common.UnavailableProducerException
import kafka.utils.Logging
import kafka.utils.Implicits._
import scala.collection.mutable.HashMap
@ -35,7 +36,7 @@ object ProducerPool {
val props = new Properties()
props.put("host", broker.host)
props.put("port", broker.port.toString)
props.putAll(config.props.props)
props ++= config.props.props
new SyncProducer(new SyncProducerConfig(props))
}
}

View File

@ -58,7 +58,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
*/
if (logger.isDebugEnabled) {
val buffer = new RequestOrResponseSend("", request).buffer
trace("verifying sendbuffer of size " + buffer.limit)
trace("verifying sendbuffer of size " + buffer.limit())
val requestTypeId = buffer.getShort()
if(requestTypeId == ApiKeys.PRODUCE.id) {
val request = ProducerRequest.readFrom(buffer)

View File

@ -19,8 +19,6 @@ package kafka.security.auth
import kafka.common.{BaseEnum, KafkaException}
import org.apache.kafka.common.acl.AclOperation
import scala.util.{Failure, Success, Try}
/**
* Different operations a client may perform on kafka resources.
*/

View File

@ -19,8 +19,6 @@ package kafka.security.auth
import kafka.common.{BaseEnum, KafkaException}
import org.apache.kafka.common.acl.AclPermissionType
import scala.util.{Failure, Success, Try}
sealed trait PermissionType extends BaseEnum {
val toJava: AclPermissionType
}

View File

@ -26,7 +26,6 @@ import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import scala.collection.JavaConverters._
import org.apache.log4j.Logger

View File

@ -25,6 +25,7 @@ import kafka.log.{LogConfig, LogManager}
import kafka.security.CredentialProvider
import kafka.server.Constants._
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigDef.Validator
import org.apache.kafka.common.config.ConfigException
@ -32,7 +33,6 @@ import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.Quota._
import scala.collection.JavaConverters._
import scala.collection.mutable
/**
* The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
@ -55,7 +55,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
if (logs.nonEmpty) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties()
props.putAll(logManager.defaultConfig.originals)
props ++= logManager.defaultConfig.originals.asScala
topicConfig.asScala.foreach { case (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}

View File

@ -349,7 +349,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
def cancel(): List[T] = {
val iter = operations.iterator()
var cancelled = new ListBuffer[T]()
val cancelled = new ListBuffer[T]()
while (iter.hasNext) {
val curr = iter.next()
curr.cancel()

View File

@ -17,7 +17,6 @@
package kafka.server
import kafka.common.TopicAndPartition
import org.apache.kafka.common.TopicPartition
/**

View File

@ -26,6 +26,7 @@ import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.ConfigDef.ValidList
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
@ -875,8 +876,8 @@ object KafkaConfig {
def fromProps(defaults: Properties, overrides: Properties, doLog: Boolean): KafkaConfig = {
val props = new Properties()
props.putAll(defaults)
props.putAll(overrides)
props ++= defaults
props ++= overrides
fromProps(props, doLog)
}

View File

@ -29,6 +29,7 @@ import kafka.consumer._
import kafka.message._
import kafka.metrics.KafkaMetricsReporter
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.record.TimestampType
@ -173,8 +174,8 @@ object ConsoleConsumer extends Logging {
def getOldConsumerProps(config: ConsumerConfig): Properties = {
val props = new Properties
props.putAll(config.consumerProps)
props.putAll(config.extraConsumerProps)
props ++= config.consumerProps
props ++= config.extraConsumerProps
setAutoOffsetResetValue(config, props)
props.put("zookeeper.connect", config.zkConnectionStr)
@ -201,8 +202,8 @@ object ConsoleConsumer extends Logging {
def getNewConsumerProps(config: ConsumerConfig): Properties = {
val props = new Properties
props.putAll(config.consumerProps)
props.putAll(config.extraConsumerProps)
props ++= config.consumerProps
props ++= config.extraConsumerProps
setAutoOffsetResetValue(config, props)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")

View File

@ -21,6 +21,7 @@ import kafka.common._
import kafka.message._
import kafka.serializer._
import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
import kafka.utils.Implicits._
import kafka.producer.{NewShinyProducer, OldProducer}
import java.util.Properties
import java.io._
@ -74,7 +75,7 @@ object ConsoleProducer {
def getReaderProps(config: ProducerConfig): Properties = {
val props = new Properties
props.put("topic",config.topic)
props.putAll(config.cmdLineProps)
props ++= config.cmdLineProps
props
}
@ -106,7 +107,7 @@ object ConsoleProducer {
if (config.options.has(config.producerConfigOpt))
Utils.loadProps(config.options.valueOf(config.producerConfigOpt))
else new Properties
props.putAll(config.extraProducerProps)
props ++= config.extraProducerProps
props
}

View File

@ -443,14 +443,14 @@ object DumpLogSegments {
}
def recordOutOfOrderIndexTimestamp(file: File, indexTimestamp: Long, prevIndexTimestamp: Long) {
var outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
val outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
if (outOfOrderSeq.isEmpty)
outOfOrderTimestamp.put(file.getAbsolutePath, outOfOrderSeq)
outOfOrderSeq += ((indexTimestamp, prevIndexTimestamp))
}
def recordShallowOffsetNotFound(file: File, indexOffset: Long, logOffset: Long) {
var shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
val shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
if (shallowOffsetNotFoundSeq.isEmpty)
shallowOffsetNotFound.put(file.getAbsolutePath, shallowOffsetNotFoundSeq)
shallowOffsetNotFoundSeq += ((indexOffset, logOffset))

View File

@ -17,7 +17,7 @@
package kafka.tools
import java.io.{FileOutputStream, FileWriter, OutputStreamWriter}
import java.io.{FileOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import joptsimple._

View File

@ -71,8 +71,8 @@ object GetOffsetShell {
ToolsUtils.validatePortOrDie(parser, brokerList)
val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
val topic = options.valueOf(topicOpt)
var partitionList = options.valueOf(partitionOpt)
var time = options.valueOf(timeOpt).longValue
val partitionList = options.valueOf(partitionOpt)
val time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()

View File

@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets
import joptsimple._
import kafka.utils.{CommandLineUtils, Exit, Logging, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.security.JaasUtils

View File

@ -28,7 +28,7 @@ import joptsimple.OptionParser
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.math._
import kafka.utils.{CommandLineUtils, Exit, Logging}
import kafka.utils.{CommandLineUtils , Exit, Logging}
/**
@ -177,14 +177,14 @@ object JmxTool extends Logging {
}
def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = {
var attributes = new mutable.HashMap[String, Any]()
for(name <- names) {
val attributes = new mutable.HashMap[String, Any]()
for (name <- names) {
val mbean = mbsc.getMBeanInfo(name)
for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) {
for (attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) {
val attr = attrObj.asInstanceOf[Attribute]
attributesWhitelist match {
case Some(allowedAttributes) =>
if(allowedAttributes.contains(attr.getName))
if (allowedAttributes.contains(attr.getName))
attributes(name + ":" + attr.getName) = attr.getValue
case None => attributes(name + ":" + attr.getName) = attr.getValue
}

View File

@ -20,6 +20,7 @@ package kafka.tools
import kafka.metrics.KafkaMetricsReporter
import kafka.producer.{NewShinyProducer, OldProducer}
import kafka.utils.{CommandLineUtils, Exit, Logging, ToolsUtils, VerifiableProperties}
import kafka.utils.Implicits._
import kafka.message.CompressionCodec
import kafka.serializer._
import java.util.concurrent.{CountDownLatch, Executors}
@ -205,7 +206,7 @@ object ProducerPerformance extends Logging {
val producer =
if (config.useNewProducer) {
import org.apache.kafka.clients.producer.ProducerConfig
props.putAll(config.producerProps)
props ++= config.producerProps
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
@ -217,7 +218,7 @@ object ProducerPerformance extends Logging {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
new NewShinyProducer(props)
} else {
props.putAll(config.producerProps)
props ++= config.producerProps
props.put("metadata.broker.list", config.brokerList)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("send.buffer.bytes", (64 * 1024).toString)

View File

@ -18,7 +18,7 @@
package kafka.tools
import joptsimple.OptionParser
import java.util.concurrent.{Executors, CountDownLatch}
import java.util.concurrent.CountDownLatch
import java.util.Properties
import kafka.consumer._
import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils}

View File

@ -24,7 +24,6 @@ import java.util.concurrent.locks.{Lock, ReadWriteLock}
import java.lang.management._
import java.util.{Properties, UUID}
import javax.management._
import javax.xml.bind.DatatypeConverter
import org.apache.kafka.common.protocol.SecurityProtocol
@ -32,7 +31,7 @@ import scala.collection._
import scala.collection.mutable
import kafka.cluster.EndPoint
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.{KafkaThread, Utils}
import org.apache.kafka.common.utils.{Base64, KafkaThread, Utils}
/**
* General helper functions!
@ -279,7 +278,7 @@ object CoreUtils extends Logging {
def generateUuidAsBase64(): String = {
val uuid = UUID.randomUUID()
urlSafeBase64EncodeNoPadding(getBytesFromUuid(uuid))
Base64.urlEncoderNoPadding.encodeToString(getBytesFromUuid(uuid))
}
def getBytesFromUuid(uuid: UUID): Array[Byte] = {
@ -290,14 +289,6 @@ object CoreUtils extends Logging {
uuidBytes.array
}
def urlSafeBase64EncodeNoPadding(data: Array[Byte]): String = {
val base64EncodedUUID = DatatypeConverter.printBase64Binary(data)
//Convert to URL safe variant by replacing + and / with - and _ respectively.
val urlSafeBase64EncodedUUID = base64EncodedUUID.replace("+", "-").replace("/", "_")
// Remove the "==" padding at the end.
urlSafeBase64EncodedUUID.substring(0, urlSafeBase64EncodedUUID.length - 2)
}
def propsWith(key: String, value: String): Properties = {
propsWith((key, value))
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util
import java.util.Properties
import scala.collection.JavaConverters._
/**
* In order to have these implicits in scope, add the following import:
*
* `import kafka.utils.Implicits._`
*/
object Implicits {
/**
* The java.util.Properties.putAll override introduced in Java 9 is seen as an overload by the
* Scala compiler causing ambiguity errors in some cases. The `++=` methods introduced via
* implicits provide a concise alternative.
*
* See https://github.com/scala/bug/issues/10418 for more details.
*/
implicit class PropertiesOps(properties: Properties) {
def ++=(props: Properties): Unit =
(properties: util.Hashtable[AnyRef, AnyRef]).putAll(props)
def ++=(map: collection.Map[String, AnyRef]): Unit =
(properties: util.Hashtable[AnyRef, AnyRef]).putAll(map.asJava)
}
}

View File

@ -18,7 +18,6 @@
package kafka.utils
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
import kafka.utils.ZkUtils._
import org.apache.kafka.common.TopicPartition

View File

@ -16,13 +16,9 @@
*/
package kafka.utils
import java.util
import java.util.Comparator
import joptsimple.OptionParser
import org.apache.kafka.common.{Metric, MetricName}
import scala.collection.immutable.ListMap
import scala.collection.mutable
object ToolsUtils {
@ -32,9 +28,8 @@ object ToolsUtils {
hostPort.split(",")
else
Array(hostPort)
val validHostPort = hostPorts.filter {
hostPortData =>
org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
val validHostPort = hostPorts.filter { hostPortData =>
org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
}
val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length
if(!isValid)

View File

@ -27,12 +27,12 @@ import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig, KafkaServer}
import org.apache.kafka.clients.admin._
import kafka.utils.{Logging, TestUtils, ZkUtils}
import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Before, Rule, Test}
import org.apache.kafka.common.requests.MetadataResponse
import org.apache.kafka.common.resource.{Resource, ResourceType}
@ -235,8 +235,8 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString)
val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString)
val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2)
var describeResult = client.describeConfigs(configResources.asJava)
var configs = describeResult.all.get
val describeResult = client.describeConfigs(configResources.asJava)
val configs = describeResult.all.get
assertEquals(4, configs.size)
@ -378,7 +378,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass")
}
cfgs.foreach(_.putAll(serverConfig))
cfgs.foreach(_ ++= serverConfig)
cfgs.map(KafkaConfig.fromProps)
}

View File

@ -19,7 +19,7 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import kafka.utils.{Logging, ShutdownableThread, TestUtils}
import kafka.utils.{ShutdownableThread, TestUtils}
import kafka.server.KafkaConfig
import org.junit.Assert._
import org.junit.{Before, Test}

View File

@ -25,6 +25,7 @@ import kafka.common.TopicAndPartition
import kafka.integration.KafkaServerTestHarness
import kafka.server._
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, TopicPartition}
@ -105,7 +106,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
override def generateConfigs = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach(_.putAll(serverConfig))
cfgs.foreach(_ ++= serverConfig)
cfgs.map(KafkaConfig.fromProps)
}

View File

@ -20,6 +20,7 @@ package kafka.api
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import kafka.utils.TestUtils
import kafka.utils.Implicits._
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
@ -54,7 +55,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
}
cfgs.foreach(_.putAll(serverConfig))
cfgs.foreach(_ ++= serverConfig)
cfgs.map(KafkaConfig.fromProps)
}
@ -65,10 +66,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
super.setUp()
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
producerConfig.putAll(producerSecurityProps)
producerConfig ++= producerSecurityProps
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.putAll(consumerSecurityProps)
consumerConfig ++= consumerSecurityProps
for (_ <- 0 until producerCount)
producers += createNewProducer
for (_ <- 0 until consumerCount) {

View File

@ -20,6 +20,7 @@ import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{ShutdownableThread, TestUtils}
import kafka.utils.Implicits._
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.junit.Assert._
@ -123,7 +124,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
val producerConfigWithCompression = new Properties()
producerConfigWithCompression.putAll(producerConfig)
producerConfigWithCompression ++= producerConfig
producerConfigWithCompression.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
val producers = List(
TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 4, retries = 10, props = Some(producerConfig)),

View File

@ -19,6 +19,7 @@ package kafka.api
import java.util.Properties
import kafka.utils.TestUtils
import kafka.utils.Implicits._
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.errors.GroupAuthorizationException
@ -58,7 +59,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
val consumer1 = consumers.head
val consumer2Config = new Properties
consumer2Config.putAll(consumerConfig)
consumer2Config ++= consumerConfig
// consumer2 retrieves its credentials from the static JAAS configuration, so we test also this path
consumer2Config.remove(SaslConfigs.SASL_JAAS_CONFIG)

View File

@ -18,7 +18,7 @@ import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, Cluster
import org.apache.kafka.common.protocol.SecurityProtocol
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions, KafkaAdminClient}
import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceType}

View File

@ -26,6 +26,7 @@ import kafka.api.SaslSetup
import kafka.coordinator.group.OffsetConfig
import kafka.utils.JaasTestUtils.JaasSection
import kafka.utils.TestUtils
import kafka.utils.Implicits._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@ -84,7 +85,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
props.put(KafkaConfig.SaslKerberosServiceNameProp, "kafka")
props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId"))
props ++= TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")
// set listener-specific configs and set an invalid path for the global config to verify that the overrides work
Seq(SecureInternal, SecureExternal).foreach { listenerName =>

View File

@ -169,13 +169,12 @@ object ReplicationQuotasTestRig {
def logOutput(config: ExperimentDef, replicas: Map[Int, Seq[Int]], newAssignment: Map[TopicAndPartition, Seq[Int]]): Unit = {
val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
val existing = zkUtils.getReplicaAssignmentForTopics(newAssignment.map(_._1.topic).toSeq)
//Long stats
println("The replicas are " + replicas.toSeq.sortBy(_._1).map("\n" + _))
println("This is the current replica assignment:\n" + actual.toSeq)
println("proposed assignment is: \n" + newAssignment)
println("This is the assigment we eneded up with" + actual)
println("This is the assignment we ended up with" + actual)
//Test Stats
println(s"numBrokers: ${config.brokers}")

View File

@ -184,7 +184,7 @@ object TestLinearWriteSpeed {
def write(): Int = {
buffer.put(content)
content.rewind()
content.limit
content.limit()
}
def close() {
raf.close()
@ -198,7 +198,7 @@ object TestLinearWriteSpeed {
def write(): Int = {
channel.write(content)
content.rewind()
content.limit
content.limit()
}
def close() {
raf.close()

View File

@ -24,7 +24,7 @@ import kafka.server.{ConfigEntityName, QuotaId}
import kafka.utils.{Logging, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism}
import org.apache.kafka.common.security.scram.ScramCredentialUtils
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test

View File

@ -17,7 +17,7 @@
package kafka.consumer
import java.util.{Collections, Properties}
import java.util.Properties
import org.junit.Assert._
import kafka.common.MessageStreamsExistException
@ -28,7 +28,6 @@ import kafka.serializer._
import kafka.server._
import kafka.utils.TestUtils._
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.apache.log4j.{Level, Logger}
import org.junit.{Test, After, Before}

View File

@ -422,12 +422,12 @@ class TransactionMarkerChannelManagerTest {
@Test
def shouldCreateMetricsOnStarting(): Unit = {
val metrics = Metrics.defaultRegistry.allMetrics
val metrics = Metrics.defaultRegistry.allMetrics.asScala
assertEquals(1, Metrics.defaultRegistry.allMetrics.asScala
assertEquals(1, metrics
.filterKeys(_.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=UnknownDestinationQueueSize")
.size)
assertEquals(1, Metrics.defaultRegistry.allMetrics.asScala
assertEquals(1, metrics
.filterKeys(_.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=LogAppendRetryQueueSize")
.size)
}

View File

@ -22,6 +22,7 @@ import java.util.Properties
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils.{MockTime, Pool, TestUtils}
import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.junit.After
@ -66,7 +67,7 @@ abstract class AbstractLogCleanerIntegrationTest {
props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
props.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
props.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Long)
props.putAll(propertyOverrides)
props ++= propertyOverrides
props
}

View File

@ -104,7 +104,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
val emptyMessageList : List[Message] = Nil
val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit() + regularMessgeSet.buffer.limit())
buffer.put(emptyMessageSet.buffer)
buffer.put(regularMessgeSet.buffer)
buffer.rewind
@ -122,7 +122,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
val emptyMessageList : List[Message] = Nil
val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit() + regularMessgeSet.buffer.limit())
buffer.put(emptyMessageSet.buffer)
buffer.put(regularMessgeSet.buffer)
buffer.rewind

View File

@ -38,7 +38,8 @@ import kafka.security.auth.{Acl, Authorizer, Resource}
import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils._
import ZkUtils._
import Implicits._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
@ -252,10 +253,10 @@ object TestUtils extends Logging {
rack.foreach(props.put(KafkaConfig.RackProp, _))
if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId"))
props ++= sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId")
if (protocolAndPorts.exists { case (protocol, _) => usesSaslAuthentication(protocol) })
props.putAll(JaasTestUtils.saslConfigs(saslProperties))
props ++= JaasTestUtils.saslConfigs(saslProperties)
interBrokerSecurityProtocol.foreach { protocol =>
props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name)
@ -388,9 +389,9 @@ object TestUtils extends Logging {
* Check that the buffer content from buffer.position() to buffer.limit() is equal
*/
def checkEquals(b1: ByteBuffer, b2: ByteBuffer) {
assertEquals("Buffers should have equal length", b1.limit - b1.position, b2.limit - b2.position)
for(i <- 0 until b1.limit - b1.position)
assertEquals("byte " + i + " byte not equal.", b1.get(b1.position + i), b2.get(b1.position + i))
assertEquals("Buffers should have equal length", b1.limit() - b1.position(), b2.limit() - b2.position())
for(i <- 0 until b1.limit() - b1.position())
assertEquals("byte " + i + " byte not equal.", b1.get(b1.position() + i), b2.get(b1.position() + i))
}
/**
@ -484,8 +485,8 @@ object TestUtils extends Logging {
*/
def hexString(buffer: ByteBuffer): String = {
val builder = new StringBuilder("0x")
for(i <- 0 until buffer.limit)
builder.append(String.format("%x", Integer.valueOf(buffer.get(buffer.position + i))))
for(i <- 0 until buffer.limit())
builder.append(String.format("%x", Integer.valueOf(buffer.get(buffer.position() + i))))
builder.toString
}
@ -503,7 +504,7 @@ object TestUtils extends Logging {
//override any explicitly specified properties
if (producerProps != null)
props.putAll(producerProps)
props ++= producerProps
props.put("serializer.class", encoder)
props.put("key.serializer.class", keyEncoder)
@ -518,10 +519,10 @@ object TestUtils extends Logging {
saslProperties: Option[Properties]): Properties = {
val props = new Properties
if (usesSslTransportLayer(securityProtocol))
props.putAll(sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias))
props ++= sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias)
if (usesSaslAuthentication(securityProtocol))
props.putAll(JaasTestUtils.saslConfigs(saslProperties))
props ++= JaasTestUtils.saslConfigs(saslProperties)
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
props
}
@ -572,7 +573,7 @@ object TestUtils extends Logging {
* SSL client auth fails.
*/
if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties))
producerProps ++= producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
}
@ -633,7 +634,7 @@ object TestUtils extends Logging {
* SSL client auth fails.
*/
if(!consumerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
consumerProps.putAll(consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties))
consumerProps ++= consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
}
@ -1208,7 +1209,7 @@ object TestUtils extends Logging {
def copyOf(props: Properties): Properties = {
val copy = new Properties()
copy.putAll(props)
copy ++= props
copy
}

View File

@ -28,7 +28,7 @@ import org.junit.Assert._
import kafka.common.KafkaException
import kafka.utils.CoreUtils.inLock
import org.junit.Test
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.{Base64, Utils}
class UtilsTest extends JUnitSuite {
@ -157,13 +157,15 @@ class UtilsTest extends JUnitSuite {
def testUrlSafeBase64EncodeUUID() {
// Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
val clusterId1 = CoreUtils.urlSafeBase64EncodeNoPadding(CoreUtils.getBytesFromUuid(UUID.fromString("a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
val clusterId1 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
"a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
assertEquals(clusterId1.length, 22)
assertTrue(clusterIdPattern.matcher(clusterId1).matches())
// Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
val clusterId2 = CoreUtils.urlSafeBase64EncodeNoPadding(CoreUtils.getBytesFromUuid(UUID.fromString("d418ec02-277e-4853-81e6-afe30259daec")))
val clusterId2 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
"d418ec02-277e-4853-81e6-afe30259daec")))
assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
assertEquals(clusterId2.length, 22)
assertTrue(clusterIdPattern.matcher(clusterId2).matches())