From 43fb2df7a4b2bc7637dcba9436a5435cdcb4fb27 Mon Sep 17 00:00:00 2001
From: Kamal C
Date: Thu, 30 Mar 2017 13:17:09 +0100
Subject: [PATCH] MINOR: Map `mkString` format updated to default java format

This is a minor change but it helps to improve the log readability.

Author: Kamal C

Reviewers: Ismael Juma

Closes #2709 from Kamal15/util
---
 .../kafka/clients/CommonClientConfigs.java         |  2 +-
 .../common/requests/CreateTopicsRequest.java       |  7 ++-
 .../common/requests/DeleteRecordsRequest.java      |  3 +-
 .../kafka/common/requests/FetchRequest.java        |  3 +-
 .../common/requests/LeaderAndIsrRequest.java       |  2 +-
 .../common/requests/ListOffsetRequest.java         |  5 +-
 .../common/requests/OffsetCommitRequest.java       |  3 +-
 .../kafka/common/requests/ProduceRequest.java      |  2 +-
 .../requests/UpdateMetadataRequest.java            |  4 +-
 .../org/apache/kafka/common/utils/Utils.java       | 47 +++++--------
 .../apache/kafka/common/utils/UtilsTest.java       |  2 +-
 .../internals/StreamPartitionAssignor.java         |  2 +-
 .../kafka/streams/StreamsConfigTest.java           |  4 +-
 .../kafka/tools/ClientCompatibilityTest.java       |  3 +-
 14 files changed, 30 insertions(+), 59 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 5006ee2dafa..b2c8937d51a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -73,7 +73,7 @@ public class CommonClientConfigs {
     public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
 
     public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
-    public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + Utils.mkString(nonTestingSecurityProtocolNames(), ", ") + ".";
+    public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + Utils.join(nonTestingSecurityProtocolNames(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 673810d5b06..072dde8cd36 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -88,8 +87,8 @@ public class CreateTopicsRequest extends AbstractRequest {
             StringBuilder bld = new StringBuilder();
             bld.append("(numPartitions=").append(numPartitions).
                 append(", replicationFactor=").append(replicationFactor).
-                append(", replicasAssignments=").append(Utils.mkString(replicasAssignments)).
-                append(", configs=").append(Utils.mkString(configs)).
+                append(", replicasAssignments=").append(replicasAssignments).
+                append(", configs=").append(configs).
                 append(")");
             return bld.toString();
         }
@@ -123,7 +122,7 @@ public class CreateTopicsRequest extends AbstractRequest {
         public String toString() {
             StringBuilder bld = new StringBuilder();
             bld.append("(type=CreateTopicsRequest").
-                append(", topics=").append(Utils.mkString(topics)).
+                append(", topics=").append(topics).
                 append(", timeout=").append(timeout).
                 append(", validateOnly=").append(validateOnly).
                 append(")");
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
index f204c44a5f0..96f064c4e08 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
-import org.apache.kafka.common.utils.Utils;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -68,7 +67,7 @@ public class DeleteRecordsRequest extends AbstractRequest {
             StringBuilder builder = new StringBuilder();
             builder.append("(type=DeleteRecordsRequest")
                    .append(", timeout=").append(timeout)
-                   .append(", partitionOffsets=(").append(Utils.mkString(partitionOffsets))
+                   .append(", partitionOffsets=(").append(partitionOffsets)
                    .append("))");
             return builder.toString();
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 6549f503556..8cd281887bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -151,7 +150,7 @@ public class FetchRequest extends AbstractRequest {
                 append(", maxWait=").append(maxWait).
                 append(", minBytes=").append(minBytes).
                 append(", maxBytes=").append(maxBytes).
-                append(", fetchData=").append(Utils.mkString(fetchData)).
+                append(", fetchData=").append(fetchData).
                 append(")");
             return bld.toString();
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index f51cfa918b8..884375507d7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -77,7 +77,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
             bld.append("(type=LeaderAndIsRequest")
                 .append(", controllerId=").append(controllerId)
                 .append(", controllerEpoch=").append(controllerEpoch)
-                .append(", partitionStates=").append(Utils.mkString(partitionStates))
+                .append(", partitionStates=").append(partitionStates)
                 .append(", liveLeaders=(").append(Utils.join(liveLeaders, ", ")).append(")")
                 .append(")");
             return bld.toString();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 1d62a963d91..33270717ef2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -128,10 +127,10 @@ public class ListOffsetRequest extends AbstractRequest {
             bld.append("(type=ListOffsetRequest")
                 .append(", replicaId=").append(replicaId);
             if (offsetData != null) {
-                bld.append(", offsetData=").append(Utils.mkString(offsetData));
+                bld.append(", offsetData=").append(offsetData);
             }
             if (partitionTimestamps != null) {
-                bld.append(", partitionTimestamps=").append(Utils.mkString(partitionTimestamps));
+                bld.append(", partitionTimestamps=").append(partitionTimestamps);
             }
             bld.append(", minVersion=").append(minVersion);
             bld.append(")");
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 6459201ad3f..45975d0a76b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -149,7 +148,7 @@ public class OffsetCommitRequest extends AbstractRequest {
                 append(", memberId=").append(memberId).
                 append(", generationId=").append(generationId).
                 append(", retentionTime=").append(retentionTime).
-                append(", offsetData=").append(Utils.mkString(offsetData)).
+                append(", offsetData=").append(offsetData).
                 append(")");
             return bld.toString();
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 0010ad65ba4..76313910dc8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -82,7 +82,7 @@ public class ProduceRequest extends AbstractRequest {
                 .append(", magic=").append(magic)
                 .append(", acks=").append(acks)
                 .append(", timeout=").append(timeout)
-                .append(", partitionRecords=(").append(Utils.mkString(partitionRecords))
+                .append(", partitionRecords=(").append(partitionRecords)
                 .append("))");
             return bld.toString();
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 41b0c84c1f0..fc7a33f91e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -67,8 +67,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
             bld.append("(type: UpdateMetadataRequest=").
                 append(", controllerId=").append(controllerId).
                 append(", controllerEpoch=").append(controllerEpoch).
-                append(", partitionStates=").append(Utils.mkString(partitionStates)).
-                append(", liveBrokers=").append(Utils.join(liveBrokers, " ,")).
+                append(", partitionStates=").append(partitionStates).
+                append(", liveBrokers=").append(Utils.join(liveBrokers, ", ")).
                 append(")");
             return bld.toString();
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 696d1456017..796b019ccf0 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -352,43 +352,39 @@ public class Utils {
     /**
      * Create a string representation of an array joined by the given separator
      * @param strs The array of items
-     * @param seperator The separator
+     * @param separator The separator
      * @return The string representation.
      */
-    public static <T> String join(T[] strs, String seperator) {
-        return join(Arrays.asList(strs), seperator);
+    public static <T> String join(T[] strs, String separator) {
+        return join(Arrays.asList(strs), separator);
     }
     /**
      * Create a string representation of a list joined by the given separator
     * @param list The list of items
-     * @param seperator The separator
+     * @param separator The separator
     * @return The string representation.
     */
-    public static <T> String join(Collection<T> list, String seperator) {
+    public static <T> String join(Collection<T> list, String separator) {
        StringBuilder sb = new StringBuilder();
        Iterator<T> iter = list.iterator();
        while (iter.hasNext()) {
            sb.append(iter.next());
            if (iter.hasNext())
-                sb.append(seperator);
+                sb.append(separator);
        }
        return sb.toString();
    }
 
-    public static <K, V> String mkString(Map<K, V> map) {
-        return mkString(map, "{", "}", "=", " ,");
-    }
-
     public static <K, V> String mkString(Map<K, V> map, String begin, String end,
-                                         String keyValueSeparator, String elementSeperator) {
+                                         String keyValueSeparator, String elementSeparator) {
         StringBuilder bld = new StringBuilder();
         bld.append(begin);
         String prefix = "";
         for (Map.Entry<K, V> entry : map.entrySet()) {
             bld.append(prefix).append(entry.getKey()).
                     append(keyValueSeparator).append(entry.getValue());
-            prefix = elementSeperator;
+            prefix = elementSeparator;
         }
         bld.append(end);
         return bld.toString();
     }
@@ -439,7 +435,7 @@ public class Utils {
         thread.setDaemon(daemon);
         thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             public void uncaughtException(Thread t, Throwable e) {
-                log.error("Uncaught exception in thread '" + t.getName() + "':", e);
+                log.error("Uncaught exception in thread '{}':", t.getName(), e);
             }
         });
         return thread;
@@ -544,25 +540,6 @@ public class Utils {
         return Arrays.asList(elems);
     }
 
-    /*
-     * Create a string from a collection
-     * @param coll the collection
-     * @param separator the separator
-     */
-    public static <T> CharSequence mkString(Collection<T> coll, String separator) {
-        StringBuilder sb = new StringBuilder();
-        Iterator<T> iter = coll.iterator();
-        if (iter.hasNext()) {
-            sb.append(iter.next().toString());
-
-            while (iter.hasNext()) {
-                sb.append(separator);
-                sb.append(iter.next().toString());
-            }
-        }
-        return sb;
-    }
-
     /**
      * Recursively delete the given file/directory and any subfiles (if any exist)
      *
@@ -624,8 +601,8 @@ public class Utils {
         } catch (IOException outer) {
             try {
                 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
-                log.debug("Non-atomic move of " + source + " to " + target + " succeeded after atomic move failed due to "
-                    + outer.getMessage());
+                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,
+                        outer.getMessage());
             } catch (IOException inner) {
                 inner.addSuppressed(outer);
                 throw inner;
@@ -663,7 +640,7 @@ public class Utils {
             try {
                 closeable.close();
             } catch (Throwable t) {
-                log.warn("Failed to close " + name, t);
+                log.warn("Failed to close {}", name, t);
             }
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 16742d502dc..512c29ce5fd 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -81,7 +81,7 @@ public class UtilsTest {
         assertEquals("1", Utils.join(Arrays.asList("1"), ","));
         assertEquals("1,2,3", Utils.join(Arrays.asList(1, 2, 3), ","));
     }
-    
+
     @Test
     public void testAbs() {
         assertEquals(0, Utils.abs(Integer.MIN_VALUE));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 859d6611228..24e6709bc60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -736,7 +736,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 } else if (numPartitions != partitions) {
                     final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                     Arrays.sort(topics);
-                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", threadName, Utils.mkString(Arrays.asList(topics), ",")));
+                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", threadName, Utils.join(Arrays.asList(topics), ",")));
                 }
             } else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
                 numPartitions = NOT_AVAILABLE;
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index d345cbda229..612d7a2c8ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -83,7 +83,7 @@ public class StreamsConfigTest {
 
     @Test
     public void defaultSerdeShouldBeConfigured() {
-        Map<String, Object> serializerConfigs = new HashMap<String, Object>();
+        Map<String, Object> serializerConfigs = new HashMap<>();
         serializerConfigs.put("key.serializer.encoding", "UTF8");
         serializerConfigs.put("value.serializer.encoding", "UTF-16");
         Serializer<String> serializer = Serdes.String().serializer();
@@ -103,7 +103,7 @@ public class StreamsConfigTest {
     @Test
     public void shouldSupportMultipleBootstrapServers() {
         List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092");
-        String bootstrapServersString = Utils.mkString(expectedBootstrapServers, ",").toString();
+        String bootstrapServersString = Utils.join(expectedBootstrapServers, ",");
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "irrelevant");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersString);
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 8274451d88f..2a7d3e6f071 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -208,7 +207,7 @@ public class ClientCompatibilityTest {
 
         @Override
         public String toString() {
-            return Utils.mkString(result);
+            return result.toString();
         }
     }
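For reviewers who want to see the formatting difference at a glance, here is a small standalone sketch. It is not part of the patch: the `MkStringFormatDemo` class, its `oldMkString` helper, and the example map contents are illustrative only. The old behaviour is taken from the removed `Utils.mkString(Map)`, which delegated to `mkString(map, "{", "}", "=", " ,")`; the new behaviour is the default `java.util.Map` `toString()` that the log/`toString()` methods now rely on.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MkStringFormatDemo {

    // Minimal re-implementation of the removed Utils.mkString(Map), for comparison only.
    // Note the unusual " ," element separator it used.
    static <K, V> String oldMkString(Map<K, V> map) {
        StringBuilder bld = new StringBuilder("{");
        String prefix = "";
        for (Map.Entry<K, V> entry : map.entrySet()) {
            bld.append(prefix).append(entry.getKey()).append("=").append(entry.getValue());
            prefix = " ,";
        }
        return bld.append("}").toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order so both outputs list entries identically.
        Map<String, Integer> replicasAssignments = new LinkedHashMap<>();
        replicasAssignments.put("topic-a", 3);
        replicasAssignments.put("topic-b", 2);

        // Old log format produced by Utils.mkString: {topic-a=3 ,topic-b=2}
        System.out.println(oldMkString(replicasAssignments));

        // Default java.util.Map toString used after this patch: {topic-a=3, topic-b=2}
        System.out.println(replicasAssignments);
    }
}
```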