diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index a65e2467f1b..f5a08da2c93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,7 +130,7 @@ public class CommonClientConfigs { public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol"; public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + - Utils.join(SecurityProtocol.names(), ", ") + "."; + String.join(", ", SecurityProtocol.names()) + "."; public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT"; public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms"; diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index e4f6f09f7e8..f925f15a7e8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.requests.FetchMetadata; import org.apache.kafka.common.requests.FetchRequest.PartitionData; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import java.util.ArrayList; @@ -40,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; @@ -392,14 +392,14 @@ public class FetchSessionHandler { if (!log.isTraceEnabled()) { return String.format("%d partition(s)", partitions.size()); } - return "(" + Utils.join(partitions, ", ") + ")"; + return "(" + partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")) + ")"; } private String topicIdPartitionsToLogString(Collection partitions) { if (!log.isTraceEnabled()) { return String.format("%d partition(s)", partitions.size()); } - return "(" + Utils.join(partitions, ", ") + ")"; + return "(" + partitions.stream().map(TopicIdPartition::toString).collect(Collectors.joining(", ")) + ")"; } /** @@ -438,16 +438,16 @@ public class FetchSessionHandler { extraIds = findMissing(ids, sessionTopicNames.keySet()); } if (!omitted.isEmpty()) { - bld.append("omittedPartitions=(").append(Utils.join(omitted, ", ")).append("), "); + bld.append("omittedPartitions=(").append(omitted.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).append("), "); } if (!extra.isEmpty()) { - bld.append("extraPartitions=(").append(Utils.join(extra, ", ")).append("), "); + bld.append("extraPartitions=(").append(extra.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).append("), "); } if (!extraIds.isEmpty()) { - bld.append("extraIds=(").append(Utils.join(extraIds, ", ")).append("), "); + 
bld.append("extraIds=(").append(extraIds.stream().map(Uuid::toString).collect(Collectors.joining(", "))).append("), "); } if ((!omitted.isEmpty()) || (!extra.isEmpty()) || (!extraIds.isEmpty())) { - bld.append("response=(").append(Utils.join(topicPartitions, ", ")).append(")"); + bld.append("response=(").append(topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).append(")"); return bld.toString(); } return null; @@ -470,11 +470,11 @@ public class FetchSessionHandler { findMissing(topicPartitions, sessionPartitions.keySet()); StringBuilder bld = new StringBuilder(); if (!extra.isEmpty()) - bld.append("extraPartitions=(").append(Utils.join(extra, ", ")).append("), "); + bld.append("extraPartitions=(").append(extra.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).append("), "); if (!extraIds.isEmpty()) - bld.append("extraIds=(").append(Utils.join(extraIds, ", ")).append("), "); + bld.append("extraIds=(").append(extraIds.stream().map(Uuid::toString).collect(Collectors.joining(", "))).append("), "); if ((!extra.isEmpty()) || (!extraIds.isEmpty())) { - bld.append("response=(").append(Utils.join(topicPartitions, ", ")).append(")"); + bld.append("response=(").append(topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).append(")"); return bld.toString(); } return null; @@ -499,7 +499,7 @@ public class FetchSessionHandler { } StringBuilder bld = new StringBuilder(); bld.append(" with response=("). - append(Utils.join(topicPartitions, ", ")). + append(topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))). append(")"); String prefix = ", implied=("; String suffix = ""; diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java index e1a4b879052..838718652f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java @@ -185,7 +185,7 @@ public class NodeApiVersions { bld.append("("); if (lineBreaks) bld.append("\n\t"); - bld.append(Utils.join(apiKeysText.values(), separator)); + bld.append(String.join(separator, apiKeysText.values())); if (lineBreaks) bld.append("\n"); bld.append(")"); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index a9d555cb391..13ec5965eed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -21,13 +21,13 @@ import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.Node; import org.apache.kafka.common.acl.AclOperation; -import org.apache.kafka.common.utils.Utils; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; /** * A detailed description of a single consumer group in the cluster. 
@@ -161,7 +161,7 @@ public class ConsumerGroupDescription { public String toString() { return "(groupId=" + groupId + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + - ", members=" + Utils.join(members, ",") + + ", members=" + members.stream().map(MemberDescription::toString).collect(Collectors.joining(",")) + ", partitionAssignor=" + partitionAssignor + ", type=" + type + ", state=" + state + diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java index ac3f119b46a..495ddb09745 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java @@ -17,12 +17,12 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Utils; import java.util.Collections; import java.util.HashSet; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; /** * A description of the assignments of a specific group member. @@ -64,6 +64,6 @@ public class MemberAssignment { @Override public String toString() { - return "(topicPartitions=" + Utils.join(topicPartitions, ",") + ")"; + return "(topicPartitions=" + topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",")) + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java index e8700d4d067..c3bbaf318a2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java @@ -20,12 +20,12 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.acl.AclOperation; -import org.apache.kafka.common.utils.Utils; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; /** * A detailed description of a single topic in the cluster. 
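A related note: `Collectors.joining` also has a three-argument `(delimiter, prefix, suffix)` overload that can absorb surrounding brackets into the collector, which this patch uses in a few call sites (e.g. `DistributedConfig` and `MetadataQuorumCommand` below) instead of concatenating `"(" + … + ")"` by hand. A small illustrative sketch:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JoiningWithAffixes {
    public static void main(String[] args) {
        List<Integer> partitions = Arrays.asList(0, 1, 2);

        // Manual wrapping, as done in most toString() call sites in this patch:
        String wrapped = "(" + partitions.stream()
                .map(Object::toString)
                .collect(Collectors.joining(", ")) + ")";

        // Equivalent, using the (delimiter, prefix, suffix) overload:
        String folded = partitions.stream()
                .map(Object::toString)
                .collect(Collectors.joining(", ", "(", ")"));

        System.out.println(wrapped + " " + wrapped.equals(folded)); // (0, 1, 2) true
    }
}
```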
@@ -135,6 +135,6 @@ public class TopicDescription { @Override public String toString() { return "(name=" + name + ", internal=" + internal + ", partitions=" + - Utils.join(partitions, ",") + ", authorizedOperations=" + authorizedOperations + ")"; + partitions.stream().map(TopicPartitionInfo::toString).collect(Collectors.joining(",")) + ", authorizedOperations=" + authorizedOperations + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 02d97a9312f..f6f1ad9b37c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -133,7 +133,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSu import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets; import static org.apache.kafka.common.utils.Utils.closeQuietly; import static org.apache.kafka.common.utils.Utils.isBlank; -import static org.apache.kafka.common.utils.Utils.join; import static org.apache.kafka.common.utils.Utils.swallow; /** @@ -1493,7 +1492,8 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { // See the ApplicationEventProcessor.process() method that handles this event for more detail. applicationEventHandler.add(new AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds())); - log.info("Assigned to partition(s): {}", join(partitions, ", ")); + log.info("Assigned to partition(s): {}", partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + if (subscriptions.assignFromUser(new HashSet<>(partitions))) applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } finally { @@ -1845,7 +1845,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } fetchBuffer.retainAll(currentTopicPartitions); - log.info("Subscribed to topic(s): {}", join(topics, ", ")); + log.info("Subscribed to topic(s): {}", String.join(", ", topics)); if (subscriptions.subscribe(new HashSet<>(topics), listener)) metadata.requestUpdateForNewTopics(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java index dcdd303fe95..1c055ba82f9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java @@ -24,12 +24,12 @@ import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import java.util.Optional; import java.util.Set; import java.util.SortedSet; +import java.util.stream.Collectors; /** * This class encapsulates the invocation of the callback methods defined in the {@link ConsumerRebalanceListener} @@ -54,7 +54,7 @@ public class ConsumerRebalanceListenerInvoker { } public Exception invokePartitionsAssigned(final SortedSet assignedPartitions) { - log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions, ", ")); + log.info("Adding newly assigned partitions: {}", 
assignedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener(); @@ -76,11 +76,11 @@ } public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revokedPartitions) { - log.info("Revoke previously assigned partitions {}", Utils.join(revokedPartitions, ", ")); + log.info("Revoke previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions(); revokePausedPartitions.retainAll(revokedPartitions); if (!revokePausedPartitions.isEmpty()) - log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.join(revokePausedPartitions, ", ")); + log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener(); @@ -102,11 +102,11 @@ } public Exception invokePartitionsLost(final SortedSet<TopicPartition> lostPartitions) { - log.info("Lost previously assigned partitions {}", Utils.join(lostPartitions, ", ")); + log.info("Lost previously assigned partitions {}", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); Set<TopicPartition> lostPausedPartitions = subscriptions.pausedPartitions(); lostPausedPartitions.retainAll(lostPartitions); if (!lostPausedPartitions.isEmpty()) - log.info("The pause flag in partitions [{}] will be removed due to partition lost.", Utils.join(lostPausedPartitions, ", ")); + log.info("The pause flag in partitions [{}] will be removed due to partition lost.", lostPausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java index bd9a0155e13..641bc81d8a2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java @@ -77,6 +77,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_RACK_CONFIG; @@ -93,7 +94,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors; import static org.apache.kafka.common.utils.Utils.closeQuietly; import static org.apache.kafka.common.utils.Utils.isBlank; -import static org.apache.kafka.common.utils.Utils.join; import static org.apache.kafka.common.utils.Utils.swallow; /** @@ -473,7 +473,7 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { fetcher.clearBufferedDataForUnassignedPartitions(currentTopicPartitions); - log.info("Subscribed to topic(s): {}", join(topics, ", ")); + log.info("Subscribed to topic(s): {}", String.join(", ", topics)); if (this.subscriptions.subscribe(new
HashSet<>(topics), listener)) metadata.requestUpdateForNewTopics(); } @@ -571,7 +571,7 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { if (coordinator != null) this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds()); - log.info("Assigned to partition(s): {}", join(partitions, ", ")); + log.info("Assigned to partition(s): {}", partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) metadata.requestUpdateForNewTopics(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index 47107469959..8e93de5a24c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -40,7 +40,6 @@ import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import java.util.ArrayList; @@ -1181,7 +1180,7 @@ public class MembershipManagerImpl implements MembershipManager { * Visible for testing */ CompletableFuture revokePartitions(Set revokedPartitions) { - log.info("Revoking previously assigned partitions {}", Utils.join(revokedPartitions, ", ")); + log.info("Revoking previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); logPausedPartitionsBeingRevoked(revokedPartitions); @@ -1377,7 +1376,7 @@ public class MembershipManagerImpl implements MembershipManager { Set revokePausedPartitions = subscriptions.pausedPartitions(); revokePausedPartitions.retainAll(partitionsToRevoke); if (!revokePausedPartitions.isEmpty()) { - log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.join(revokePausedPartitions, ", ")); + log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); } } diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java index 693ff92c775..73d7a27d946 100644 --- a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java +++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java @@ -17,11 +17,10 @@ package org.apache.kafka.common; -import org.apache.kafka.common.utils.Utils; - import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * A class containing leadership, replicas and ISR information for a topic partition. @@ -116,10 +115,10 @@ public class TopicPartitionInfo { } public String toString() { - String elrString = elr != null ? Utils.join(elr, ", ") : "N/A"; - String lastKnownElrString = lastKnownElr != null ? Utils.join(lastKnownElr, ", ") : "N/A"; + String elrString = elr != null ? elr.stream().map(Node::toString).collect(Collectors.joining(", ")) : "N/A"; + String lastKnownElrString = lastKnownElr != null ? 
lastKnownElr.stream().map(Node::toString).collect(Collectors.joining(", ")) : "N/A"; return "(partition=" + partition + ", leader=" + leader + ", replicas=" + - Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + + replicas.stream().map(Node::toString).collect(Collectors.joining(", ")) + ", isr=" + isr.stream().map(Node::toString).collect(Collectors.joining(", ")) + ", elr=" + elrString + ", lastKnownElr=" + lastKnownElrString + ")"; } diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index e7fa08c804b..d82d06fa162 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -515,7 +515,7 @@ public class ConfigDef { // Check all configurations are defined List undefinedConfigKeys = undefinedDependentConfigs(); if (!undefinedConfigKeys.isEmpty()) { - String joined = Utils.join(undefinedConfigKeys, ","); + String joined = undefinedConfigKeys.stream().map(String::toString).collect(Collectors.joining(",")); throw new ConfigException("Some configurations in are referred in the dependents, but not defined: " + joined); } // parse all known keys @@ -806,7 +806,7 @@ public class ConfigDef { return parsedValue.toString(); case LIST: List valueList = (List) parsedValue; - return Utils.join(valueList, ","); + return valueList.stream().map(Object::toString).collect(Collectors.joining(",")); case CLASS: Class clazz = (Class) parsedValue; return clazz.getName(); @@ -1051,13 +1051,13 @@ public class ConfigDef { public void ensureValid(String name, Object o) { String s = (String) o; if (!validStrings.contains(s)) { - throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", ")); + throw new ConfigException(name, o, "String must be one of: " + String.join(", ", validStrings)); } } public String toString() { - return "[" + Utils.join(validStrings, ", ") + "]"; + return "[" + String.join(", ", validStrings) + "]"; } } @@ -1079,12 +1079,12 @@ public class ConfigDef { public void ensureValid(String name, Object o) { String s = (String) o; if (s == null || !validStrings.contains(s.toUpperCase(Locale.ROOT))) { - throw new ConfigException(name, o, "String must be one of (case insensitive): " + Utils.join(validStrings, ", ")); + throw new ConfigException(name, o, "String must be one of (case insensitive): " + String.join(", ", validStrings)); } } public String toString() { - return "(case insensitive) [" + Utils.join(validStrings, ", ") + "]"; + return "(case insensitive) [" + String.join(", ", validStrings) + "]"; } } @@ -1205,7 +1205,8 @@ public class ConfigDef { } if (!foundIllegalCharacters.isEmpty()) { - throw new ConfigException(name, value, "String may not contain control sequences but had the following ASCII chars: " + Utils.join(foundIllegalCharacters, ", ")); + throw new ConfigException(name, value, "String may not contain control sequences but had the following ASCII chars: " + + foundIllegalCharacters.stream().map(Object::toString).collect(Collectors.joining(", "))); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 3a286225a65..2065a15d942 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -28,7 +28,6 @@ import 
org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -325,8 +324,8 @@ public class FetchRequest extends AbstractRequest { append(", maxBytes=").append(maxBytes). append(", fetchData=").append(toFetch). append(", isolationLevel=").append(isolationLevel). - append(", removed=").append(Utils.join(removed, ", ")). - append(", replaced=").append(Utils.join(replaced, ", ")). + append(", removed=").append(removed.stream().map(TopicIdPartition::toString).collect(Collectors.joining(", "))). + append(", replaced=").append(replaced.stream().map(TopicIdPartition::toString).collect(Collectors.joining(", "))). append(", metadata=").append(metadata). append(", rackId=").append(rackId). append(")"); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 251810e30e9..8caddb00541 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.FlattenedIterator; -import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -120,7 +119,7 @@ public final class LeaderAndIsrRequest extends AbstractControlRequest { .append(", brokerEpoch=").append(brokerEpoch) .append(", partitionStates=").append(partitionStates) .append(", topicIds=").append(topicIds) - .append(", liveLeaders=(").append(Utils.join(liveLeaders, ", ")).append(")") + .append(", liveLeaders=(").append(liveLeaders.stream().map(Node::toString).collect(Collectors.joining(", "))).append(")") .append(")"); return bld.toString(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 6e58e209b35..dc60221d182 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopi import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -416,9 +415,9 @@ public class MetadataResponse extends AbstractResponse { ", partition=" + topicPartition + ", leader=" + leaderId + ", leaderEpoch=" + leaderEpoch + - ", replicas=" + Utils.join(replicaIds, ",") + - ", isr=" + Utils.join(inSyncReplicaIds, ",") + - ", offlineReplicas=" + Utils.join(offlineReplicaIds, ",") + ')'; + ", replicas=" + replicaIds.stream().map(Object::toString).collect(Collectors.joining(",")) + + ", isr=" + inSyncReplicaIds.stream().map(Object::toString).collect(Collectors.joining(",")) + + ", offlineReplicas=" + offlineReplicaIds.stream().map(Object::toString).collect(Collectors.joining(",")) + ')'; } } diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java index df746b56c84..940a16f0a85 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.MappedIterator; -import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -101,7 +100,7 @@ public class StopReplicaRequest extends AbstractControlRequest { append(", controllerEpoch=").append(controllerEpoch). append(", brokerEpoch=").append(brokerEpoch). append(", deletePartitions=").append(deletePartitions). - append(", topicStates=").append(Utils.join(topicStates, ",")). + append(", topicStates=").append(topicStates.stream().map(StopReplicaTopicState::toString).collect(Collectors.joining(","))). append(")"); return bld.toString(); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 245fff7ffce..b846fb7b0f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.FlattenedIterator; -import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -38,6 +37,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static java.util.Collections.singletonList; @@ -141,7 +141,7 @@ public final class UpdateMetadataRequest extends AbstractControlRequest { append(", type=").append(updateType). append(", brokerEpoch=").append(brokerEpoch). append(", partitionStates=").append(partitionStates). - append(", liveBrokers=").append(Utils.join(liveBrokers, ", ")). + append(", liveBrokers=").append(liveBrokers.stream().map(UpdateMetadataBroker::toString).collect(Collectors.joining(", "))). append(")"); return bld.toString(); } diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java index 7dbb2703947..c6d1ad41d73 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java @@ -17,13 +17,13 @@ package org.apache.kafka.common.telemetry; -import org.apache.kafka.common.utils.Utils; import java.util.Arrays; import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * State that helps determine where client exists in the telemetry state i.e. subscribe->wait->push loop. 
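Since `Utils.mkString(stream, begin, end, separator)` is deleted just below, note that its parameter order differs from `Collectors.joining(delimiter, prefix, suffix)`: the separator moves from the last position to the first. Getting the reordering wrong still compiles but silently changes the output, so those call sites deserve extra review (see the `ConsumerGroupCommand` hunks later in this patch). A sketch mirroring one of the removed `testMkString` cases:

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MkStringMigration {
    public static void main(String[] args) {
        // Old: Utils.mkString(Stream.of(1, 2, 3), "{", "}", ",") -> "{1,2,3}"
        // New: the separator becomes the FIRST argument of Collectors.joining,
        // while begin/end become the trailing prefix/suffix arguments.
        String joined = Stream.of(1, 2, 3)
                .map(Object::toString)
                .collect(Collectors.joining(",", "{", "}"));
        System.out.println(joined); // {1,2,3}
    }
}
```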
@@ -150,7 +150,7 @@ if (allowableStates != null && !allowableStates.isEmpty()) { validStatesClause = String.format("the valid telemetry state transitions from %s are: %s", this, - Utils.join(allowableStates, ", ")); + allowableStates.stream().map(ClientTelemetryState::toString).collect(Collectors.joining(", "))); } else { validStatesClause = String.format("there are no valid telemetry state transitions from %s", this); } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index f62aee76cbc..ce67fbdb0c7 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -462,7 +462,7 @@ public final class Utils { return constructor.newInstance(args); } catch (NoSuchMethodException e) { throw new ClassNotFoundException(String.format("Failed to find " + - "constructor with %s for %s", Utils.join(argTypes, ", "), className), e); + "constructor with %s for %s", Arrays.stream(argTypes).map(Object::toString).collect(Collectors.joining(", ")), className), e); } catch (InstantiationException e) { throw new ClassNotFoundException(String.format("Failed to instantiate " + "%s", className), e); @@ -563,7 +563,7 @@ } /** - * Formats a byte number as a human readable String ("3.2 MB") + * Formats a byte number as a human-readable String ("3.2 MB") * @param bytes some size in bytes * @return */ @@ -584,46 +584,6 @@ } } - /** - * Create a string representation of an array joined by the given separator - * @param strs The array of items - * @param separator The separator - * @return The string representation. - */ - public static <T> String join(T[] strs, String separator) { - return join(Arrays.asList(strs), separator); - } - - /** - * Create a string representation of a collection joined by the given separator - * @param collection The list of items - * @param separator The separator - * @return The string representation. - */ - public static <T> String join(Collection<T> collection, String separator) { - Objects.requireNonNull(collection); - return mkString(collection.stream(), "", "", separator); - } - - /** - * Create a string representation of a stream surrounded by `begin` and `end` and joined by `separator`. - * - * @return The string representation.
- */ - public static <T> String mkString(Stream<T> stream, String begin, String end, String separator) { - Objects.requireNonNull(stream); - StringBuilder sb = new StringBuilder(); - sb.append(begin); - Iterator<T> iter = stream.iterator(); - while (iter.hasNext()) { - sb.append(iter.next()); - if (iter.hasNext()) - sb.append(separator); - } - sb.append(end); - return sb.toString(); - } - /** * Converts a {@code Map} class into a string, concatenating keys and values Example: diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java index 88cfe35df6a..048d168bd99 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java @@ -217,7 +217,7 @@ public class AbstractPartitionAssignorTest { } private static String toSortedString(List partitions) { - return Utils.join(partitions.stream().map(Object::toString).sorted().collect(Collectors.toList()), ", "); + return partitions.stream().map(Object::toString).sorted().collect(Collectors.joining(", ")); } private static List subscriptions(List> consumerTopics, diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 467dd53ac76..102d330ac75 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -158,20 +158,6 @@ public class UtilsTest { assertEquals("10 MB", formatBytes(10 * 1024 * 1024)); } - @Test - public void testJoin() { - assertEquals("", Utils.join(Collections.emptyList(), ",")); - assertEquals("1", Utils.join(asList("1"), ",")); - assertEquals("1,2,3", Utils.join(asList(1, 2, 3), ",")); - } - - @Test - public void testMkString() { - assertEquals("[]", Utils.mkString(Stream.empty(), "[", "]", ",")); - assertEquals("(1)", Utils.mkString(Stream.of("1"), "(", ")", ",")); - assertEquals("{1,2,3}", Utils.mkString(Stream.of(1, 2, 3), "{", "}", ",")); - } - @Test public void testAbs() { assertEquals(0, Utils.abs(Integer.MIN_VALUE)); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java index 52e457a3f4c..15717eaa643 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java @@ -80,14 +80,14 @@ public class WorkerInfo { */ protected final void addRuntimeInfo() { List<String> jvmArgs = RUNTIME.getInputArguments(); - values.put("jvm.args", Utils.join(jvmArgs, ", ")); + values.put("jvm.args", String.join(", ", jvmArgs)); String[] jvmSpec = { RUNTIME.getVmVendor(), RUNTIME.getVmName(), RUNTIME.getSystemProperties().get("java.version"), RUNTIME.getVmVersion() }; - values.put("jvm.spec", Utils.join(jvmSpec, ", ")); + values.put("jvm.spec", String.join(", ", jvmSpec)); values.put("jvm.classpath", RUNTIME.getClassPath()); } @@ -100,7 +100,7 @@ OS.getArch(), OS.getVersion(), }; - values.put("os.spec", Utils.join(osInfo, ", ")); + values.put("os.spec", String.join(", ", osInfo)); values.put("os.vcpus", String.valueOf(OS.getAvailableProcessors())); } diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index eb085063a93..1e75b1abc77 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -311,7 +311,7 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecords<byte[], byte[]>, SinkRecord> if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) { List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig); consumer.subscribe(topics, new HandleRebalance()); - log.debug("{} Initializing and starting task for topics {}", this, Utils.join(topics, ", ")); + log.debug("{} Initializing and starting task for topics {}", this, String.join(", ", topics)); } else { String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG); Pattern pattern = Pattern.compile(topicsRegexStr); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java index 6378093c4fc..14826e982d6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java @@ -33,6 +33,7 @@ import java.security.NoSuchAlgorithmException; import java.security.Provider; import java.security.Security; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -41,6 +42,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.between; @@ -466,7 +468,8 @@ + "compatibility"); } }, - () -> "[" + Utils.join(ConnectProtocolCompatibility.values(), ", ") + "]"), + () -> Arrays.stream(ConnectProtocolCompatibility.values()).map(ConnectProtocolCompatibility::toString) .collect(Collectors.joining(", ", "[", "]"))), ConfigDef.Importance.LOW, CONNECT_PROTOCOL_DOC) .define(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 109391dd740..681394f21af 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -129,7 +129,7 @@ public class Plugins { } private static String pluginNames(Collection<PluginDesc<?>> plugins) { - return Utils.join(plugins, ", "); + return plugins.stream().map(PluginDesc::toString).collect(Collectors.joining(", ")); } private <T> T newPlugin(Class<T> klass) { @@ -317,7 +317,7 @@ "Failed to find any class that implements Connector and which name matches " + connectorClassOrAlias + ", available connectors are: " - + Utils.join(connectors, ", ") + + connectors.stream().map(PluginDesc::toString).collect(Collectors.joining(", ")) ); } if (matches.size() > 1) { "More than one connector matches alias " + connectorClassOrAlias + ". Please use full package and class name instead. 
Classes found: " - + Utils.join(connectors, ", ") + + connectors.stream().map(PluginDesc::toString).collect(Collectors.joining(", ")) ); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index da1722a8b46..e26f7d88f19 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -1265,7 +1265,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme try { int taskNum = Integer.parseInt(parts[parts.length - 1]); - String connectorName = Utils.join(Arrays.copyOfRange(parts, 1, parts.length - 1), "-"); + String connectorName = String.join("-", Arrays.copyOfRange(parts, 1, parts.length - 1)); return new ConnectorTaskId(connectorName, taskNum); } catch (NumberFormatException e) { return null; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index 4843b8799b8..0ffc3eed4f3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; @@ -534,7 +533,7 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme try { int taskNum = Integer.parseInt(parts[parts.length - 1]); - String connectorName = Utils.join(Arrays.copyOfRange(parts, 2, parts.length - 1), "-"); + String connectorName = String.join("-", Arrays.copyOfRange(parts, 2, parts.length - 1)); return new ConnectorTaskId(connectorName, taskNum); } catch (NumberFormatException e) { log.warn("Invalid task status key {}", key); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index a9a90db45af..f8f9b660a38 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -47,7 +47,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.slf4j.Logger; @@ -399,7 +398,7 @@ public class TopicAdmin implements AutoCloseable { } } if (topicsByName.isEmpty()) return EMPTY_CREATION; - String topicNameList = Utils.join(topicsByName.keySet(), "', '"); + String topicNameList = String.join("', '", topicsByName.keySet()); // Attempt to create any missing topics CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); diff --git 
a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala index e950cba7720..2724e6bf9db 100644 --- a/core/src/main/scala/kafka/server/FetchSession.scala +++ b/core/src/main/scala/kafka/server/FetchSession.scala @@ -24,12 +24,14 @@ import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, INITIAL_EPOCH, INVALID_SESSION_ID} import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata} -import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, Time, Utils} +import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, Time} import org.apache.kafka.server.metrics.KafkaMetricsGroup import java.util import java.util.{Collections, Optional} import java.util.concurrent.{ThreadLocalRandom, TimeUnit} + import scala.collection.mutable +import scala.jdk.CollectionConverters._ import scala.math.Ordered.orderingToOrdered @@ -47,7 +49,7 @@ object FetchSession { def partitionsToLogString(partitions: util.Collection[TopicIdPartition], traceEnabled: Boolean): String = { if (traceEnabled) { - "(" + Utils.join(partitions, ", ") + ")" + "(" + partitions.asScala.mkString(", ") + ")" } else { s"${partitions.size} partition(s)" } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index ed1ba70bb6e..fdbeb093c80 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -17,7 +17,6 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; @@ -25,6 +24,7 @@ import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** * The group coordinator configurations. @@ -53,7 +53,8 @@ public class GroupCoordinatorConfig { public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = false; public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols"; - public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + Utils.join(Group.GroupType.values(), ",") + ". " + + public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + + Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". 
" + "The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production."; public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString()); diff --git a/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java b/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java index 0d33f89f057..1bcec15513c 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java @@ -19,7 +19,6 @@ package org.apache.kafka.server.config; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.common.MetadataVersion; public class ReplicationConfigs { @@ -116,7 +115,7 @@ public class ReplicationConfigs { public static final String INTER_BROKER_SECURITY_PROTOCOL_DEFAULT = SecurityProtocol.PLAINTEXT.toString(); public static final String INTER_BROKER_LISTENER_NAME_CONFIG = "inter.broker.listener.name"; public static final String INTER_BROKER_SECURITY_PROTOCOL_DOC = "Security protocol used to communicate between brokers. Valid values are: " + - Utils.join(SecurityProtocol.names(), ", ") + ". It is an error to set this and " + INTER_BROKER_LISTENER_NAME_CONFIG + + String.join(", ", SecurityProtocol.names()) + ". It is an error to set this and " + INTER_BROKER_LISTENER_NAME_CONFIG + " properties at the same time."; public static final String INTER_BROKER_LISTENER_NAME_DOC = "Name of listener used for communication between brokers. If this is unset, the listener name is defined by " + INTER_BROKER_SECURITY_PROTOCOL_CONFIG + "It is an error to set this and " + INTER_BROKER_SECURITY_PROTOCOL_CONFIG + " properties at the same time."; diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java index 8d475fbfe3c..b3e2312145d 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java @@ -216,7 +216,7 @@ public final class TieredStorageTestContext implements AutoCloseable { Function0 messageSupplier = () -> String.format("Could not consume %d records of %s from offset %d in %d ms. %d message(s) consumed:%s%s", expectedTotalCount, topicPartition, fetchOffset, timeoutMs, records.size(), sep, - Utils.join(records, sep)); + records.stream().map(Object::toString).collect(Collectors.joining(sep))); TestUtils.pollRecordsUntilTrue(consumer, pollAction, messageSupplier, timeoutMs); return records; } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java index 46cb4b05b68..2d26e43f8d7 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java @@ -120,7 +120,7 @@ public final class BrokerLocalStorage { "in the log directory is %d which is %s the expected offset %s. 
The directory of %s is " + "made of the following files: %s", brokerId, topicPartition, offsetHolder.firstLogFileBaseOffset, pos, offset, topicPartition, - Utils.join(offsetHolder.partitionFiles, System.lineSeparator())); + String.join(System.lineSeparator(), offsetHolder.partitionFiles)); throw new AssertionError(message); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index e7353002b22..bac603ab1d2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; @@ -257,7 +256,7 @@ public class StreamsConfigTest { @Test public void shouldSupportMultipleBootstrapServers() { final List expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092"); - final String bootstrapServersString = Utils.join(expectedBootstrapServers, ","); + final String bootstrapServersString = String.join(",", expectedBootstrapServers); final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "irrelevant"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersString); diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java index 618d69e7b83..88085fedf33 100644 --- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java @@ -38,6 +38,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -230,8 +231,8 @@ public class MetadataQuorumCommand { "\nHighWatermark: " + quorumInfo.highWatermark() + "\nMaxFollowerLag: " + maxFollowerLag + "\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs + - "\nCurrentVoters: " + Utils.mkString(quorumInfo.voters().stream().map(v -> v.replicaId()), "[", "]", ",") + - "\nCurrentObservers: " + Utils.mkString(quorumInfo.observers().stream().map(v -> v.replicaId()), "[", "]", ",") + "\nCurrentVoters: " + quorumInfo.voters().stream().map(QuorumInfo.ReplicaState::replicaId).map(Object::toString).collect(Collectors.joining(",", "[", "]")) + + "\nCurrentObservers: " + quorumInfo.observers().stream().map(QuorumInfo.ReplicaState::replicaId).map(Objects::toString).collect(Collectors.joining(",", "[", "]")) ); } diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java index 479db63e18a..72f9170124a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java @@ -410,7 +410,7 @@ public abstract class TransactionsCommand { String.valueOf(result.transactionTimeoutMs()), transactionStartTimeMsColumnValue, 
transactionDurationMsColumnValue, - Utils.join(result.topicPartitions(), ",") + result.topicPartitions().stream().map(TopicPartition::toString).collect(Collectors.joining(",")) ); ToolsUtils.prettyPrintTable(HEADERS, singletonList(row), out); diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index c24681fd163..1d045b1e7bb 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -53,6 +53,7 @@ import java.io.IOException; import java.io.PrintStream; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -60,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import static net.sourceforge.argparse4j.impl.Arguments.store; import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; @@ -536,7 +538,8 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons .setDefault(ConsumerConfig.DEFAULT_GROUP_PROTOCOL) .metavar("GROUP_PROTOCOL") .dest("groupProtocol") - .help(String.format("Group protocol (must be one of %s)", Utils.join(GroupProtocol.values(), ", "))); + .help(String.format("Group protocol (must be one of %s)", Arrays.stream(GroupProtocol.values()) + .map(Object::toString).collect(Collectors.joining(", ")))); parser.addArgument("--group-remote-assignor") .action(store()) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java index 49dd4efc69d..92fa46328d4 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java @@ -132,7 +132,7 @@ public class ConsumerGroupCommand { Set parsedStates = Arrays.stream(input.split(",")).map(s -> ConsumerGroupState.parse(s.trim())).collect(Collectors.toSet()); if (parsedStates.contains(ConsumerGroupState.UNKNOWN)) { Collection validStates = Arrays.stream(ConsumerGroupState.values()).filter(s -> s != ConsumerGroupState.UNKNOWN).collect(Collectors.toList()); - throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + Utils.join(validStates, ", ")); + throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + validStates.stream().map(ConsumerGroupState::toString).collect(Collectors.joining(", "))); } return parsedStates; } @@ -629,7 +629,7 @@ public class ConsumerGroupCommand { switch (topLevelResult) { case NONE: - System.out.println("Request succeed for deleting offsets with topic " + Utils.mkString(topics.stream(), "", "", ", ") + " group " + groupId); + System.out.println("Request succeed for deleting offsets with topic " + String.join(", ", topics) + " group " + groupId); break; case INVALID_GROUP_ID: printError("'" + groupId + "' is not valid.", Optional.empty()); @@ -1159,7 +1159,7 @@ public class ConsumerGroupCommand { ? 
CsvUtils.writerFor(CsvUtils.CsvRecordNoGroup.class) : CsvUtils.writerFor(CsvUtils.CsvRecordWithGroup.class); - return Utils.mkString(assignments.entrySet().stream().flatMap(e -> { + return assignments.entrySet().stream().flatMap(e -> { String groupId = e.getKey(); Map<TopicPartition, OffsetAndMetadata> partitionInfo = e.getValue(); @@ -1176,7 +1176,7 @@ throw new RuntimeException(err); } }); - }), "", "", ""); + }).collect(Collectors.joining()); } Map<String, Throwable> deleteGroups() { @@ -1202,13 +1202,13 @@ }); if (failed.isEmpty()) - System.out.println("Deletion of requested consumer groups (" + Utils.mkString(success.keySet().stream(), "'", "'", "', '") + ") was successful."); + System.out.println("Deletion of requested consumer groups (" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '", "'", "'")) + ") was successful."); else { printError("Deletion of some consumer groups failed:", Optional.empty()); failed.forEach((group, error) -> System.out.println("* Group '" + group + "' could not be deleted due to: " + error)); if (!success.isEmpty()) - System.out.println("\nThese consumer groups were deleted successfully: " + Utils.mkString(success.keySet().stream(), "'", "'", "', '")); + System.out.println("\nThese consumer groups were deleted successfully: " + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '", "'", "'"))); } failed.putAll(success); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java index 121594be4ad..6ede2144b7f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java @@ -26,8 +26,8 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; -import static org.apache.kafka.common.utils.Utils.join; import static org.apache.kafka.tools.ToolsUtils.minus; public class ConsumerGroupCommandOptions extends CommandDefaultOptions { @@ -213,11 +213,11 @@ if (options.has(describeOpt)) { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndExit(parser, - "Option " + describeOpt + " takes one of these options: " + join(allGroupSelectionScopeOpts, ", ")); + "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt); if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
index 121594be4ad..6ede2144b7f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
@@ -26,8 +26,8 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.utils.Utils.join;
 import static org.apache.kafka.tools.ToolsUtils.minus;
 
 public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
@@ -213,11 +213,11 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
         if (options.has(describeOpt)) {
             if (!options.has(groupOpt) && !options.has(allGroupsOpt))
                 CommandLineUtils.printUsageAndExit(parser,
-                    "Option " + describeOpt + " takes one of these options: " + join(allGroupSelectionScopeOpts, ", "));
+                    "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
             List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
             if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) {
                 CommandLineUtils.printUsageAndExit(parser,
-                    "Option " + describeOpt + " takes at most one of these options: " + join(mutuallyExclusiveOpts, ", "));
+                    "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
             }
             if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
                 CommandLineUtils.printUsageAndExit(parser,
@@ -230,7 +230,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
         if (options.has(deleteOpt)) {
             if (!options.has(groupOpt) && !options.has(allGroupsOpt))
                 CommandLineUtils.printUsageAndExit(parser,
-                    "Option " + deleteOpt + " takes one of these options: " + join(allGroupSelectionScopeOpts, ", "));
+                    "Option " + deleteOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
             if (options.has(topicOpt))
                 CommandLineUtils.printUsageAndExit(parser, "The consumer does not support topic-specific offset " +
                     "deletion from a consumer group.");
@@ -239,7 +239,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
         if (options.has(deleteOffsetsOpt)) {
             if (!options.has(groupOpt) || !options.has(topicOpt))
                 CommandLineUtils.printUsageAndExit(parser,
-                    "Option " + deleteOffsetsOpt + " takes the following options: " + join(allDeleteOffsetsOpts, ", "));
+                    "Option " + deleteOffsetsOpt + " takes the following options: " + allDeleteOffsetsOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
         }
 
         if (options.has(resetOffsetsOpt)) {
@@ -255,7 +255,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
             if (!options.has(groupOpt) && !options.has(allGroupsOpt))
                 CommandLineUtils.printUsageAndExit(parser,
-                    "Option " + resetOffsetsOpt + " takes one of these options: " + join(allGroupSelectionScopeOpts, ", "));
+                    "Option " + resetOffsetsOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
             CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, minus(allResetOffsetScenarioOpts, resetToOffsetOpt));
             CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, minus(allResetOffsetScenarioOpts, resetToDatetimeOpt));
             CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, minus(allResetOffsetScenarioOpts, resetByDurationOpt));
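All of the usage messages above go through `map(Object::toString)` because `String.join` only accepts `CharSequence` elements, and jopt-simple's `OptionSpec` is not one. A minimal illustration with a stand-in type (the enum here is hypothetical, not Kafka's):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JoinNonStrings {
    // Hypothetical stand-in for joptsimple's OptionSpec: joined via its toString().
    enum Opt {
        GROUP, ALL_GROUPS;

        @Override
        public String toString() {
            return "--" + name().toLowerCase().replace('_', '-');
        }
    }

    public static void main(String[] args) {
        List<Opt> opts = Arrays.asList(Opt.GROUP, Opt.ALL_GROUPS);

        // String.join(", ", opts) would not compile: Opt is not a CharSequence,
        // hence the map(Object::toString) step in the usage messages.
        String rendered = opts.stream().map(Object::toString).collect(Collectors.joining(", "));

        System.out.println(rendered); // --group, --all-groups
    }
}
```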
"" : "s", String.join(",", topics)); clearTopicLevelThrottles(adminClient, topics); } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java index 4fd7e3b9197..a30ce404248 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java @@ -37,7 +37,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.internals.KafkaFutureImpl; -import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatchers; @@ -196,7 +195,7 @@ public class ConsumerGroupServiceTest { public void testAdminRequestsForResetOffsets() { List args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", GROUP, "--reset-offsets", "--to-latest")); List topicsWithoutPartitionsSpecified = TOPICS.subList(1, TOPICS.size()); - List topicArgs = new ArrayList<>(Arrays.asList("--topic", TOPICS.get(0) + ":" + Utils.mkString(IntStream.range(0, NUM_PARTITIONS).mapToObj(Integer::toString), "", "", ","))); + List topicArgs = new ArrayList<>(Arrays.asList("--topic", TOPICS.get(0) + ":" + (IntStream.range(0, NUM_PARTITIONS).mapToObj(Integer::toString).collect(Collectors.joining(","))))); topicsWithoutPartitionsSpecified.forEach(topic -> topicArgs.addAll(Arrays.asList("--topic", topic))); args.addAll(topicArgs); diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java index ff507d6d74f..c3fe33249b1 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Scheduler; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; @@ -175,7 +174,7 @@ public final class Agent { Set nodes = controller.targetNodes(platform.topology()); if (!nodes.contains(platform.curNode().name())) { out.println("This task is not configured to run on this node. 
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index ff507d6d74f..c3fe33249b1 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
@@ -175,7 +174,7 @@ public final class Agent {
         Set<String> nodes = controller.targetNodes(platform.topology());
         if (!nodes.contains(platform.curNode().name())) {
             out.println("This task is not configured to run on this node. It runs on node(s): " +
-                Utils.join(nodes, ", ") + ", whereas this node is " +
+                String.join(", ", nodes) + ", whereas this node is " +
                 platform.curNode().name());
             return false;
         }
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java b/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
index 6922c2e0705..3552e4b3089 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
@@ -20,7 +20,6 @@ package org.apache.kafka.trogdor.basic;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Shell;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.Topology;
@@ -49,10 +48,10 @@ public class BasicPlatform implements Platform {
         public String run(Node curNode, String[] command) throws IOException {
             try {
                 String result = Shell.execCommand(command);
-                log.info("RUN: {}. RESULT: [{}]", Utils.join(command, " "), result);
+                log.info("RUN: {}. RESULT: [{}]", String.join(" ", command), result);
                 return result;
             } catch (RuntimeException | IOException e) {
-                log.info("RUN: {}. ERROR: [{}]", Utils.join(command, " "), e.getMessage());
+                log.info("RUN: {}. ERROR: [{}]", String.join(" ", command), e.getMessage());
                 throw e;
             }
         }
@@ -64,7 +63,7 @@ public class BasicPlatform implements Platform {
         if (this.curNode == null) {
             throw new RuntimeException(String.format("No node named %s found " +
                 "in the cluster! Cluster nodes are: %s", curNodeName,
-                Utils.join(topology.nodes().keySet(), ",")));
+                String.join(",", topology.nodes().keySet())));
         }
         this.topology = topology;
         this.scheduler = scheduler;
@@ -83,7 +82,7 @@ public class BasicPlatform implements Platform {
         if (this.curNode == null) {
             throw new RuntimeException(String.format("No node named %s found " +
                 "in the cluster! Cluster nodes are: %s", curNodeName,
-                Utils.join(topology.nodes().keySet(), ",")));
+                String.join(",", topology.nodes().keySet())));
         }
         this.commandRunner = new ShellCommandRunner();
     }
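The command-line joins above need no stream at all: `String.join` has a varargs overload, so a `String[]` can be passed directly. For example (hypothetical command):

```java
public class CommandLineJoin {
    public static void main(String[] args) {
        // Hypothetical shell command, same shape as the platform's String[] command.
        String[] command = {"kill", "-STOP", "1234"};

        // String.join(CharSequence, CharSequence...) accepts the array directly.
        System.out.println(String.join(" ", command)); // kill -STOP 1234
    }
}
```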
Cluster nodes are: %s", curNodeName, - Utils.join(topology.nodes().keySet(), ","))); + String.join(",", topology.nodes().keySet()))); } this.commandRunner = new ShellCommandRunner(); } diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index 23c0ba4fcd0..cd0e09ee638 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -36,7 +36,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import java.util.ArrayList; @@ -226,7 +225,7 @@ public final class WorkerUtils { } if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) { String str = "Unable to create topic(s): " + - Utils.join(topicsToCreate, ", ") + "after " + tries + " attempt(s)"; + String.join(", ", topicsToCreate) + "after " + tries + " attempt(s)"; log.warn(str); throw new TimeoutException(str); } diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 41eca1f3893..b270a377305 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -142,7 +142,7 @@ public final class TaskManager { } } log.info("Created TaskManager for agent(s) on: {}", - Utils.join(nodeManagers.keySet(), ", ")); + String.join(", ", nodeManagers.keySet())); } class ManagedTask { @@ -239,7 +239,7 @@ public final class TaskManager { } if (!nonExistentNodeNames.isEmpty()) { throw new KafkaException("Unknown node names: " + - Utils.join(nonExistentNodeNames, ", ")); + String.join(", ", nonExistentNodeNames)); } if (validNodeNames.isEmpty()) { throw new KafkaException("No node names specified."); @@ -398,7 +398,7 @@ public final class TaskManager { task.maybeSetError("Unable to find nodes for task: " + e.getMessage()); return null; } - log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", ")); + log.info("Running task {} on node(s): {}", task.id, String.join(", ", nodeNames)); task.state = TaskStateType.RUNNING; task.startedMs = time.milliseconds(); for (String workerName : nodeNames) { @@ -597,7 +597,7 @@ public final class TaskManager { task.doneMs = time.milliseconds(); task.state = TaskStateType.DONE; log.info("{}: Task {} is now complete on {} with error: {}", - nodeName, task.id, Utils.join(task.workerIds.keySet(), ", "), + nodeName, task.id, String.join(", ", task.workerIds.keySet()), task.error.isEmpty() ? "(none)" : task.error); } else if ((task.state == TaskStateType.RUNNING) && (!task.error.isEmpty())) { log.info("{}: task {} stopped with error {}. 
Stopping worker(s): {}", diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java b/trogdor/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java index ef97e7b034e..5d9d68e443c 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java @@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.fault; import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.internals.KafkaFutureImpl; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.task.TaskWorker; import org.apache.kafka.trogdor.task.WorkerStatusTracker; @@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; public class ProcessStopFaultWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(ProcessStopFaultWorker.class); @@ -80,7 +80,7 @@ public class ProcessStopFaultWorker implements TaskWorker { id, javaProcessName, signalName); } else { log.info("{}: sending {} to {} pid(s) {}", - id, signalName, javaProcessName, Utils.join(pids, ", ")); + id, signalName, javaProcessName, pids.stream().map(Object::toString).collect(Collectors.joining(","))); for (Integer pid : pids) { platform.runCommand(new String[] {"kill", "-" + signalName, pid.toString()}); } diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java index 4bf7446412d..241bda0c5d0 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java @@ -64,6 +64,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + public class RoundTripWorker implements TaskWorker { private static final int THROTTLE_PERIOD_MS = 100; @@ -318,7 +320,7 @@ public class RoundTripWorker implements TaskWorker { } } log.info("{}: consumer waiting for {} message(s), starting with: {}", - id, numToReceive, Utils.join(list, ", ")); + id, numToReceive, list.stream().map(Object::toString).collect(Collectors.joining(", "))); } } diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java b/trogdor/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java index aabcf1d1be8..7277fb6db4a 100644 --- a/trogdor/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java +++ b/trogdor/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java @@ -18,8 +18,6 @@ package org.apache.kafka.trogdor.basic; import static org.junit.jupiter.api.Assertions.assertEquals; - -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.apache.kafka.trogdor.common.Platform; @@ -57,7 +55,7 @@ public class BasicPlatformTest { Platform platform = Platform.Config.parse("bob01", configFile.getPath()); assertEquals("BasicPlatform", platform.name()); assertEquals(2, platform.topology().nodes().size()); - assertEquals("bob01, bob02", Utils.join(platform.topology().nodes().keySet(), ", ")); + assertEquals("bob01, bob02", String.join(", ", platform.topology().nodes().keySet())); } finally { 
diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java b/trogdor/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java
index 69247cf8357..654f14b26a0 100644
--- a/trogdor/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java
+++ b/trogdor/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.trogdor.common;
 
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.basic.BasicPlatform;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +44,11 @@ public class CapturingCommandRunner implements BasicPlatform.CommandRunner {
 
     @Override
     public String run(Node curNode, String[] command) {
-        String line = Utils.join(command, " ");
+        String line = String.join(" ", command);
         synchronized (this) {
             getOrCreate(curNode.name()).add(line);
         }
-        log.debug("RAN {}: {}", curNode, Utils.join(command, " "));
+        log.debug("RAN {}: {}", curNode, String.join(" ", command));
         return "";
     }
diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index d4659713090..7f5cfc998ce 100644
--- a/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++ b/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.common;
 
 import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.ThreadUtils;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.agent.Agent;
 import org.apache.kafka.trogdor.agent.AgentClient;
 import org.apache.kafka.trogdor.agent.AgentRestResource;
@@ -134,7 +133,7 @@ public class MiniTrogdorCluster implements AutoCloseable {
      */
     public MiniTrogdorCluster build() throws Exception {
         log.info("Creating MiniTrogdorCluster with agents: {} and coordinator: {}",
-            Utils.join(agentNames, ", "), coordinatorName);
+            String.join(", ", agentNames), coordinatorName);
         TreeMap<String, NodeData> nodes = new TreeMap<>();
         for (String agentName : agentNames) {
             NodeData node = getOrCreate(agentName, nodes);
diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 8cce2011a89..9a84f1aa2e4 100644
--- a/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Scheduler;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.trogdor.agent.AgentClient;
 import org.apache.kafka.trogdor.common.CapturingCommandRunner;
@@ -361,7 +360,7 @@ public class CoordinatorTest {
 
         @Override
         public String toString() {
-            return Utils.join(expectedLines, ", ");
+            return String.join(", ", expectedLines);
        }
    }
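A cheap way to gain confidence in a mechanical migration like this is to compare the new call against a reference re-implementation of the removed helper. The `legacyJoin` below is an assumed reconstruction of the separator-loop shape such helpers usually have, not the actual removed code:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class JoinParityCheck {
    // Assumed reconstruction of the removed helper's usual shape, for comparison only.
    static <T> String legacyJoin(List<T> list, String separator) {
        StringBuilder sb = new StringBuilder();
        Iterator<T> iter = list.iterator();
        while (iter.hasNext()) {
            sb.append(iter.next());
            if (iter.hasNext()) {
                sb.append(separator);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c");
        String expected = legacyJoin(items, ", ");
        String actual = String.join(", ", items);
        if (!expected.equals(actual)) {
            throw new AssertionError(expected + " != " + actual);
        }
        System.out.println("parity ok: " + actual);
    }
}
```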