MINOR: Use string/log interpolation instead of string concat in core and clients (#5850)

Also removed a few unused imports and tweaked the log message slightly.

Reviewers: Ismael Juma <ismael@juma.me.uk>
Mickael Maison, 2018-10-29 13:37:23 +00:00, committed by Ismael Juma
parent eede70a805
commit 9a0ea25fee
32 changed files with 71 additions and 81 deletions
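The pattern applied throughout the Java clients is SLF4J parameterized logging: the {} placeholders are only filled in when the log level is actually enabled, so suppressed messages no longer pay for string concatenation. A minimal sketch of the before/after shape (the Example class, its logger, and the report method are hypothetical, not part of this commit):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Example {
    private static final Logger log = LoggerFactory.getLogger(Example.class);

    void report(String topic, long offset) {
        // Before: the message is built eagerly via concatenation, even when DEBUG is off.
        log.debug("Fetched offset " + offset + " for topic " + topic);

        // After: SLF4J substitutes the {} placeholders only if DEBUG is enabled.
        log.debug("Fetched offset {} for topic {}", offset, topic);
    }
}

On the Scala side the equivalent change swaps "..." + x concatenation for string interpolation (s"... $x"); Kafka's Logging trait takes the message by name, so that interpolation is likewise deferred until the level is enabled.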

View File

@@ -113,8 +113,8 @@ public class CommonClientConfigs {
         HashMap<String, Object> rval = new HashMap<>();
         if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
             config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
-            log.debug("Disabling exponential reconnect backoff because " + RECONNECT_BACKOFF_MS_CONFIG +
-                " is set, but " + RECONNECT_BACKOFF_MAX_MS_CONFIG + " is not.");
+            log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.",
+                RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG);
             rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
         }
         return rval;

View File

@@ -334,7 +334,7 @@ public class StickyAssignor extends AbstractPartitionAssignor {
             List<TopicPartition> topicPartitions = entry.getValue();
             for (TopicPartition topicPartition: topicPartitions) {
                 if (allPartitions.containsKey(topicPartition))
-                    log.error(topicPartition + " is assigned to more than one consumer.");
+                    log.error("{} is assigned to more than one consumer.", topicPartition);
                 allPartitions.put(topicPartition, entry.getKey());
             }
         }
@@ -356,7 +356,8 @@ public class StickyAssignor extends AbstractPartitionAssignor {
             String otherConsumer = allPartitions.get(topicPartition);
             int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size();
             if (consumerPartitionCount < otherConsumerPartitionCount) {
-                log.debug(topicPartition + " can be moved from consumer " + otherConsumer + " to consumer " + consumer + " for a more balanced assignment.");
+                log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.",
+                    topicPartition, otherConsumer, consumer);
                 return false;
             }
         }
@@ -499,7 +500,7 @@ public class StickyAssignor extends AbstractPartitionAssignor {
             int currentAssignmentSize = currentPartitions.size();
             int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size();
             if (currentAssignmentSize > maxAssignmentSize)
-                log.error("The consumer " + consumer + " is assigned more partitions than the maximum possible.");
+                log.error("The consumer {} is assigned more partitions than the maximum possible.", consumer);
             if (currentAssignmentSize < maxAssignmentSize)
                 // if a consumer is not assigned all its potential partitions it is subject to reassignment
@@ -598,12 +599,12 @@ public class StickyAssignor extends AbstractPartitionAssignor {
             // the partition must have at least two consumers
             if (partition2AllPotentialConsumers.get(partition).size() <= 1)
-                log.error("Expected more than one potential consumer for partition '" + partition + "'");
+                log.error("Expected more than one potential consumer for partition '{}'", partition);
             // the partition must have a current consumer
             String consumer = currentPartitionConsumer.get(partition);
             if (consumer == null)
-                log.error("Expected partition '" + partition + "' to be assigned to a consumer");
+                log.error("Expected partition '{}' to be assigned to a consumer", partition);
             // check if a better-suited consumer exist for the partition; if so, reassign it
             for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) {
@@ -879,7 +880,7 @@ public class StickyAssignor extends AbstractPartitionAssignor {
                 List<String> path = new ArrayList<>(Collections.singleton(pair.srcMemberId));
                 if (isLinked(pair.dstMemberId, pair.srcMemberId, reducedPairs, path) && !in(path, cycles)) {
                     cycles.add(new ArrayList<>(path));
-                    log.error("A cycle of length " + (path.size() - 1) + " was found: " + path.toString());
+                    log.error("A cycle of length {} was found: {}", path.size() - 1, path.toString());
                 }
             }
@@ -896,9 +897,9 @@ public class StickyAssignor extends AbstractPartitionAssignor {
             for (Map.Entry<String, Map<ConsumerPair, Set<TopicPartition>>> topicMovements: this.partitionMovementsByTopic.entrySet()) {
                 Set<ConsumerPair> topicMovementPairs = topicMovements.getValue().keySet();
                 if (hasCycles(topicMovementPairs)) {
-                    log.error("Stickiness is violated for topic " + topicMovements.getKey()
-                        + "\nPartition movements for this topic occurred among the following consumer pairs:"
-                        + "\n" + topicMovements.getValue().toString());
+                    log.error("Stickiness is violated for topic {}"
+                        + "\nPartition movements for this topic occurred among the following consumer pairs:"
+                        + "\n{}", topicMovements.getKey(), topicMovements.getValue().toString());
                     return false;
                 }
             }

View File

@@ -196,7 +196,7 @@ public class TransactionManager {
     }
     TransactionManager() {
-        this(new LogContext(), null, 0, 100);
+        this(new LogContext(), null, 0, 100L);
     }
     public synchronized TransactionalRequestResult initializeTransactions() {

View File

@@ -102,7 +102,7 @@ public class MultiRecordsSend implements Send {
         totalWritten += totalWrittenPerCall;
         if (completed() && totalWritten != size)
-            log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten);
+            log.error("mismatch in sending bytes over socket; expected: {} actual: {}", size, totalWritten);
         log.trace("Bytes written as part of multi-send call: {}, total bytes written so far: {}, expected bytes to write: {}",
             totalWrittenPerCall, totalWritten, size);

View File

@@ -41,7 +41,7 @@ public class AppInfoParser {
         try (InputStream resourceStream = AppInfoParser.class.getResourceAsStream("/kafka/kafka-version.properties")) {
             props.load(resourceStream);
         } catch (Exception e) {
-            log.warn("Error while loading kafka-version.properties :" + e.getMessage());
+            log.warn("Error while loading kafka-version.properties: {}", e.getMessage());
         }
         VERSION = props.getProperty("version", "unknown").trim();
         COMMIT_ID = props.getProperty("commitId", "unknown").trim();
@@ -106,8 +106,8 @@ public class AppInfoParser {
     public static class AppInfo implements AppInfoMBean {
         public AppInfo() {
-            log.info("Kafka version : " + AppInfoParser.getVersion());
-            log.info("Kafka commitId : " + AppInfoParser.getCommitId());
+            log.info("Kafka version: {}", AppInfoParser.getVersion());
+            log.info("Kafka commitId: {}", AppInfoParser.getCommitId());
         }
         @Override

View File

@@ -473,10 +473,10 @@ object AdminUtils extends Logging with AdminUtilities {
     val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))
     if (!update) {
-      info("Topic creation " + jsonPartitionData.toString)
+      info(s"Topic creation $jsonPartitionData")
       zkUtils.createPersistentPath(zkPath, jsonPartitionData)
     } else {
-      info("Topic update " + jsonPartitionData.toString)
+      info(s"Topic update $jsonPartitionData")
       zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
     }
     debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))

View File

@@ -29,7 +29,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1}
 import kafka.common.{MessageFormatter, OffsetAndMetadata}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{FetchHighWatermark, FetchLogEnd, ReplicaManager}
+import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
 import kafka.utils._
 import kafka.zk.KafkaZkClient
@@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{IsolationLevel, OffsetCommitRequest, OffsetFetchResponse}
+import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}

View File

@@ -33,7 +33,6 @@ import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
-import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.{Time, Utils}

View File

@@ -17,7 +17,7 @@
 package kafka.log
-import java.io.{Closeable, File, IOException, RandomAccessFile}
+import java.io.{Closeable, File, RandomAccessFile}
 import java.nio.channels.FileChannel
 import java.nio.file.Files
 import java.nio.{ByteBuffer, MappedByteBuffer}

View File

@@ -267,7 +267,7 @@ class LogCleaner(initialConfig: CleanerConfig,
    * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
    */
   private class CleanerThread(threadId: Int)
-    extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
+    extends ShutdownableThread(name = s"kafka-log-cleaner-thread-$threadId", isInterruptible = false) {
     protected override def loggerName = classOf[LogCleaner].getName
@@ -458,7 +458,7 @@ private[log] class Cleaner(val id: Int,
   protected override def loggerName = classOf[LogCleaner].getName
-  this.logIdent = "Cleaner " + id + ": "
+  this.logIdent = s"Cleaner $id: "
   /* buffer used for read i/o */
   private var readBuffer = ByteBuffer.allocate(ioBufferSize)
@@ -752,7 +752,7 @@ private[log] class Cleaner(val id: Int,
     if(readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize)
       throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxBufferSize))
     val newSize = math.min(this.readBuffer.capacity * 2, maxBufferSize)
-    info("Growing cleaner I/O buffers from " + readBuffer.capacity + "bytes to " + newSize + " bytes.")
+    info(s"Growing cleaner I/O buffers from ${readBuffer.capacity} bytes to $newSize bytes.")
     this.readBuffer = ByteBuffer.allocate(newSize)
     this.writeBuffer = ByteBuffer.allocate(newSize)
   }

View File

@@ -59,8 +59,6 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
                               val logs: Pool[TopicPartition, Log],
                               val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
-  import LogCleanerManager._
   protected override def loggerName = classOf[LogCleaner].getName
   // package-private for testing

View File

@@ -256,7 +256,7 @@ class LogManager(logDirs: Seq[File],
   private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
   private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
-    debug("Loading log '" + logDir.getName + "'")
+    debug(s"Loading log '${logDir.getName}'")
     val topicPartition = Log.parseTopicPartitionName(logDir)
     val config = topicConfigs.getOrElse(topicPartition.topic, currentDefaultConfig)
     val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
@@ -324,7 +324,7 @@ class LogManager(logDirs: Seq[File],
         recoveryPoints = this.recoveryPointCheckpoints(dir).read
       } catch {
         case e: Exception =>
-          warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e)
+          warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir", e)
           warn("Resetting the recovery checkpoint to 0")
       }
@@ -333,7 +333,7 @@ class LogManager(logDirs: Seq[File],
         logStartOffsets = this.logStartOffsetCheckpoints(dir).read
       } catch {
         case e: Exception =>
-          warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e)
+          warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir", e)
       }
       val jobsForDir = for {
@@ -346,7 +346,7 @@ class LogManager(logDirs: Seq[File],
           } catch {
             case e: IOException =>
               offlineDirs.add((dir.getAbsolutePath, e))
-              error("Error while loading log dir " + dir.getAbsolutePath, e)
+              error(s"Error while loading log dir ${dir.getAbsolutePath}", e)
           }
         }
       }
@@ -354,7 +354,7 @@ class LogManager(logDirs: Seq[File],
       } catch {
         case e: IOException =>
           offlineDirs.add((dir.getAbsolutePath, e))
-          error("Error while loading log dir " + dir.getAbsolutePath, e)
+          error(s"Error while loading log dir ${dir.getAbsolutePath}", e)
       }
     }
@@ -375,7 +375,7 @@ class LogManager(logDirs: Seq[File],
       }
     } catch {
       case e: ExecutionException =>
-        error("There was an error in one of the threads during logs loading: " + e.getCause)
+        error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
         throw e.getCause
     } finally {
       threadPools.foreach(_.shutdown())
@@ -442,7 +442,7 @@ class LogManager(logDirs: Seq[File],
       // close logs in each dir
       for (dir <- liveLogDirs) {
-        debug("Flushing and closing logs at " + dir)
+        debug(s"Flushing and closing logs at $dir")
         val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
         threadPools.append(pool)
@@ -465,19 +465,19 @@ class LogManager(logDirs: Seq[File],
         dirJobs.foreach(_.get)
         // update the last flush point
-        debug("Updating recovery points at " + dir)
+        debug(s"Updating recovery points at $dir")
         checkpointLogRecoveryOffsetsInDir(dir)
-        debug("Updating log start offsets at " + dir)
+        debug(s"Updating log start offsets at $dir")
         checkpointLogStartOffsetsInDir(dir)
         // mark that the shutdown was clean by creating marker file
-        debug("Writing clean shutdown marker at " + dir)
+        debug(s"Writing clean shutdown marker at $dir")
         CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
       }
     } catch {
       case e: ExecutionException =>
-        error("There was an error in one of the threads during LogManager shutdown: " + e.getCause)
+        error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
        throw e.getCause
     } finally {
       threadPools.foreach(_.shutdown())
@@ -894,13 +894,13 @@ class LogManager(logDirs: Seq[File],
     try {
       deletableLogs.foreach {
         case (topicPartition, log) =>
-          debug("Garbage collecting '" + log.name + "'")
+          debug(s"Garbage collecting '${log.name}'")
           total += log.deleteOldSegments()
           val futureLog = futureLogs.get(topicPartition)
           if (futureLog != null) {
             // clean future logs
-            debug("Garbage collecting future log '" + futureLog.name + "'")
+            debug(s"Garbage collecting future log '${futureLog.name}'")
             total += futureLog.deleteOldSegments()
           }
       }
@@ -910,7 +910,7 @@ class LogManager(logDirs: Seq[File],
       }
     }
-    debug("Log cleanup completed. " + total + " files deleted in " +
+    debug(s"Log cleanup completed. $total files deleted in " +
       (time.milliseconds - startMs) / 1000 + " seconds")
   }
@@ -952,13 +952,13 @@ class LogManager(logDirs: Seq[File],
     for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
       try {
         val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
-        debug("Checking if flush is needed on " + topicPartition.topic + " flush interval " + log.config.flushMs +
-          " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
+        debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
+          s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
         if(timeSinceLastFlush >= log.config.flushMs)
           log.flush
       } catch {
         case e: Throwable =>
-          error("Error flushing topic " + topicPartition.topic, e)
+          error(s"Error flushing topic ${topicPartition.topic}", e)
       }
     }
   }

View File

@@ -20,7 +20,6 @@ package kafka.log
 import java.io.File
 import java.nio.ByteBuffer
-import kafka.common.IndexOffsetOverflowException
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.errors.InvalidOffsetException

View File

@@ -133,7 +133,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
         }
       }
     )
-    info("Started " + acceptors.size + " acceptor threads")
+    info(s"Started ${acceptors.size} acceptor threads")
   }
   /**
@@ -335,7 +335,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
    */
   def close(channel: SocketChannel): Unit = {
     if (channel != null) {
-      debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
+      debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress()}")
       connectionQuotas.dec(channel.socket.getInetAddress)
       CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
       CoreUtils.swallow(channel.close(), this, Level.ERROR)
@@ -627,7 +627,7 @@ private[kafka] class Processor(val id: Int,
         }
       }
     } finally {
-      debug("Closing selector - processor " + id)
+      debug(s"Closing selector - processor $id")
       CoreUtils.swallow(closeAll(), this, Level.ERROR)
       shutdownComplete()
     }
@@ -658,7 +658,7 @@ private[kafka] class Processor(val id: Int,
       // There is no response to send to the client, we need to read more pipelined requests
       // that are sitting in the server's socket buffer
       updateRequestMetrics(response)
-      trace("Socket server received empty response to send, registering for read: " + response)
+      trace(s"Socket server received empty response to send, registering for read: $response")
       // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
       // it will be unmuted immediately. If the channel has been throttled, it will be unmuted only if the
       // throttling delay has already passed by now.

View File

@@ -161,7 +161,7 @@ class DelegationTokenManager(val config: KafkaConfig,
                              val tokenCache: DelegationTokenCache,
                              val time: Time,
                              val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Token Manager on Broker " + config.brokerId + "]: "
+  this.logIdent = s"[Token Manager on Broker ${config.brokerId}]: "
   import DelegationTokenManager._
@@ -201,7 +201,7 @@ class DelegationTokenManager(val config: KafkaConfig,
   private def loadCache() {
     lock.synchronized {
       val tokens = zkClient.getChildren(DelegationTokensZNode.path)
-      info(s"Loading the token cache. Total token count : " + tokens.size)
+      info(s"Loading the token cache. Total token count: ${tokens.size}")
       for (tokenId <- tokens) {
         try {
           getTokenFromZk(tokenId) match {
@@ -209,7 +209,7 @@ class DelegationTokenManager(val config: KafkaConfig,
             case None =>
           }
         } catch {
-          case ex: Throwable => error(s"Error while getting Token for tokenId :$tokenId", ex)
+          case ex: Throwable => error(s"Error while getting Token for tokenId: $tokenId", ex)
         }
       }
     }
@@ -279,7 +279,7 @@ class DelegationTokenManager(val config: KafkaConfig,
       val hmac = createHmac(tokenId, secretKey)
       val token = new DelegationToken(tokenInfo, hmac)
       updateToken(token)
-      info(s"Created a delegation token : $tokenId for owner : $owner")
+      info(s"Created a delegation token: $tokenId for owner: $owner")
       responseCallback(CreateTokenResult(issueTimeStamp, expiryTimeStamp, maxLifeTimeStamp, tokenId, hmac, Errors.NONE))
     }
   }
@@ -317,7 +317,7 @@ class DelegationTokenManager(val config: KafkaConfig,
       tokenInfo.setExpiryTimestamp(expiryTimeStamp)
       updateToken(token)
-      info(s"Delegation token renewed for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+      info(s"Delegation token renewed for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}")
       renewCallback(Errors.NONE, expiryTimeStamp)
     }
   }
@@ -412,7 +412,7 @@ class DelegationTokenManager(val config: KafkaConfig,
       expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
     } else if (expireLifeTimeMs < 0) { //expire immediately
       removeToken(tokenInfo.tokenId)
-      info(s"Token expired for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+      info(s"Token expired for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}")
       expireResponseCallback(Errors.NONE, now)
     } else {
       //set expiry time stamp
@@ -420,7 +420,7 @@ class DelegationTokenManager(val config: KafkaConfig,
       tokenInfo.setExpiryTimestamp(expiryTimeStamp)
       updateToken(token)
-      info(s"Updated expiry time for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+      info(s"Updated expiry time for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}")
       expireResponseCallback(Errors.NONE, expiryTimeStamp)
     }
   }
@@ -457,7 +457,7 @@ class DelegationTokenManager(val config: KafkaConfig,
     for (tokenInfo <- getAllTokenInformation) {
       val now = time.milliseconds
       if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
-        info(s"Delegation token expired for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+        info(s"Delegation token expired for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}")
         removeToken(tokenInfo.tokenId)
       }
     }
@@ -480,7 +480,7 @@ class DelegationTokenManager(val config: KafkaConfig,
   override def processNotification(tokenIdBytes: Array[Byte]) {
     lock.synchronized {
       val tokenId = new String(tokenIdBytes, StandardCharsets.UTF_8)
-      info(s"Processing Token Notification for tokenId : $tokenId")
+      info(s"Processing Token Notification for tokenId: $tokenId")
       getTokenFromZk(tokenId) match {
         case Some(token) => updateCache(token)
         case None => removeCache(tokenId)

View File

@@ -42,7 +42,7 @@ class ThrottledChannel(val request: RequestChannel.Request, val time: Time, val
   // Notify the socket server that throttling has been done for this channel.
   def notifyThrottlingDone(): Unit = {
-    trace("Channel throttled for: " + throttleTimeMs + " ms")
+    trace(s"Channel throttled for: $throttleTimeMs ms")
     channelThrottlingCallback(new network.RequestChannel.EndThrottlingResponse(request))
   }

View File

@@ -359,7 +359,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         record.headers)
     override def run() {
-      info("Starting mirror maker thread " + threadName)
+      info(s"Starting mirror maker thread $threadName")
       try {
         consumerWrapper.init()
@@ -425,7 +425,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     def shutdown() {
       try {
-        info(threadName + " shutting down")
+        info(s"$threadName shutting down")
         shuttingDown = true
         consumerWrapper.wakeup()
       }

View File

@@ -118,7 +118,7 @@ object ReplicaVerificationTool extends Logging {
       try Pattern.compile(regex)
       catch {
         case _: PatternSyntaxException =>
-          throw new RuntimeException(regex + " is an invalid regex.")
+          throw new RuntimeException(s"$regex is an invalid regex.")
       }
     val fetchSize = options.valueOf(fetchSizeOpt).intValue
@@ -199,7 +199,7 @@ object ReplicaVerificationTool extends Logging {
       }
     })
     fetcherThreads.foreach(_.start())
-    println(ReplicaVerificationTool.getCurrentTimeString() + ": verification process is started.")
+    println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is started.")
   }
@@ -300,7 +300,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicPartition: Map[TopicPartitio
     debug("Begin verification")
     maxLag = -1L
     for ((topicPartition, fetchResponsePerReplica) <- recordsCache) {
-      debug("Verifying " + topicPartition)
+      debug(s"Verifying $topicPartition")
       assert(fetchResponsePerReplica.size == expectedReplicasPerTopicPartition(topicPartition),
         "fetched " + fetchResponsePerReplica.size + " replicas for " + topicPartition + ", but expected "
         + expectedReplicasPerTopicPartition(topicPartition) + " replicas")

View File

@@ -147,7 +147,7 @@ object CoreUtils extends Logging {
       }
     } catch {
       case e: Exception => {
-        error("Failed to register Mbean " + name, e)
+        error(s"Failed to register Mbean $name", e)
         false
       }
     }

View File

@@ -36,7 +36,7 @@ class FileLock(val file: File) extends Logging {
    */
   def lock() {
     this synchronized {
-      trace("Acquiring lock on " + file.getAbsolutePath)
+      trace(s"Acquiring lock on ${file.getAbsolutePath}")
       flock = channel.lock()
     }
   }
@@ -46,7 +46,7 @@ class FileLock(val file: File) extends Logging {
    */
   def tryLock(): Boolean = {
     this synchronized {
-      trace("Acquiring lock on " + file.getAbsolutePath)
+      trace(s"Acquiring lock on ${file.getAbsolutePath}")
       try {
         // weirdly this method will return null if the lock is held by another
        // process, but will throw an exception if the lock is held by this process
@@ -64,7 +64,7 @@ class FileLock(val file: File) extends Logging {
    */
   def unlock() {
     this synchronized {
-      trace("Releasing lock on " + file.getAbsolutePath)
+      trace(s"Releasing lock on ${file.getAbsolutePath}")
       if(flock != null)
         flock.release()
     }

View File

@@ -113,7 +113,7 @@ class KafkaScheduler(val threads: Int,
        trace("Beginning execution of scheduled task '%s'.".format(name))
        fun()
      } catch {
-       case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
+       case t: Throwable => error(s"Uncaught exception in scheduled task '$name'", t)
      } finally {
        trace("Completed execution of scheduled task '%s'.".format(name))
      }

View File

@@ -220,7 +220,7 @@ class ZkUtils(val zkClient: ZkClient,
         + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
       try controllerInfoString.toInt
       catch {
-        case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+        case t: Throwable => throw new KafkaException(s"Failed to parse the controller info: $controllerInfoString. This is neither the new or the old format.", t)
       }
     }
   }
@@ -415,11 +415,11 @@ class ZkUtils(val zkClient: ZkClient,
         case _: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
       }
       if (storedData == null || storedData != data) {
-        info("conflict in " + path + " data: " + data + " stored data: " + storedData)
+        info(s"conflict in $path data: $data stored data: $storedData")
         throw e
       } else {
         // otherwise, the creation succeeded, return normally
-        info(path + " exists with value " + data + " during connection loss; this is ok")
+        info(s"$path exists with value $data during connection loss; this is ok")
       }
     }
   }

View File

@@ -25,7 +25,7 @@ import kafka.network.SocketServer
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._

View File

@@ -18,7 +18,7 @@
 package kafka.api
 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.{Gauge, Metric, MetricName}
+import com.yammer.metrics.core.Gauge
 import java.io.File
 import java.util.ArrayList

View File

@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.FencedLeaderEpochException
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.{FetchRequest, IsolationLevel}
+import org.apache.kafka.common.requests.FetchRequest
 import org.easymock.{EasyMock, EasyMockSupport}
 import org.junit.Test
 import org.junit.Assert._

View File

@@ -25,7 +25,6 @@ import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
 import org.apache.kafka.common.record.FileRecords
-import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.utils.Utils
 /**

View File

@@ -28,7 +28,6 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.MockTime

View File

@@ -17,7 +17,6 @@
 package kafka.server
-import java.lang.{Long => JLong}
 import java.net.InetAddress
 import java.util
 import java.util.{Collections, Optional}

View File

@@ -22,7 +22,6 @@ import java.util.Properties
 import kafka.api.{ApiVersion, KAFKA_0_8_2}
 import kafka.cluster.EndPoint
 import kafka.message._
-import kafka.metrics.KafkaMetricsConfig
 import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.metrics.Sensor

View File

@@ -30,7 +30,6 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.easymock.EasyMock
 import EasyMock._
-import org.apache.kafka.common.requests.IsolationLevel
 import org.junit.Assert._
 import org.junit.{After, Test}

View File

@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
-import org.apache.kafka.common.requests.IsolationLevel
 import org.easymock.EasyMock
 import org.junit.Assert._

View File

@@ -19,7 +19,6 @@ package kafka.server.epoch
 import java.io.File
-import kafka.server.LogOffsetMetadata
 import kafka.server.checkpoints.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
 import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import kafka.utils.TestUtils