From 9a0ea25fee85837748145d37c69cf4d9bb7f9933 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 29 Oct 2018 13:37:23 +0000 Subject: [PATCH] MINOR: Use string/log interpolation instead of string concat in core and clients (#5850) Also removed a few unused imports and tweaked the log message slightly. Reviewers: Ismael Juma --- .../kafka/clients/CommonClientConfigs.java | 4 +-- .../clients/consumer/StickyAssignor.java | 17 +++++----- .../internals/TransactionManager.java | 2 +- .../kafka/common/record/MultiRecordsSend.java | 2 +- .../kafka/common/utils/AppInfoParser.java | 6 ++-- .../main/scala/kafka/admin/AdminUtils.scala | 4 +-- .../group/GroupMetadataManager.scala | 4 +-- .../transaction/TransactionStateManager.scala | 1 - .../main/scala/kafka/log/AbstractIndex.scala | 2 +- .../src/main/scala/kafka/log/LogCleaner.scala | 6 ++-- .../scala/kafka/log/LogCleanerManager.scala | 2 -- .../src/main/scala/kafka/log/LogManager.scala | 34 +++++++++---------- .../main/scala/kafka/log/OffsetIndex.scala | 1 - .../scala/kafka/network/SocketServer.scala | 8 ++--- .../kafka/server/DelegationTokenManager.scala | 18 +++++----- .../scala/kafka/server/ThrottledChannel.scala | 2 +- .../main/scala/kafka/tools/MirrorMaker.scala | 4 +-- .../kafka/tools/ReplicaVerificationTool.scala | 6 ++-- .../main/scala/kafka/utils/CoreUtils.scala | 2 +- .../src/main/scala/kafka/utils/FileLock.scala | 6 ++-- .../scala/kafka/utils/KafkaScheduler.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 6 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../kafka/api/EndToEndAuthorizationTest.scala | 2 +- .../kafka/server/DelayedFetchTest.scala | 2 +- .../scala/other/kafka/StressTestLog.scala | 1 - .../TransactionStateManagerTest.scala | 1 - .../unit/kafka/server/KafkaApisTest.scala | 1 - .../unit/kafka/server/KafkaConfigTest.scala | 1 - .../server/ReplicaManagerQuotasTest.scala | 1 - .../unit/kafka/server/SimpleFetchTest.scala | 1 - .../epoch/LeaderEpochFileCacheTest.scala | 1 - 32 files changed, 71 insertions(+), 81 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index c8e2357b0a6..491b5de886f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -113,8 +113,8 @@ public class CommonClientConfigs { HashMap rval = new HashMap<>(); if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) && config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) { - log.debug("Disabling exponential reconnect backoff because " + RECONNECT_BACKOFF_MS_CONFIG + - " is set, but " + RECONNECT_BACKOFF_MAX_MS_CONFIG + " is not."); + log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.", + RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG); rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG)); } return rval; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index 0d74eed7835..4be34c2fdb1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -334,7 +334,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { List topicPartitions = entry.getValue(); for (TopicPartition topicPartition: topicPartitions) { if (allPartitions.containsKey(topicPartition)) - log.error(topicPartition + " is assigned to more than one consumer."); + log.error("{} is assigned to more than one consumer.", topicPartition); allPartitions.put(topicPartition, entry.getKey()); } } @@ -356,7 +356,8 @@ public class StickyAssignor extends AbstractPartitionAssignor { String otherConsumer = allPartitions.get(topicPartition); int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size(); if (consumerPartitionCount < otherConsumerPartitionCount) { - log.debug(topicPartition + " can be moved from consumer " + otherConsumer + " to consumer " + consumer + " for a more balanced assignment."); + log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.", + topicPartition, otherConsumer, consumer); return false; } } @@ -499,7 +500,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { int currentAssignmentSize = currentPartitions.size(); int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size(); if (currentAssignmentSize > maxAssignmentSize) - log.error("The consumer " + consumer + " is assigned more partitions than the maximum possible."); + log.error("The consumer {} is assigned more partitions than the maximum possible.", consumer); if (currentAssignmentSize < maxAssignmentSize) // if a consumer is not assigned all its potential partitions it is subject to reassignment @@ -598,12 +599,12 @@ public class StickyAssignor extends AbstractPartitionAssignor { // the partition must have at least two consumers if (partition2AllPotentialConsumers.get(partition).size() <= 1) - log.error("Expected more than one potential consumer for partition '" + partition + "'"); + log.error("Expected more than one potential consumer for partition '{}'", partition); // the partition must have a current consumer String consumer = currentPartitionConsumer.get(partition); if (consumer == null) - log.error("Expected partition '" + partition + "' to be assigned to a consumer"); + log.error("Expected partition '{}' to be assigned to a consumer", partition); // check if a better-suited consumer exist for the partition; if so, reassign it for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) { @@ -879,7 +880,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { List path = new ArrayList<>(Collections.singleton(pair.srcMemberId)); if (isLinked(pair.dstMemberId, pair.srcMemberId, reducedPairs, path) && !in(path, cycles)) { cycles.add(new ArrayList<>(path)); - log.error("A cycle of length " + (path.size() - 1) + " was found: " + path.toString()); + log.error("A cycle of length {} was found: {}", path.size() - 1, path.toString()); } } @@ -896,9 +897,9 @@ public class StickyAssignor extends AbstractPartitionAssignor { for (Map.Entry>> topicMovements: this.partitionMovementsByTopic.entrySet()) { Set topicMovementPairs = topicMovements.getValue().keySet(); if (hasCycles(topicMovementPairs)) { - log.error("Stickiness is violated for topic " + topicMovements.getKey() + log.error("Stickiness is violated for topic {}" + "\nPartition movements for this topic occurred among the following consumer pairs:" - + "\n" + topicMovements.getValue().toString()); + + "\n{}", topicMovements.getKey(), topicMovements.getValue().toString()); return false; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 2cbd1e97d1a..620991b491f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -196,7 +196,7 @@ public class TransactionManager { } TransactionManager() { - this(new LogContext(), null, 0, 100); + this(new LogContext(), null, 0, 100L); } public synchronized TransactionalRequestResult initializeTransactions() { diff --git a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java index 2bc8d1c5519..2e78a17451d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java @@ -102,7 +102,7 @@ public class MultiRecordsSend implements Send { totalWritten += totalWrittenPerCall; if (completed() && totalWritten != size) - log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten); + log.error("mismatch in sending bytes over socket; expected: {} actual: {}", size, totalWritten); log.trace("Bytes written as part of multi-send call: {}, total bytes written so far: {}, expected bytes to write: {}", totalWrittenPerCall, totalWritten, size); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java index 17c4ba130cf..8a12fbc94af 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java @@ -41,7 +41,7 @@ public class AppInfoParser { try (InputStream resourceStream = AppInfoParser.class.getResourceAsStream("/kafka/kafka-version.properties")) { props.load(resourceStream); } catch (Exception e) { - log.warn("Error while loading kafka-version.properties :" + e.getMessage()); + log.warn("Error while loading kafka-version.properties: {}", e.getMessage()); } VERSION = props.getProperty("version", "unknown").trim(); COMMIT_ID = props.getProperty("commitId", "unknown").trim(); @@ -106,8 +106,8 @@ public class AppInfoParser { public static class AppInfo implements AppInfoMBean { public AppInfo() { - log.info("Kafka version : " + AppInfoParser.getVersion()); - log.info("Kafka commitId : " + AppInfoParser.getCommitId()); + log.info("Kafka version: {}", AppInfoParser.getVersion()); + log.info("Kafka commitId: {}", AppInfoParser.getCommitId()); } @Override diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 4dae6eb25e4..e62e5a80596 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -473,10 +473,10 @@ object AdminUtils extends Logging with AdminUtilities { val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2)) if (!update) { - info("Topic creation " + jsonPartitionData.toString) + info(s"Topic creation $jsonPartitionData") zkUtils.createPersistentPath(zkPath, jsonPartitionData) } else { - info("Topic update " + jsonPartitionData.toString) + info(s"Topic update $jsonPartitionData") zkUtils.updatePersistentPath(zkPath, jsonPartitionData) } debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 626aaad5bc8..260b8024eba 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -29,7 +29,7 @@ import com.yammer.metrics.core.Gauge import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1} import kafka.common.{MessageFormatter, OffsetAndMetadata} import kafka.metrics.KafkaMetricsGroup -import kafka.server.{FetchHighWatermark, FetchLogEnd, ReplicaManager} +import kafka.server.ReplicaManager import kafka.utils.CoreUtils.inLock import kafka.utils._ import kafka.zk.KafkaZkClient @@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.types.Type._ import org.apache.kafka.common.protocol.types._ import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{IsolationLevel, OffsetCommitRequest, OffsetFetchResponse} +import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 589407c2a2c..87e6d1310e2 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -33,7 +33,6 @@ import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord} -import org.apache.kafka.common.requests.IsolationLevel import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.{Time, Utils} diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index bf5cc252f15..b75ab8a7546 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -17,7 +17,7 @@ package kafka.log -import java.io.{Closeable, File, IOException, RandomAccessFile} +import java.io.{Closeable, File, RandomAccessFile} import java.nio.channels.FileChannel import java.nio.file.Files import java.nio.{ByteBuffer, MappedByteBuffer} diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index aca81548b89..8449e39d581 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -267,7 +267,7 @@ class LogCleaner(initialConfig: CleanerConfig, * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments. */ private class CleanerThread(threadId: Int) - extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) { + extends ShutdownableThread(name = s"kafka-log-cleaner-thread-$threadId", isInterruptible = false) { protected override def loggerName = classOf[LogCleaner].getName @@ -458,7 +458,7 @@ private[log] class Cleaner(val id: Int, protected override def loggerName = classOf[LogCleaner].getName - this.logIdent = "Cleaner " + id + ": " + this.logIdent = s"Cleaner $id: " /* buffer used for read i/o */ private var readBuffer = ByteBuffer.allocate(ioBufferSize) @@ -752,7 +752,7 @@ private[log] class Cleaner(val id: Int, if(readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize) throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxBufferSize)) val newSize = math.min(this.readBuffer.capacity * 2, maxBufferSize) - info("Growing cleaner I/O buffers from " + readBuffer.capacity + "bytes to " + newSize + " bytes.") + info(s"Growing cleaner I/O buffers from ${readBuffer.capacity} bytes to $newSize bytes.") this.readBuffer = ByteBuffer.allocate(newSize) this.writeBuffer = ByteBuffer.allocate(newSize) } diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 13d14c17123..2fc7b749852 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -59,8 +59,6 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val logs: Pool[TopicPartition, Log], val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { - import LogCleanerManager._ - protected override def loggerName = classOf[LogCleaner].getName // package-private for testing diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index bcf380154a4..39029b078d2 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -256,7 +256,7 @@ class LogManager(logDirs: Seq[File], private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = { - debug("Loading log '" + logDir.getName + "'") + debug(s"Loading log '${logDir.getName}'") val topicPartition = Log.parseTopicPartitionName(logDir) val config = topicConfigs.getOrElse(topicPartition.topic, currentDefaultConfig) val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) @@ -324,7 +324,7 @@ class LogManager(logDirs: Seq[File], recoveryPoints = this.recoveryPointCheckpoints(dir).read } catch { case e: Exception => - warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e) + warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir", e) warn("Resetting the recovery checkpoint to 0") } @@ -333,7 +333,7 @@ class LogManager(logDirs: Seq[File], logStartOffsets = this.logStartOffsetCheckpoints(dir).read } catch { case e: Exception => - warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e) + warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir", e) } val jobsForDir = for { @@ -346,7 +346,7 @@ class LogManager(logDirs: Seq[File], } catch { case e: IOException => offlineDirs.add((dir.getAbsolutePath, e)) - error("Error while loading log dir " + dir.getAbsolutePath, e) + error(s"Error while loading log dir ${dir.getAbsolutePath}", e) } } } @@ -354,7 +354,7 @@ class LogManager(logDirs: Seq[File], } catch { case e: IOException => offlineDirs.add((dir.getAbsolutePath, e)) - error("Error while loading log dir " + dir.getAbsolutePath, e) + error(s"Error while loading log dir ${dir.getAbsolutePath}", e) } } @@ -375,7 +375,7 @@ class LogManager(logDirs: Seq[File], } } catch { case e: ExecutionException => - error("There was an error in one of the threads during logs loading: " + e.getCause) + error(s"There was an error in one of the threads during logs loading: ${e.getCause}") throw e.getCause } finally { threadPools.foreach(_.shutdown()) @@ -442,7 +442,7 @@ class LogManager(logDirs: Seq[File], // close logs in each dir for (dir <- liveLogDirs) { - debug("Flushing and closing logs at " + dir) + debug(s"Flushing and closing logs at $dir") val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) threadPools.append(pool) @@ -465,19 +465,19 @@ class LogManager(logDirs: Seq[File], dirJobs.foreach(_.get) // update the last flush point - debug("Updating recovery points at " + dir) + debug(s"Updating recovery points at $dir") checkpointLogRecoveryOffsetsInDir(dir) - debug("Updating log start offsets at " + dir) + debug(s"Updating log start offsets at $dir") checkpointLogStartOffsetsInDir(dir) // mark that the shutdown was clean by creating marker file - debug("Writing clean shutdown marker at " + dir) + debug(s"Writing clean shutdown marker at $dir") CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this) } } catch { case e: ExecutionException => - error("There was an error in one of the threads during LogManager shutdown: " + e.getCause) + error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}") throw e.getCause } finally { threadPools.foreach(_.shutdown()) @@ -894,13 +894,13 @@ class LogManager(logDirs: Seq[File], try { deletableLogs.foreach { case (topicPartition, log) => - debug("Garbage collecting '" + log.name + "'") + debug(s"Garbage collecting '${log.name}'") total += log.deleteOldSegments() val futureLog = futureLogs.get(topicPartition) if (futureLog != null) { // clean future logs - debug("Garbage collecting future log '" + futureLog.name + "'") + debug(s"Garbage collecting future log '${futureLog.name}'") total += futureLog.deleteOldSegments() } } @@ -910,7 +910,7 @@ class LogManager(logDirs: Seq[File], } } - debug("Log cleanup completed. " + total + " files deleted in " + + debug(s"Log cleanup completed. $total files deleted in " + (time.milliseconds - startMs) / 1000 + " seconds") } @@ -952,13 +952,13 @@ class LogManager(logDirs: Seq[File], for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) { try { val timeSinceLastFlush = time.milliseconds - log.lastFlushTime - debug("Checking if flush is needed on " + topicPartition.topic + " flush interval " + log.config.flushMs + - " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush) + debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" + + s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush") if(timeSinceLastFlush >= log.config.flushMs) log.flush } catch { case e: Throwable => - error("Error flushing topic " + topicPartition.topic, e) + error(s"Error flushing topic ${topicPartition.topic}", e) } } } diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 23dabf793a9..6f246eedf1f 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -20,7 +20,6 @@ package kafka.log import java.io.File import java.nio.ByteBuffer -import kafka.common.IndexOffsetOverflowException import kafka.utils.CoreUtils.inLock import org.apache.kafka.common.errors.InvalidOffsetException diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b5b3e4d0ec8..ae09a03b7dd 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -133,7 +133,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time } } ) - info("Started " + acceptors.size + " acceptor threads") + info(s"Started ${acceptors.size} acceptor threads") } /** @@ -335,7 +335,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ */ def close(channel: SocketChannel): Unit = { if (channel != null) { - debug("Closing connection from " + channel.socket.getRemoteSocketAddress()) + debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress()}") connectionQuotas.dec(channel.socket.getInetAddress) CoreUtils.swallow(channel.socket().close(), this, Level.ERROR) CoreUtils.swallow(channel.close(), this, Level.ERROR) @@ -627,7 +627,7 @@ private[kafka] class Processor(val id: Int, } } } finally { - debug("Closing selector - processor " + id) + debug(s"Closing selector - processor $id") CoreUtils.swallow(closeAll(), this, Level.ERROR) shutdownComplete() } @@ -658,7 +658,7 @@ private[kafka] class Processor(val id: Int, // There is no response to send to the client, we need to read more pipelined requests // that are sitting in the server's socket buffer updateRequestMetrics(response) - trace("Socket server received empty response to send, registering for read: " + response) + trace(s"Socket server received empty response to send, registering for read: $response") // Try unmuting the channel. If there was no quota violation and the channel has not been throttled, // it will be unmuted immediately. If the channel has been throttled, it will be unmuted only if the // throttling delay has already passed by now. diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala index 0eb9b07b32d..90025f42989 100644 --- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala +++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala @@ -161,7 +161,7 @@ class DelegationTokenManager(val config: KafkaConfig, val tokenCache: DelegationTokenCache, val time: Time, val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup { - this.logIdent = "[Token Manager on Broker " + config.brokerId + "]: " + this.logIdent = s"[Token Manager on Broker ${config.brokerId}]: " import DelegationTokenManager._ @@ -201,7 +201,7 @@ class DelegationTokenManager(val config: KafkaConfig, private def loadCache() { lock.synchronized { val tokens = zkClient.getChildren(DelegationTokensZNode.path) - info(s"Loading the token cache. Total token count : " + tokens.size) + info(s"Loading the token cache. Total token count: ${tokens.size}") for (tokenId <- tokens) { try { getTokenFromZk(tokenId) match { @@ -209,7 +209,7 @@ class DelegationTokenManager(val config: KafkaConfig, case None => } } catch { - case ex: Throwable => error(s"Error while getting Token for tokenId :$tokenId", ex) + case ex: Throwable => error(s"Error while getting Token for tokenId: $tokenId", ex) } } } @@ -279,7 +279,7 @@ class DelegationTokenManager(val config: KafkaConfig, val hmac = createHmac(tokenId, secretKey) val token = new DelegationToken(tokenInfo, hmac) updateToken(token) - info(s"Created a delegation token : $tokenId for owner : $owner") + info(s"Created a delegation token: $tokenId for owner: $owner") responseCallback(CreateTokenResult(issueTimeStamp, expiryTimeStamp, maxLifeTimeStamp, tokenId, hmac, Errors.NONE)) } } @@ -317,7 +317,7 @@ class DelegationTokenManager(val config: KafkaConfig, tokenInfo.setExpiryTimestamp(expiryTimeStamp) updateToken(token) - info(s"Delegation token renewed for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner) + info(s"Delegation token renewed for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}") renewCallback(Errors.NONE, expiryTimeStamp) } } @@ -412,7 +412,7 @@ class DelegationTokenManager(val config: KafkaConfig, expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1) } else if (expireLifeTimeMs < 0) { //expire immediately removeToken(tokenInfo.tokenId) - info(s"Token expired for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner) + info(s"Token expired for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}") expireResponseCallback(Errors.NONE, now) } else { //set expiry time stamp @@ -420,7 +420,7 @@ class DelegationTokenManager(val config: KafkaConfig, tokenInfo.setExpiryTimestamp(expiryTimeStamp) updateToken(token) - info(s"Updated expiry time for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner) + info(s"Updated expiry time for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}") expireResponseCallback(Errors.NONE, expiryTimeStamp) } } @@ -457,7 +457,7 @@ class DelegationTokenManager(val config: KafkaConfig, for (tokenInfo <- getAllTokenInformation) { val now = time.milliseconds if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) { - info(s"Delegation token expired for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner) + info(s"Delegation token expired for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}") removeToken(tokenInfo.tokenId) } } @@ -480,7 +480,7 @@ class DelegationTokenManager(val config: KafkaConfig, override def processNotification(tokenIdBytes: Array[Byte]) { lock.synchronized { val tokenId = new String(tokenIdBytes, StandardCharsets.UTF_8) - info(s"Processing Token Notification for tokenId : $tokenId") + info(s"Processing Token Notification for tokenId: $tokenId") getTokenFromZk(tokenId) match { case Some(token) => updateCache(token) case None => removeCache(tokenId) diff --git a/core/src/main/scala/kafka/server/ThrottledChannel.scala b/core/src/main/scala/kafka/server/ThrottledChannel.scala index 8fe8649848d..c46188f606e 100644 --- a/core/src/main/scala/kafka/server/ThrottledChannel.scala +++ b/core/src/main/scala/kafka/server/ThrottledChannel.scala @@ -42,7 +42,7 @@ class ThrottledChannel(val request: RequestChannel.Request, val time: Time, val // Notify the socket server that throttling has been done for this channel. def notifyThrottlingDone(): Unit = { - trace("Channel throttled for: " + throttleTimeMs + " ms") + trace(s"Channel throttled for: $throttleTimeMs ms") channelThrottlingCallback(new network.RequestChannel.EndThrottlingResponse(request)) } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b6fd918032c..de2eba1533d 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -359,7 +359,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { record.headers) override def run() { - info("Starting mirror maker thread " + threadName) + info(s"Starting mirror maker thread $threadName") try { consumerWrapper.init() @@ -425,7 +425,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { def shutdown() { try { - info(threadName + " shutting down") + info(s"$threadName shutting down") shuttingDown = true consumerWrapper.wakeup() } diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 9a5ac7bdeb5..6edd31557d1 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -118,7 +118,7 @@ object ReplicaVerificationTool extends Logging { try Pattern.compile(regex) catch { case _: PatternSyntaxException => - throw new RuntimeException(regex + " is an invalid regex.") + throw new RuntimeException(s"$regex is an invalid regex.") } val fetchSize = options.valueOf(fetchSizeOpt).intValue @@ -199,7 +199,7 @@ object ReplicaVerificationTool extends Logging { } }) fetcherThreads.foreach(_.start()) - println(ReplicaVerificationTool.getCurrentTimeString() + ": verification process is started.") + println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is started.") } @@ -300,7 +300,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicPartition: Map[TopicPartitio debug("Begin verification") maxLag = -1L for ((topicPartition, fetchResponsePerReplica) <- recordsCache) { - debug("Verifying " + topicPartition) + debug(s"Verifying $topicPartition") assert(fetchResponsePerReplica.size == expectedReplicasPerTopicPartition(topicPartition), "fetched " + fetchResponsePerReplica.size + " replicas for " + topicPartition + ", but expected " + expectedReplicasPerTopicPartition(topicPartition) + " replicas") diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 3a4399c44ce..a902701edd4 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -147,7 +147,7 @@ object CoreUtils extends Logging { } } catch { case e: Exception => { - error("Failed to register Mbean " + name, e) + error(s"Failed to register Mbean $name", e) false } } diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala index c0afbfba230..fd31b12a2b0 100644 --- a/core/src/main/scala/kafka/utils/FileLock.scala +++ b/core/src/main/scala/kafka/utils/FileLock.scala @@ -36,7 +36,7 @@ class FileLock(val file: File) extends Logging { */ def lock() { this synchronized { - trace("Acquiring lock on " + file.getAbsolutePath) + trace(s"Acquiring lock on ${file.getAbsolutePath}") flock = channel.lock() } } @@ -46,7 +46,7 @@ class FileLock(val file: File) extends Logging { */ def tryLock(): Boolean = { this synchronized { - trace("Acquiring lock on " + file.getAbsolutePath) + trace(s"Acquiring lock on ${file.getAbsolutePath}") try { // weirdly this method will return null if the lock is held by another // process, but will throw an exception if the lock is held by this process @@ -64,7 +64,7 @@ class FileLock(val file: File) extends Logging { */ def unlock() { this synchronized { - trace("Releasing lock on " + file.getAbsolutePath) + trace(s"Releasing lock on ${file.getAbsolutePath}") if(flock != null) flock.release() } diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 24eb17770ec..b4fae0bc85a 100755 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -113,7 +113,7 @@ class KafkaScheduler(val threads: Int, trace("Beginning execution of scheduled task '%s'.".format(name)) fun() } catch { - case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t) + case t: Throwable => error(s"Uncaught exception in scheduled task '$name'", t) } finally { trace("Completed execution of scheduled task '%s'.".format(name)) } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 331f7bbc8a8..dd850906b42 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -220,7 +220,7 @@ class ZkUtils(val zkClient: ZkClient, + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString)) try controllerInfoString.toInt catch { - case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t) + case t: Throwable => throw new KafkaException(s"Failed to parse the controller info: $controllerInfoString. This is neither the new or the old format.", t) } } } @@ -415,11 +415,11 @@ class ZkUtils(val zkClient: ZkClient, case _: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this } if (storedData == null || storedData != data) { - info("conflict in " + path + " data: " + data + " stored data: " + storedData) + info(s"conflict in $path data: $data stored data: $storedData") throw e } else { // otherwise, the creation succeeded, return normally - info(path + " exists with value " + data + " during connection loss; this is ok") + info(s"$path exists with value $data during connection loss; this is ok") } } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index a2f10498666..9ef721491cc 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -25,7 +25,7 @@ import kafka.network.SocketServer import kafka.security.auth._ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions} +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.producer._ diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index a1f2cff7691..7fd68c2b5ac 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -18,7 +18,7 @@ package kafka.api import com.yammer.metrics.Metrics -import com.yammer.metrics.core.{Gauge, Metric, MetricName} +import com.yammer.metrics.core.Gauge import java.io.File import java.util.ArrayList diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 890ea3b1ad7..9ed227464a6 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.FencedLeaderEpochException import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.{FetchRequest, IsolationLevel} +import org.apache.kafka.common.requests.FetchRequest import org.easymock.{EasyMock, EasyMockSupport} import org.junit.Test import org.junit.Assert._ diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 87e26fe2930..7821a0e007d 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -25,7 +25,6 @@ import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException import org.apache.kafka.common.record.FileRecords -import org.apache.kafka.common.requests.IsolationLevel import org.apache.kafka.common.utils.Utils /** diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index d2fe7eacb23..6004cc01229 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -28,7 +28,6 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.IsolationLevel import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.MockTime diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index b903c4a2092..a10800f8f38 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -17,7 +17,6 @@ package kafka.server -import java.lang.{Long => JLong} import java.net.InetAddress import java.util import java.util.{Collections, Optional} diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 9ca72561748..10dba1a4793 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -22,7 +22,6 @@ import java.util.Properties import kafka.api.{ApiVersion, KAFKA_0_8_2} import kafka.cluster.EndPoint import kafka.message._ -import kafka.metrics.KafkaMetricsConfig import kafka.utils.{CoreUtils, TestUtils} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.metrics.Sensor diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 9b59e71b0a7..0c6d09c2fc1 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -30,7 +30,6 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.easymock.EasyMock import EasyMock._ -import org.apache.kafka.common.requests.IsolationLevel import org.junit.Assert._ import org.junit.{After, Test} diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index ecfbd73cb50..1bb0e20ed6d 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} -import org.apache.kafka.common.requests.IsolationLevel import org.easymock.EasyMock import org.junit.Assert._ diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 7ac606a2dc5..fc4dcfd424e 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -19,7 +19,6 @@ package kafka.server.epoch import java.io.File -import kafka.server.LogOffsetMetadata import kafka.server.checkpoints.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile} import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import kafka.utils.TestUtils