mirror of https://github.com/apache/kafka.git
MINOR: Replaced Utils.join() with JDK API. (#15823)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
parent 41f5bf844d
commit 55a00be4e9
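The change itself is mechanical: every call to the custom org.apache.kafka.common.utils.Utils.join helper is replaced with the JDK's String.join (for collections or arrays of strings) or with a stream plus java.util.stream.Collectors.joining (for collections of arbitrary objects). A minimal sketch of the mapping, using hypothetical class and variable names rather than code taken from the diff below:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: the class and variable names are hypothetical, not from the Kafka sources.
public class JoinMigrationSketch {
    public static void main(String[] args) {
        // Strings: Utils.join(values, ", ")  ->  String.join(", ", values)
        List<String> protocols = Arrays.asList("PLAINTEXT", "SSL", "SASL_SSL");
        System.out.println(String.join(", ", protocols));

        // Arbitrary objects: map each element to a String, then use Collectors.joining
        List<Integer> brokerIds = Arrays.asList(1, 2, 3);
        System.out.println(brokerIds.stream()
                .map(Object::toString)
                .collect(Collectors.joining(", ")));
    }
}

Both forms produce the same separator-delimited string that Utils.join used to build, which is the pattern applied throughout the hunks below.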
@@ -24,7 +24,6 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,7 +130,7 @@ public class CommonClientConfigs {
 
 public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
 public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " +
-Utils.join(SecurityProtocol.names(), ", ") + ".";
+String.join(", ", SecurityProtocol.names()) + ".";
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
 public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
@@ -25,7 +25,6 @@ import org.apache.kafka.common.requests.FetchMetadata;
 import org.apache.kafka.common.requests.FetchRequest.PartitionData;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -40,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 
@@ -392,14 +392,14 @@ public class FetchSessionHandler {
 if (!log.isTraceEnabled()) {
 return String.format("%d partition(s)", partitions.size());
 }
-return "(" + Utils.join(partitions, ", ") + ")";
+return "(" + partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")) + ")";
 }
 
 private String topicIdPartitionsToLogString(Collection<TopicIdPartition> partitions) {
 if (!log.isTraceEnabled()) {
 return String.format("%d partition(s)", partitions.size());
 }
-return "(" + Utils.join(partitions, ", ") + ")";
+return "(" + partitions.stream().map(TopicIdPartition::toString).collect(Collectors.joining(", ")) + ")";
 }
 
 /**
@@ -438,16 +438,16 @@ public class FetchSessionHandler {
 extraIds = findMissing(ids, sessionTopicNames.keySet());
 }
 if (!omitted.isEmpty()) {
-bld.append("omittedPartitions=(").append(Utils.join(omitted, ", ")).append("), ");
+bld.append("omittedPartitions=(").append(omitted.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).append("), ");
 }
 if (!extra.isEmpty()) {
-bld.append("extraPartitions=(").append(Utils.join(extra, ", ")).append("), ");
+bld.append("extraPartitions=(").append(extra.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).append("), ");
 }
 if (!extraIds.isEmpty()) {
-bld.append("extraIds=(").append(Utils.join(extraIds, ", ")).append("), ");
+bld.append("extraIds=(").append(extraIds.stream().map(Uuid::toString).collect(Collectors.joining(", "))).append("), ");
 }
 if ((!omitted.isEmpty()) || (!extra.isEmpty()) || (!extraIds.isEmpty())) {
-bld.append("response=(").append(Utils.join(topicPartitions, ", ")).append(")");
+bld.append("response=(").append(topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).append(")");
 return bld.toString();
 }
 return null;
@@ -470,11 +470,11 @@ public class FetchSessionHandler {
 findMissing(topicPartitions, sessionPartitions.keySet());
 StringBuilder bld = new StringBuilder();
 if (!extra.isEmpty())
-bld.append("extraPartitions=(").append(Utils.join(extra, ", ")).append("), ");
+bld.append("extraPartitions=(").append(extra.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).append("), ");
 if (!extraIds.isEmpty())
-bld.append("extraIds=(").append(Utils.join(extraIds, ", ")).append("), ");
+bld.append("extraIds=(").append(extraIds.stream().map(Uuid::toString).collect(Collectors.joining(", "))).append("), ");
 if ((!extra.isEmpty()) || (!extraIds.isEmpty())) {
-bld.append("response=(").append(Utils.join(topicPartitions, ", ")).append(")");
+bld.append("response=(").append(topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).append(")");
 return bld.toString();
 }
 return null;
@@ -499,7 +499,7 @@ public class FetchSessionHandler {
 }
 StringBuilder bld = new StringBuilder();
 bld.append(" with response=(").
-append(Utils.join(topicPartitions, ", ")).
+append(topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))).
 append(")");
 String prefix = ", implied=(";
 String suffix = "";
@@ -185,7 +185,7 @@ public class NodeApiVersions {
 bld.append("(");
 if (lineBreaks)
 bld.append("\n\t");
-bld.append(Utils.join(apiKeysText.values(), separator));
+bld.append(String.join(separator, apiKeysText.values()));
 if (lineBreaks)
 bld.append("\n");
 bld.append(")");
@@ -21,13 +21,13 @@ import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.acl.AclOperation;
-import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
 * A detailed description of a single consumer group in the cluster.
@@ -161,7 +161,7 @@ public class ConsumerGroupDescription {
 public String toString() {
 return "(groupId=" + groupId +
 ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
-", members=" + Utils.join(members, ",") +
+", members=" + members.stream().map(MemberDescription::toString).collect(Collectors.joining(",")) +
 ", partitionAssignor=" + partitionAssignor +
 ", type=" + type +
 ", state=" + state +
@@ -17,12 +17,12 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Utils;
 
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
 * A description of the assignments of a specific group member.
@@ -64,6 +64,6 @@ public class MemberAssignment {
 
 @Override
 public String toString() {
-return "(topicPartitions=" + Utils.join(topicPartitions, ",") + ")";
+return "(topicPartitions=" + topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",")) + ")";
 }
 }
@@ -20,12 +20,12 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.acl.AclOperation;
-import org.apache.kafka.common.utils.Utils;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
 * A detailed description of a single topic in the cluster.
@@ -135,6 +135,6 @@ public class TopicDescription {
 @Override
 public String toString() {
 return "(name=" + name + ", internal=" + internal + ", partitions=" +
-Utils.join(partitions, ",") + ", authorizedOperations=" + authorizedOperations + ")";
+partitions.stream().map(TopicPartitionInfo::toString).collect(Collectors.joining(",")) + ", authorizedOperations=" + authorizedOperations + ")";
 }
 }
@@ -133,7 +133,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSu
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets;
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 import static org.apache.kafka.common.utils.Utils.isBlank;
-import static org.apache.kafka.common.utils.Utils.join;
 import static org.apache.kafka.common.utils.Utils.swallow;
 
 /**
@@ -1493,7 +1492,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 // See the ApplicationEventProcessor.process() method that handles this event for more detail.
 applicationEventHandler.add(new AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
 
-log.info("Assigned to partition(s): {}", join(partitions, ", "));
+log.info("Assigned to partition(s): {}", partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+
 if (subscriptions.assignFromUser(new HashSet<>(partitions)))
 applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
 } finally {
@@ -1845,7 +1845,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 }
 
 fetchBuffer.retainAll(currentTopicPartitions);
-log.info("Subscribed to topic(s): {}", join(topics, ", "));
+log.info("Subscribed to topic(s): {}", String.join(", ", topics));
 if (subscriptions.subscribe(new HashSet<>(topics), listener))
 metadata.requestUpdateForNewTopics();
 
@@ -24,12 +24,12 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.stream.Collectors;
 
 /**
 * This class encapsulates the invocation of the callback methods defined in the {@link ConsumerRebalanceListener}
@@ -54,7 +54,7 @@ public class ConsumerRebalanceListenerInvoker {
 }
 
 public Exception invokePartitionsAssigned(final SortedSet<TopicPartition> assignedPartitions) {
-log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
+log.info("Adding newly assigned partitions: {}", assignedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
 
 Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
 
@@ -76,11 +76,11 @@ public class ConsumerRebalanceListenerInvoker {
 }
 
 public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revokedPartitions) {
-log.info("Revoke previously assigned partitions {}", Utils.join(revokedPartitions, ", "));
+log.info("Revoke previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
 Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions();
 revokePausedPartitions.retainAll(revokedPartitions);
 if (!revokePausedPartitions.isEmpty())
-log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.join(revokePausedPartitions, ", "));
+log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
 
 Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
 
@@ -102,11 +102,11 @@ public class ConsumerRebalanceListenerInvoker {
 }
 
 public Exception invokePartitionsLost(final SortedSet<TopicPartition> lostPartitions) {
-log.info("Lost previously assigned partitions {}", Utils.join(lostPartitions, ", "));
+log.info("Lost previously assigned partitions {}", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
 Set<TopicPartition> lostPausedPartitions = subscriptions.pausedPartitions();
 lostPausedPartitions.retainAll(lostPartitions);
 if (!lostPausedPartitions.isEmpty())
-log.info("The pause flag in partitions [{}] will be removed due to partition lost.", Utils.join(lostPausedPartitions, ", "));
+log.info("The pause flag in partitions [{}] will be removed due to partition lost.", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
 
 Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
 
@@ -77,6 +77,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_RACK_CONFIG;
@@ -93,7 +94,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSu
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 import static org.apache.kafka.common.utils.Utils.isBlank;
-import static org.apache.kafka.common.utils.Utils.join;
 import static org.apache.kafka.common.utils.Utils.swallow;
 
 /**
@@ -473,7 +473,7 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
 fetcher.clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
 
-log.info("Subscribed to topic(s): {}", join(topics, ", "));
+log.info("Subscribed to topic(s): {}", String.join(", ", topics));
 if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
 metadata.requestUpdateForNewTopics();
 }
@@ -571,7 +571,7 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 if (coordinator != null)
 this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
 
-log.info("Assigned to partition(s): {}", join(partitions, ", "));
+log.info("Assigned to partition(s): {}", partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
 if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
 metadata.requestUpdateForNewTopics();
 }
@@ -40,7 +40,6 @@ import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -1181,7 +1180,7 @@ public class MembershipManagerImpl implements MembershipManager {
 * Visible for testing
 */
 CompletableFuture<Void> revokePartitions(Set<TopicPartition> revokedPartitions) {
-log.info("Revoking previously assigned partitions {}", Utils.join(revokedPartitions, ", "));
+log.info("Revoking previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
 
 logPausedPartitionsBeingRevoked(revokedPartitions);
 
@@ -1377,7 +1376,7 @@ public class MembershipManagerImpl implements MembershipManager {
 Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions();
 revokePausedPartitions.retainAll(partitionsToRevoke);
 if (!revokePausedPartitions.isEmpty()) {
-log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.join(revokePausedPartitions, ", "));
+log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
 }
 }
 
@@ -17,11 +17,10 @@
 
 package org.apache.kafka.common;
 
-import org.apache.kafka.common.utils.Utils;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
 * A class containing leadership, replicas and ISR information for a topic partition.
@@ -116,10 +115,10 @@ public class TopicPartitionInfo {
 }
 
 public String toString() {
-String elrString = elr != null ? Utils.join(elr, ", ") : "N/A";
-String lastKnownElrString = lastKnownElr != null ? Utils.join(lastKnownElr, ", ") : "N/A";
+String elrString = elr != null ? elr.stream().map(Node::toString).collect(Collectors.joining(", ")) : "N/A";
+String lastKnownElrString = lastKnownElr != null ? lastKnownElr.stream().map(Node::toString).collect(Collectors.joining(", ")) : "N/A";
 return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
-Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
+replicas.stream().map(Node::toString).collect(Collectors.joining(", ")) + ", isr=" + isr.stream().map(Node::toString).collect(Collectors.joining(", ")) +
 ", elr=" + elrString + ", lastKnownElr=" + lastKnownElrString + ")";
 }
 
@@ -515,7 +515,7 @@ public class ConfigDef {
 // Check all configurations are defined
 List<String> undefinedConfigKeys = undefinedDependentConfigs();
 if (!undefinedConfigKeys.isEmpty()) {
-String joined = Utils.join(undefinedConfigKeys, ",");
+String joined = undefinedConfigKeys.stream().map(String::toString).collect(Collectors.joining(","));
 throw new ConfigException("Some configurations in are referred in the dependents, but not defined: " + joined);
 }
 // parse all known keys
@@ -806,7 +806,7 @@ public class ConfigDef {
 return parsedValue.toString();
 case LIST:
 List<?> valueList = (List<?>) parsedValue;
-return Utils.join(valueList, ",");
+return valueList.stream().map(Object::toString).collect(Collectors.joining(","));
 case CLASS:
 Class<?> clazz = (Class<?>) parsedValue;
 return clazz.getName();
@@ -1051,13 +1051,13 @@ public class ConfigDef {
 public void ensureValid(String name, Object o) {
 String s = (String) o;
 if (!validStrings.contains(s)) {
-throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+throw new ConfigException(name, o, "String must be one of: " + String.join(", ", validStrings));
 }
 
 }
 
 public String toString() {
-return "[" + Utils.join(validStrings, ", ") + "]";
+return "[" + String.join(", ", validStrings) + "]";
 }
 }
 
@@ -1079,12 +1079,12 @@ public class ConfigDef {
 public void ensureValid(String name, Object o) {
 String s = (String) o;
 if (s == null || !validStrings.contains(s.toUpperCase(Locale.ROOT))) {
-throw new ConfigException(name, o, "String must be one of (case insensitive): " + Utils.join(validStrings, ", "));
+throw new ConfigException(name, o, "String must be one of (case insensitive): " + String.join(", ", validStrings));
 }
 }
 
 public String toString() {
-return "(case insensitive) [" + Utils.join(validStrings, ", ") + "]";
+return "(case insensitive) [" + String.join(", ", validStrings) + "]";
 }
 }
 
@@ -1205,7 +1205,8 @@ public class ConfigDef {
 }
 
 if (!foundIllegalCharacters.isEmpty()) {
-throw new ConfigException(name, value, "String may not contain control sequences but had the following ASCII chars: " + Utils.join(foundIllegalCharacters, ", "));
+throw new ConfigException(name, value, "String may not contain control sequences but had the following ASCII chars: " +
+foundIllegalCharacters.stream().map(Object::toString).collect(Collectors.joining(", ")));
 }
 }
 
@@ -28,7 +28,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -325,8 +324,8 @@ public class FetchRequest extends AbstractRequest {
 append(", maxBytes=").append(maxBytes).
 append(", fetchData=").append(toFetch).
 append(", isolationLevel=").append(isolationLevel).
-append(", removed=").append(Utils.join(removed, ", ")).
-append(", replaced=").append(Utils.join(replaced, ", ")).
+append(", removed=").append(removed.stream().map(TopicIdPartition::toString).collect(Collectors.joining(", "))).
+append(", replaced=").append(replaced.stream().map(TopicIdPartition::toString).collect(Collectors.joining(", "))).
 append(", metadata=").append(metadata).
 append(", rackId=").append(rackId).
 append(")");
@@ -29,7 +29,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.FlattenedIterator;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -120,7 +119,7 @@ public final class LeaderAndIsrRequest extends AbstractControlRequest {
 .append(", brokerEpoch=").append(brokerEpoch)
 .append(", partitionStates=").append(partitionStates)
 .append(", topicIds=").append(topicIds)
-.append(", liveLeaders=(").append(Utils.join(liveLeaders, ", ")).append(")")
+.append(", liveLeaders=(").append(liveLeaders.stream().map(Node::toString).collect(Collectors.joining(", "))).append(")")
 .append(")");
 return bld.toString();
 
@@ -28,7 +28,6 @@ import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopi
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -416,9 +415,9 @@ public class MetadataResponse extends AbstractResponse {
 ", partition=" + topicPartition +
 ", leader=" + leaderId +
 ", leaderEpoch=" + leaderEpoch +
-", replicas=" + Utils.join(replicaIds, ",") +
-", isr=" + Utils.join(inSyncReplicaIds, ",") +
-", offlineReplicas=" + Utils.join(offlineReplicaIds, ",") + ')';
+", replicas=" + replicaIds.stream().map(Object::toString).collect(Collectors.joining(",")) +
+", isr=" + inSyncReplicaIds.stream().map(Object::toString).collect(Collectors.joining(",")) +
+", offlineReplicas=" + offlineReplicaIds.stream().map(Object::toString).collect(Collectors.joining(",")) + ')';
 }
 }
 
@@ -28,7 +28,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.MappedIterator;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -101,7 +100,7 @@ public class StopReplicaRequest extends AbstractControlRequest {
 append(", controllerEpoch=").append(controllerEpoch).
 append(", brokerEpoch=").append(brokerEpoch).
 append(", deletePartitions=").append(deletePartitions).
-append(", topicStates=").append(Utils.join(topicStates, ",")).
+append(", topicStates=").append(topicStates.stream().map(StopReplicaTopicState::toString).collect(Collectors.joining(","))).
 append(")");
 return bld.toString();
 }
@@ -30,7 +30,6 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.FlattenedIterator;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -38,6 +37,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
 
@@ -141,7 +141,7 @@ public final class UpdateMetadataRequest extends AbstractControlRequest {
 append(", type=").append(updateType).
 append(", brokerEpoch=").append(brokerEpoch).
 append(", partitionStates=").append(partitionStates).
-append(", liveBrokers=").append(Utils.join(liveBrokers, ", ")).
+append(", liveBrokers=").append(liveBrokers.stream().map(UpdateMetadataBroker::toString).collect(Collectors.joining(", "))).
 append(")");
 return bld.toString();
 }
@@ -17,13 +17,13 @@
 
 package org.apache.kafka.common.telemetry;
 
-import org.apache.kafka.common.utils.Utils;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
 * State that helps determine where client exists in the telemetry state i.e. subscribe->wait->push loop.
@@ -150,7 +150,7 @@ public enum ClientTelemetryState {
 if (allowableStates != null && !allowableStates.isEmpty()) {
 validStatesClause = String.format("the valid telemetry state transitions from %s are: %s",
 this,
-Utils.join(allowableStates, ", "));
+allowableStates.stream().map(ClientTelemetryState::toString).collect(Collectors.joining(", ")));
 } else {
 validStatesClause = String.format("there are no valid telemetry state transitions from %s", this);
 }
@@ -462,7 +462,7 @@ public final class Utils {
 return constructor.newInstance(args);
 } catch (NoSuchMethodException e) {
 throw new ClassNotFoundException(String.format("Failed to find " +
-"constructor with %s for %s", Utils.join(argTypes, ", "), className), e);
+"constructor with %s for %s", Arrays.stream(argTypes).map(Object::toString).collect(Collectors.joining(", ")), className), e);
 } catch (InstantiationException e) {
 throw new ClassNotFoundException(String.format("Failed to instantiate " +
 "%s", className), e);
@@ -563,7 +563,7 @@ public final class Utils {
 }
 
 /**
-* Formats a byte number as a human readable String ("3.2 MB")
+* Formats a byte number as a human-readable String ("3.2 MB")
 * @param bytes some size in bytes
 * @return
 */
@@ -584,46 +584,6 @@ public final class Utils {
 }
 }
 
-/**
-* Create a string representation of an array joined by the given separator
-* @param strs The array of items
-* @param separator The separator
-* @return The string representation.
-*/
-public static <T> String join(T[] strs, String separator) {
-return join(Arrays.asList(strs), separator);
-}
-
-/**
-* Create a string representation of a collection joined by the given separator
-* @param collection The list of items
-* @param separator The separator
-* @return The string representation.
-*/
-public static <T> String join(Collection<T> collection, String separator) {
-Objects.requireNonNull(collection);
-return mkString(collection.stream(), "", "", separator);
-}
-
-/**
-* Create a string representation of a stream surrounded by `begin` and `end` and joined by `separator`.
-*
-* @return The string representation.
-*/
-public static <T> String mkString(Stream<T> stream, String begin, String end, String separator) {
-Objects.requireNonNull(stream);
-StringBuilder sb = new StringBuilder();
-sb.append(begin);
-Iterator<T> iter = stream.iterator();
-while (iter.hasNext()) {
-sb.append(iter.next());
-if (iter.hasNext())
-sb.append(separator);
-}
-sb.append(end);
-return sb.toString();
-}
-
 /**
 * Converts a {@code Map} class into a string, concatenating keys and values
 * Example:
@@ -217,7 +217,7 @@ public class AbstractPartitionAssignorTest {
 }
 
 private static String toSortedString(List<?> partitions) {
-return Utils.join(partitions.stream().map(Object::toString).sorted().collect(Collectors.toList()), ", ");
+return partitions.stream().map(Object::toString).sorted().collect(Collectors.joining(", "));
 }
 
 private static List<Subscription> subscriptions(List<List<String>> consumerTopics,
@@ -158,20 +158,6 @@ public class UtilsTest {
 assertEquals("10 MB", formatBytes(10 * 1024 * 1024));
 }
 
-@Test
-public void testJoin() {
-assertEquals("", Utils.join(Collections.emptyList(), ","));
-assertEquals("1", Utils.join(asList("1"), ","));
-assertEquals("1,2,3", Utils.join(asList(1, 2, 3), ","));
-}
-
-@Test
-public void testMkString() {
-assertEquals("[]", Utils.mkString(Stream.empty(), "[", "]", ","));
-assertEquals("(1)", Utils.mkString(Stream.of("1"), "(", ")", ","));
-assertEquals("{1,2,3}", Utils.mkString(Stream.of(1, 2, 3), "{", "}", ","));
-}
-
 @Test
 public void testAbs() {
 assertEquals(0, Utils.abs(Integer.MIN_VALUE));
@@ -80,14 +80,14 @@ public class WorkerInfo {
 */
 protected final void addRuntimeInfo() {
 List<String> jvmArgs = RUNTIME.getInputArguments();
-values.put("jvm.args", Utils.join(jvmArgs, ", "));
+values.put("jvm.args", String.join(", ", jvmArgs));
 String[] jvmSpec = {
 RUNTIME.getVmVendor(),
 RUNTIME.getVmName(),
 RUNTIME.getSystemProperties().get("java.version"),
 RUNTIME.getVmVersion()
 };
-values.put("jvm.spec", Utils.join(jvmSpec, ", "));
+values.put("jvm.spec", String.join(", ", jvmSpec));
 values.put("jvm.classpath", RUNTIME.getClassPath());
 }
 
@@ -100,7 +100,7 @@ public class WorkerInfo {
 OS.getArch(),
 OS.getVersion(),
 };
-values.put("os.spec", Utils.join(osInfo, ", "));
+values.put("os.spec", String.join(", ", osInfo));
 values.put("os.vcpus", String.valueOf(OS.getAvailableProcessors()));
 }
 
@@ -311,7 +311,7 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
 if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
 List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);
 consumer.subscribe(topics, new HandleRebalance());
-log.debug("{} Initializing and starting task for topics {}", this, Utils.join(topics, ", "));
+log.debug("{} Initializing and starting task for topics {}", this, String.join(", ", topics));
 } else {
 String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
 Pattern pattern = Pattern.compile(topicsRegexStr);
@@ -33,6 +33,7 @@ import java.security.NoSuchAlgorithmException;
 import java.security.Provider;
 import java.security.Security;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -41,6 +42,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
@@ -466,7 +468,8 @@ public class DistributedConfig extends WorkerConfig {
 + "compatibility");
 }
 },
-() -> "[" + Utils.join(ConnectProtocolCompatibility.values(), ", ") + "]"),
+() -> Arrays.stream(ConnectProtocolCompatibility.values()).map(ConnectProtocolCompatibility::toString)
+.collect(Collectors.joining(", ", "[", "]"))),
 ConfigDef.Importance.LOW,
 CONNECT_PROTOCOL_DOC)
 .define(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG,
@@ -129,7 +129,7 @@ public class Plugins {
 }
 
 private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) {
-return Utils.join(plugins, ", ");
+return plugins.stream().map(PluginDesc::toString).collect(Collectors.joining(", "));
 }
 
 private <T> T newPlugin(Class<T> klass) {
@@ -317,7 +317,7 @@ public class Plugins {
 "Failed to find any class that implements Connector and which name matches "
 + connectorClassOrAlias
 + ", available connectors are: "
-+ Utils.join(connectors, ", ")
++ connectors.stream().map(PluginDesc::toString).collect(Collectors.joining(", "))
 );
 }
 if (matches.size() > 1) {
@@ -325,7 +325,7 @@ public class Plugins {
 "More than one connector matches alias "
 + connectorClassOrAlias
 + ". Please use full package and class name instead. Classes found: "
-+ Utils.join(connectors, ", ")
++ connectors.stream().map(PluginDesc::toString).collect(Collectors.joining(", "))
 );
 }
 
@@ -1265,7 +1265,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
 
 try {
 int taskNum = Integer.parseInt(parts[parts.length - 1]);
-String connectorName = Utils.join(Arrays.copyOfRange(parts, 1, parts.length - 1), "-");
+String connectorName = String.join("-", Arrays.copyOfRange(parts, 1, parts.length - 1));
 return new ConnectorTaskId(connectorName, taskNum);
 } catch (NumberFormatException e) {
 return null;
@@ -30,7 +30,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -534,7 +533,7 @@ public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore impleme
 
 try {
 int taskNum = Integer.parseInt(parts[parts.length - 1]);
-String connectorName = Utils.join(Arrays.copyOfRange(parts, 2, parts.length - 1), "-");
+String connectorName = String.join("-", Arrays.copyOfRange(parts, 2, parts.length - 1));
 return new ConnectorTaskId(connectorName, taskNum);
 } catch (NumberFormatException e) {
 log.warn("Invalid task status key {}", key);
@@ -47,7 +47,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.slf4j.Logger;
@@ -399,7 +398,7 @@ public class TopicAdmin implements AutoCloseable {
 }
 }
 if (topicsByName.isEmpty()) return EMPTY_CREATION;
-String topicNameList = Utils.join(topicsByName.keySet(), "', '");
+String topicNameList = String.join("', '", topicsByName.keySet());
 
 // Attempt to create any missing topics
 CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false);
@@ -24,12 +24,13 @@ import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, INITIAL_EPOCH, INVALID_SESSION_ID}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
-import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, Time, Utils}
+import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, Time}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import java.util
 import java.util.{Collections, Optional}
 import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
+
 
 import scala.collection.mutable
 import scala.math.Ordered.orderingToOrdered
 
@@ -47,7 +48,7 @@ object FetchSession {
 
 def partitionsToLogString(partitions: util.Collection[TopicIdPartition], traceEnabled: Boolean): String = {
 if (traceEnabled) {
-"(" + Utils.join(partitions, ", ") + ")"
+"(" + String.join(", ", partitions.toString) + ")"
 } else {
 s"${partitions.size} partition(s)"
 }
@ -17,7 +17,6 @@
|
||||||
package org.apache.kafka.coordinator.group;
|
package org.apache.kafka.coordinator.group;
|
||||||
|
|
||||||
import org.apache.kafka.common.record.CompressionType;
|
import org.apache.kafka.common.record.CompressionType;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
|
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
|
||||||
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
|
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
|
||||||
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
|
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
|
||||||
|
@ -25,6 +24,7 @@ import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The group coordinator configurations.
|
* The group coordinator configurations.
|
||||||
|
@ -53,7 +53,8 @@ public class GroupCoordinatorConfig {
|
||||||
public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = false;
|
public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = false;
|
||||||
|
|
||||||
public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols";
|
public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols";
|
||||||
public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + Utils.join(Group.GroupType.values(), ",") + ". " +
|
public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " +
|
||||||
|
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " +
|
||||||
"The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production.";
|
"The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production.";
|
||||||
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString());
|
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString());
|
||||||
|
|
||||||
|
|
|
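The doc string above joins the values of an enum. A small sketch of that pattern, assuming a hypothetical two-value enum in place of Group.GroupType:

    import java.util.Arrays;
    import java.util.stream.Collectors;

    public class JoinEnumValues {
        // Hypothetical enum standing in for Group.GroupType.
        enum GroupType { CLASSIC, CONSUMER }

        public static void main(String[] args) {
            // An enum's values() array is not an Iterable<CharSequence>, so it is
            // streamed, mapped to its string form, and joined.
            String protocols = Arrays.stream(GroupType.values())
                    .map(GroupType::toString)
                    .collect(Collectors.joining(","));
            System.out.println(protocols); // CLASSIC,CONSUMER
        }
    }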
@ -19,7 +19,6 @@ package org.apache.kafka.server.config;
|
||||||
|
|
||||||
import org.apache.kafka.common.config.TopicConfig;
|
import org.apache.kafka.common.config.TopicConfig;
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.apache.kafka.server.common.MetadataVersion;
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
|
|
||||||
public class ReplicationConfigs {
|
public class ReplicationConfigs {
|
||||||
|
@ -116,7 +115,7 @@ public class ReplicationConfigs {
|
||||||
public static final String INTER_BROKER_SECURITY_PROTOCOL_DEFAULT = SecurityProtocol.PLAINTEXT.toString();
|
public static final String INTER_BROKER_SECURITY_PROTOCOL_DEFAULT = SecurityProtocol.PLAINTEXT.toString();
|
||||||
public static final String INTER_BROKER_LISTENER_NAME_CONFIG = "inter.broker.listener.name";
|
public static final String INTER_BROKER_LISTENER_NAME_CONFIG = "inter.broker.listener.name";
|
||||||
public static final String INTER_BROKER_SECURITY_PROTOCOL_DOC = "Security protocol used to communicate between brokers. Valid values are: " +
|
public static final String INTER_BROKER_SECURITY_PROTOCOL_DOC = "Security protocol used to communicate between brokers. Valid values are: " +
|
||||||
Utils.join(SecurityProtocol.names(), ", ") + ". It is an error to set this and " + INTER_BROKER_LISTENER_NAME_CONFIG +
|
String.join(", ", SecurityProtocol.names()) + ". It is an error to set this and " + INTER_BROKER_LISTENER_NAME_CONFIG +
|
||||||
" properties at the same time.";
|
" properties at the same time.";
|
||||||
public static final String INTER_BROKER_LISTENER_NAME_DOC = "Name of listener used for communication between brokers. If this is unset, the listener name is defined by " + INTER_BROKER_SECURITY_PROTOCOL_CONFIG +
|
public static final String INTER_BROKER_LISTENER_NAME_DOC = "Name of listener used for communication between brokers. If this is unset, the listener name is defined by " + INTER_BROKER_SECURITY_PROTOCOL_CONFIG +
|
||||||
"It is an error to set this and " + INTER_BROKER_SECURITY_PROTOCOL_CONFIG + " properties at the same time.";
|
"It is an error to set this and " + INTER_BROKER_SECURITY_PROTOCOL_CONFIG + " properties at the same time.";
|
||||||
|
|
|
@ -216,7 +216,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
|
||||||
Function0<String> messageSupplier = () ->
|
Function0<String> messageSupplier = () ->
|
||||||
String.format("Could not consume %d records of %s from offset %d in %d ms. %d message(s) consumed:%s%s",
|
String.format("Could not consume %d records of %s from offset %d in %d ms. %d message(s) consumed:%s%s",
|
||||||
expectedTotalCount, topicPartition, fetchOffset, timeoutMs, records.size(), sep,
|
expectedTotalCount, topicPartition, fetchOffset, timeoutMs, records.size(), sep,
|
||||||
Utils.join(records, sep));
|
records.stream().map(Object::toString).collect(Collectors.joining(sep)));
|
||||||
TestUtils.pollRecordsUntilTrue(consumer, pollAction, messageSupplier, timeoutMs);
|
TestUtils.pollRecordsUntilTrue(consumer, pollAction, messageSupplier, timeoutMs);
|
||||||
return records;
|
return records;
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,7 +120,7 @@ public final class BrokerLocalStorage {
|
||||||
"in the log directory is %d which is %s the expected offset %s. The directory of %s is " +
|
"in the log directory is %d which is %s the expected offset %s. The directory of %s is " +
|
||||||
"made of the following files: %s", brokerId, topicPartition,
|
"made of the following files: %s", brokerId, topicPartition,
|
||||||
offsetHolder.firstLogFileBaseOffset, pos, offset, topicPartition,
|
offsetHolder.firstLogFileBaseOffset, pos, offset, topicPartition,
|
||||||
Utils.join(offsetHolder.partitionFiles, System.lineSeparator()));
|
String.join(System.lineSeparator(), offsetHolder.partitionFiles));
|
||||||
throw new AssertionError(message);
|
throw new AssertionError(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.Deserializer;
|
||||||
import org.apache.kafka.common.serialization.Serde;
|
import org.apache.kafka.common.serialization.Serde;
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
import org.apache.kafka.common.serialization.Serializer;
|
import org.apache.kafka.common.serialization.Serializer;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.apache.kafka.streams.errors.StreamsException;
|
import org.apache.kafka.streams.errors.StreamsException;
|
||||||
import org.apache.kafka.streams.internals.UpgradeFromValues;
|
import org.apache.kafka.streams.internals.UpgradeFromValues;
|
||||||
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
|
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
|
||||||
|
@ -257,7 +256,7 @@ public class StreamsConfigTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldSupportMultipleBootstrapServers() {
|
public void shouldSupportMultipleBootstrapServers() {
|
||||||
final List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092");
|
final List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092");
|
||||||
final String bootstrapServersString = Utils.join(expectedBootstrapServers, ",");
|
final String bootstrapServersString = String.join(",", expectedBootstrapServers);
|
||||||
final Properties props = new Properties();
|
final Properties props = new Properties();
|
||||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "irrelevant");
|
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "irrelevant");
|
||||||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersString);
|
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersString);
|
||||||
|
|
|
@ -38,6 +38,7 @@ import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -230,8 +231,8 @@ public class MetadataQuorumCommand {
|
||||||
"\nHighWatermark: " + quorumInfo.highWatermark() +
|
"\nHighWatermark: " + quorumInfo.highWatermark() +
|
||||||
"\nMaxFollowerLag: " + maxFollowerLag +
|
"\nMaxFollowerLag: " + maxFollowerLag +
|
||||||
"\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs +
|
"\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs +
|
||||||
"\nCurrentVoters: " + Utils.mkString(quorumInfo.voters().stream().map(v -> v.replicaId()), "[", "]", ",") +
|
"\nCurrentVoters: " + quorumInfo.voters().stream().map(QuorumInfo.ReplicaState::replicaId).map(Object::toString).collect(Collectors.joining(",", "[", "]")) +
|
||||||
"\nCurrentObservers: " + Utils.mkString(quorumInfo.observers().stream().map(v -> v.replicaId()), "[", "]", ",")
|
"\nCurrentObservers: " + quorumInfo.observers().stream().map(QuorumInfo.ReplicaState::replicaId).map(Objects::toString).collect(Collectors.joining(",", "[", "]"))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
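The two lines above replace Utils.mkString with Collectors.joining(delimiter, prefix, suffix). A standalone sketch, with hypothetical replica ids instead of the real QuorumInfo values:

    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class JoinWithPrefixSuffix {
        public static void main(String[] args) {
            // Hypothetical replica ids standing in for quorumInfo.voters().
            String voters = Stream.of(1, 2, 3)
                    .map(Object::toString)
                    // Collectors.joining(delimiter, prefix, suffix) covers the begin, end
                    // and separator arguments that Utils.mkString previously took.
                    .collect(Collectors.joining(",", "[", "]"));
            System.out.println(voters); // [1,2,3]
        }
    }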
@ -410,7 +410,7 @@ public abstract class TransactionsCommand {
|
||||||
String.valueOf(result.transactionTimeoutMs()),
|
String.valueOf(result.transactionTimeoutMs()),
|
||||||
transactionStartTimeMsColumnValue,
|
transactionStartTimeMsColumnValue,
|
||||||
transactionDurationMsColumnValue,
|
transactionDurationMsColumnValue,
|
||||||
Utils.join(result.topicPartitions(), ",")
|
result.topicPartitions().stream().map(TopicPartition::toString).collect(Collectors.joining(","))
|
||||||
);
|
);
|
||||||
|
|
||||||
ToolsUtils.prettyPrintTable(HEADERS, singletonList(row), out);
|
ToolsUtils.prettyPrintTable(HEADERS, singletonList(row), out);
|
||||||
|
|
|
@ -53,6 +53,7 @@ import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -60,6 +61,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static net.sourceforge.argparse4j.impl.Arguments.store;
|
import static net.sourceforge.argparse4j.impl.Arguments.store;
|
||||||
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
|
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
|
||||||
|
@ -536,7 +538,8 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
|
||||||
.setDefault(ConsumerConfig.DEFAULT_GROUP_PROTOCOL)
|
.setDefault(ConsumerConfig.DEFAULT_GROUP_PROTOCOL)
|
||||||
.metavar("GROUP_PROTOCOL")
|
.metavar("GROUP_PROTOCOL")
|
||||||
.dest("groupProtocol")
|
.dest("groupProtocol")
|
||||||
.help(String.format("Group protocol (must be one of %s)", Utils.join(GroupProtocol.values(), ", ")));
|
.help(String.format("Group protocol (must be one of %s)", Arrays.stream(GroupProtocol.values())
|
||||||
|
.map(Object::toString).collect(Collectors.joining(", "))));
|
||||||
|
|
||||||
parser.addArgument("--group-remote-assignor")
|
parser.addArgument("--group-remote-assignor")
|
||||||
.action(store())
|
.action(store())
|
||||||
|
|
|
@ -132,7 +132,7 @@ public class ConsumerGroupCommand {
|
||||||
Set<ConsumerGroupState> parsedStates = Arrays.stream(input.split(",")).map(s -> ConsumerGroupState.parse(s.trim())).collect(Collectors.toSet());
|
Set<ConsumerGroupState> parsedStates = Arrays.stream(input.split(",")).map(s -> ConsumerGroupState.parse(s.trim())).collect(Collectors.toSet());
|
||||||
if (parsedStates.contains(ConsumerGroupState.UNKNOWN)) {
|
if (parsedStates.contains(ConsumerGroupState.UNKNOWN)) {
|
||||||
Collection<ConsumerGroupState> validStates = Arrays.stream(ConsumerGroupState.values()).filter(s -> s != ConsumerGroupState.UNKNOWN).collect(Collectors.toList());
|
Collection<ConsumerGroupState> validStates = Arrays.stream(ConsumerGroupState.values()).filter(s -> s != ConsumerGroupState.UNKNOWN).collect(Collectors.toList());
|
||||||
throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + Utils.join(validStates, ", "));
|
throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + validStates.stream().map(ConsumerGroupState::toString).collect(Collectors.joining(", ")));
|
||||||
}
|
}
|
||||||
return parsedStates;
|
return parsedStates;
|
||||||
}
|
}
|
||||||
|
@ -629,7 +629,7 @@ public class ConsumerGroupCommand {
|
||||||
|
|
||||||
switch (topLevelResult) {
|
switch (topLevelResult) {
|
||||||
case NONE:
|
case NONE:
|
||||||
System.out.println("Request succeed for deleting offsets with topic " + Utils.mkString(topics.stream(), "", "", ", ") + " group " + groupId);
|
System.out.println("Request succeed for deleting offsets with topic " + String.join(", ", topics) + " group " + groupId);
|
||||||
break;
|
break;
|
||||||
case INVALID_GROUP_ID:
|
case INVALID_GROUP_ID:
|
||||||
printError("'" + groupId + "' is not valid.", Optional.empty());
|
printError("'" + groupId + "' is not valid.", Optional.empty());
|
||||||
|
@ -1159,7 +1159,7 @@ public class ConsumerGroupCommand {
|
||||||
? CsvUtils.writerFor(CsvUtils.CsvRecordNoGroup.class)
|
? CsvUtils.writerFor(CsvUtils.CsvRecordNoGroup.class)
|
||||||
: CsvUtils.writerFor(CsvUtils.CsvRecordWithGroup.class);
|
: CsvUtils.writerFor(CsvUtils.CsvRecordWithGroup.class);
|
||||||
|
|
||||||
return Utils.mkString(assignments.entrySet().stream().flatMap(e -> {
|
return assignments.entrySet().stream().flatMap(e -> {
|
||||||
String groupId = e.getKey();
|
String groupId = e.getKey();
|
||||||
Map<TopicPartition, OffsetAndMetadata> partitionInfo = e.getValue();
|
Map<TopicPartition, OffsetAndMetadata> partitionInfo = e.getValue();
|
||||||
|
|
||||||
|
@ -1176,7 +1176,7 @@ public class ConsumerGroupCommand {
|
||||||
throw new RuntimeException(err);
|
throw new RuntimeException(err);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}), "", "", "");
|
}).collect(Collectors.joining());
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, Throwable> deleteGroups() {
|
Map<String, Throwable> deleteGroups() {
|
||||||
|
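The change above collects the flattened CSV lines with the no-argument Collectors.joining(). A sketch of that overload, using hypothetical rows:

    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ConcatenateRows {
        public static void main(String[] args) {
            // Hypothetical CSV rows standing in for the flattened assignment records.
            String csv = Stream.of("row1\n", "row2\n", "row3\n")
                    // Collectors.joining() with no arguments simply concatenates the
                    // elements: an empty separator with no prefix or suffix.
                    .collect(Collectors.joining());
            System.out.print(csv);
        }
    }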
@ -1202,13 +1202,13 @@ public class ConsumerGroupCommand {
|
||||||
});
|
});
|
||||||
|
|
||||||
if (failed.isEmpty())
|
if (failed.isEmpty())
|
||||||
System.out.println("Deletion of requested consumer groups (" + Utils.mkString(success.keySet().stream(), "'", "'", "', '") + ") was successful.");
|
System.out.println("Deletion of requested consumer groups (" + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining(", ")) + "'" + ") was successful.");
|
||||||
else {
|
else {
|
||||||
printError("Deletion of some consumer groups failed:", Optional.empty());
|
printError("Deletion of some consumer groups failed:", Optional.empty());
|
||||||
failed.forEach((group, error) -> System.out.println("* Group '" + group + "' could not be deleted due to: " + error));
|
failed.forEach((group, error) -> System.out.println("* Group '" + group + "' could not be deleted due to: " + error));
|
||||||
|
|
||||||
if (!success.isEmpty())
|
if (!success.isEmpty())
|
||||||
System.out.println("\nThese consumer groups were deleted successfully: " + Utils.mkString(success.keySet().stream(), "'", "', '", "'"));
|
System.out.println("\nThese consumer groups were deleted successfully: " + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("'")) + "', '");
|
||||||
}
|
}
|
||||||
|
|
||||||
failed.putAll(success);
|
failed.putAll(success);
|
||||||
|
|
|
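The messages above wrap the joined group names in quote characters. One way to sketch that quoting, assuming hypothetical group ids, is to quote each element inside the map step before joining:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class JoinQuotedNames {
        public static void main(String[] args) {
            // Hypothetical group ids standing in for success.keySet().
            List<String> groups = Arrays.asList("g1", "g2", "g3");
            // Quoting each element inside the map step keeps begin, end and separator in
            // one place, giving the same shape that Utils.mkString produced.
            String quoted = groups.stream()
                    .map(g -> "'" + g + "'")
                    .collect(Collectors.joining(", "));
            System.out.println(quoted); // 'g1', 'g2', 'g3'
        }
    }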
@ -26,8 +26,8 @@ import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.kafka.common.utils.Utils.join;
|
|
||||||
import static org.apache.kafka.tools.ToolsUtils.minus;
|
import static org.apache.kafka.tools.ToolsUtils.minus;
|
||||||
|
|
||||||
public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
||||||
|
@ -213,11 +213,11 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
||||||
if (options.has(describeOpt)) {
|
if (options.has(describeOpt)) {
|
||||||
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
|
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
|
||||||
CommandLineUtils.printUsageAndExit(parser,
|
CommandLineUtils.printUsageAndExit(parser,
|
||||||
"Option " + describeOpt + " takes one of these options: " + join(allGroupSelectionScopeOpts, ", "));
|
"Option " + describeOpt + " takes one of these options: " + allConsumerGroupLevelOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
|
||||||
List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
|
List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
|
||||||
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) {
|
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) {
|
||||||
CommandLineUtils.printUsageAndExit(parser,
|
CommandLineUtils.printUsageAndExit(parser,
|
||||||
"Option " + describeOpt + " takes at most one of these options: " + join(mutuallyExclusiveOpts, ", "));
|
"Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
|
||||||
}
|
}
|
||||||
if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
|
if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
|
||||||
CommandLineUtils.printUsageAndExit(parser,
|
CommandLineUtils.printUsageAndExit(parser,
|
||||||
|
@ -230,7 +230,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
||||||
if (options.has(deleteOpt)) {
|
if (options.has(deleteOpt)) {
|
||||||
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
|
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
|
||||||
CommandLineUtils.printUsageAndExit(parser,
|
CommandLineUtils.printUsageAndExit(parser,
|
||||||
"Option " + deleteOpt + " takes one of these options: " + join(allGroupSelectionScopeOpts, ", "));
|
"Option " + deleteOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
|
||||||
if (options.has(topicOpt))
|
if (options.has(topicOpt))
|
||||||
CommandLineUtils.printUsageAndExit(parser, "The consumer does not support topic-specific offset " +
|
CommandLineUtils.printUsageAndExit(parser, "The consumer does not support topic-specific offset " +
|
||||||
"deletion from a consumer group.");
|
"deletion from a consumer group.");
|
||||||
|
@ -239,7 +239,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
||||||
if (options.has(deleteOffsetsOpt)) {
|
if (options.has(deleteOffsetsOpt)) {
|
||||||
if (!options.has(groupOpt) || !options.has(topicOpt))
|
if (!options.has(groupOpt) || !options.has(topicOpt))
|
||||||
CommandLineUtils.printUsageAndExit(parser,
|
CommandLineUtils.printUsageAndExit(parser,
|
||||||
"Option " + deleteOffsetsOpt + " takes the following options: " + join(allDeleteOffsetsOpts, ", "));
|
"Option " + deleteOffsetsOpt + " takes the following options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (options.has(resetOffsetsOpt)) {
|
if (options.has(resetOffsetsOpt)) {
|
||||||
|
@ -255,7 +255,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
|
||||||
|
|
||||||
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
|
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
|
||||||
CommandLineUtils.printUsageAndExit(parser,
|
CommandLineUtils.printUsageAndExit(parser,
|
||||||
"Option " + resetOffsetsOpt + " takes one of these options: " + join(allGroupSelectionScopeOpts, ", "));
|
"Option " + resetOffsetsOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
|
||||||
CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, minus(allResetOffsetScenarioOpts, resetToOffsetOpt));
|
CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, minus(allResetOffsetScenarioOpts, resetToOffsetOpt));
|
||||||
CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, minus(allResetOffsetScenarioOpts, resetToDatetimeOpt));
|
CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, minus(allResetOffsetScenarioOpts, resetToDatetimeOpt));
|
||||||
CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, minus(allResetOffsetScenarioOpts, resetByDurationOpt));
|
CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, minus(allResetOffsetScenarioOpts, resetByDurationOpt));
|
||||||
|
|
|
@ -488,12 +488,12 @@ public class ReassignPartitionsCommand {
|
||||||
targetParts.forEach(t -> brokers.addAll(t.getValue()));
|
targetParts.forEach(t -> brokers.addAll(t.getValue()));
|
||||||
|
|
||||||
System.out.printf("Clearing broker-level throttles on broker%s %s%n",
|
System.out.printf("Clearing broker-level throttles on broker%s %s%n",
|
||||||
brokers.size() == 1 ? "" : "s", Utils.join(brokers, ","));
|
brokers.size() == 1 ? "" : "s", brokers.stream().map(Object::toString).collect(Collectors.joining(",")));
|
||||||
clearBrokerLevelThrottles(adminClient, brokers);
|
clearBrokerLevelThrottles(adminClient, brokers);
|
||||||
|
|
||||||
Set<String> topics = targetParts.stream().map(t -> t.getKey().topic()).collect(Collectors.toSet());
|
Set<String> topics = targetParts.stream().map(t -> t.getKey().topic()).collect(Collectors.toSet());
|
||||||
System.out.printf("Clearing topic-level throttles on topic%s %s%n",
|
System.out.printf("Clearing topic-level throttles on topic%s %s%n",
|
||||||
topics.size() == 1 ? "" : "s", Utils.join(topics, ","));
|
topics.size() == 1 ? "" : "s", String.join(",", topics));
|
||||||
clearTopicLevelThrottles(adminClient, topics);
|
clearTopicLevelThrottles(adminClient, topics);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.kafka.common.Node;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.common.TopicPartitionInfo;
|
import org.apache.kafka.common.TopicPartitionInfo;
|
||||||
import org.apache.kafka.common.internals.KafkaFutureImpl;
|
import org.apache.kafka.common.internals.KafkaFutureImpl;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
import org.mockito.ArgumentMatchers;
|
import org.mockito.ArgumentMatchers;
|
||||||
|
@ -196,7 +195,7 @@ public class ConsumerGroupServiceTest {
|
||||||
public void testAdminRequestsForResetOffsets() {
|
public void testAdminRequestsForResetOffsets() {
|
||||||
List<String> args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", GROUP, "--reset-offsets", "--to-latest"));
|
List<String> args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", GROUP, "--reset-offsets", "--to-latest"));
|
||||||
List<String> topicsWithoutPartitionsSpecified = TOPICS.subList(1, TOPICS.size());
|
List<String> topicsWithoutPartitionsSpecified = TOPICS.subList(1, TOPICS.size());
|
||||||
List<String> topicArgs = new ArrayList<>(Arrays.asList("--topic", TOPICS.get(0) + ":" + Utils.mkString(IntStream.range(0, NUM_PARTITIONS).mapToObj(Integer::toString), "", "", ",")));
|
List<String> topicArgs = new ArrayList<>(Arrays.asList("--topic", TOPICS.get(0) + ":" + (IntStream.range(0, NUM_PARTITIONS).mapToObj(Integer::toString).collect(Collectors.joining(",")))));
|
||||||
topicsWithoutPartitionsSpecified.forEach(topic -> topicArgs.addAll(Arrays.asList("--topic", topic)));
|
topicsWithoutPartitionsSpecified.forEach(topic -> topicArgs.addAll(Arrays.asList("--topic", topic)));
|
||||||
|
|
||||||
args.addAll(topicArgs);
|
args.addAll(topicArgs);
|
||||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.kafka.common.KafkaFuture;
|
||||||
import org.apache.kafka.common.utils.Exit;
|
import org.apache.kafka.common.utils.Exit;
|
||||||
import org.apache.kafka.common.utils.Scheduler;
|
import org.apache.kafka.common.utils.Scheduler;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.apache.kafka.trogdor.common.JsonUtil;
|
import org.apache.kafka.trogdor.common.JsonUtil;
|
||||||
import org.apache.kafka.trogdor.common.Node;
|
import org.apache.kafka.trogdor.common.Node;
|
||||||
import org.apache.kafka.trogdor.common.Platform;
|
import org.apache.kafka.trogdor.common.Platform;
|
||||||
|
@ -175,7 +174,7 @@ public final class Agent {
|
||||||
Set<String> nodes = controller.targetNodes(platform.topology());
|
Set<String> nodes = controller.targetNodes(platform.topology());
|
||||||
if (!nodes.contains(platform.curNode().name())) {
|
if (!nodes.contains(platform.curNode().name())) {
|
||||||
out.println("This task is not configured to run on this node. It runs on node(s): " +
|
out.println("This task is not configured to run on this node. It runs on node(s): " +
|
||||||
Utils.join(nodes, ", ") + ", whereas this node is " +
|
String.join(", ", nodes) + ", whereas this node is " +
|
||||||
platform.curNode().name());
|
platform.curNode().name());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.kafka.trogdor.basic;
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import org.apache.kafka.common.utils.Scheduler;
|
import org.apache.kafka.common.utils.Scheduler;
|
||||||
import org.apache.kafka.common.utils.Shell;
|
import org.apache.kafka.common.utils.Shell;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.apache.kafka.trogdor.common.Node;
|
import org.apache.kafka.trogdor.common.Node;
|
||||||
import org.apache.kafka.trogdor.common.Platform;
|
import org.apache.kafka.trogdor.common.Platform;
|
||||||
import org.apache.kafka.trogdor.common.Topology;
|
import org.apache.kafka.trogdor.common.Topology;
|
||||||
|
@ -49,10 +48,10 @@ public class BasicPlatform implements Platform {
|
||||||
public String run(Node curNode, String[] command) throws IOException {
|
public String run(Node curNode, String[] command) throws IOException {
|
||||||
try {
|
try {
|
||||||
String result = Shell.execCommand(command);
|
String result = Shell.execCommand(command);
|
||||||
log.info("RUN: {}. RESULT: [{}]", Utils.join(command, " "), result);
|
log.info("RUN: {}. RESULT: [{}]", String.join(" ", command), result);
|
||||||
return result;
|
return result;
|
||||||
} catch (RuntimeException | IOException e) {
|
} catch (RuntimeException | IOException e) {
|
||||||
log.info("RUN: {}. ERROR: [{}]", Utils.join(command, " "), e.getMessage());
|
log.info("RUN: {}. ERROR: [{}]", String.join(" ", command), e.getMessage());
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
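The logging calls above join a String[] command. String.join also has a varargs overload, shown in this sketch with a hypothetical command:

    public class JoinCommandArray {
        public static void main(String[] args) {
            // Hypothetical shell command standing in for the command array passed to run().
            String[] command = {"ls", "-l", "/tmp"};
            // String.join(delimiter, elements...) is a varargs overload, so a String[]
            // can be passed directly without converting it to a List first.
            System.out.println(String.join(" ", command)); // ls -l /tmp
        }
    }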
@ -64,7 +63,7 @@ public class BasicPlatform implements Platform {
|
||||||
if (this.curNode == null) {
|
if (this.curNode == null) {
|
||||||
throw new RuntimeException(String.format("No node named %s found " +
|
throw new RuntimeException(String.format("No node named %s found " +
|
||||||
"in the cluster! Cluster nodes are: %s", curNodeName,
|
"in the cluster! Cluster nodes are: %s", curNodeName,
|
||||||
Utils.join(topology.nodes().keySet(), ",")));
|
String.join(",", topology.nodes().keySet())));
|
||||||
}
|
}
|
||||||
this.topology = topology;
|
this.topology = topology;
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
|
@ -83,7 +82,7 @@ public class BasicPlatform implements Platform {
|
||||||
if (this.curNode == null) {
|
if (this.curNode == null) {
|
||||||
throw new RuntimeException(String.format("No node named %s found " +
|
throw new RuntimeException(String.format("No node named %s found " +
|
||||||
"in the cluster! Cluster nodes are: %s", curNodeName,
|
"in the cluster! Cluster nodes are: %s", curNodeName,
|
||||||
Utils.join(topology.nodes().keySet(), ",")));
|
String.join(",", topology.nodes().keySet())));
|
||||||
}
|
}
|
||||||
this.commandRunner = new ShellCommandRunner();
|
this.commandRunner = new ShellCommandRunner();
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
||||||
import org.apache.kafka.common.internals.KafkaFutureImpl;
|
import org.apache.kafka.common.internals.KafkaFutureImpl;
|
||||||
import org.apache.kafka.common.requests.CreateTopicsRequest;
|
import org.apache.kafka.common.requests.CreateTopicsRequest;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -226,7 +225,7 @@ public final class WorkerUtils {
|
||||||
}
|
}
|
||||||
if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) {
|
if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) {
|
||||||
String str = "Unable to create topic(s): " +
|
String str = "Unable to create topic(s): " +
|
||||||
Utils.join(topicsToCreate, ", ") + "after " + tries + " attempt(s)";
|
String.join(", ", topicsToCreate) + "after " + tries + " attempt(s)";
|
||||||
log.warn(str);
|
log.warn(str);
|
||||||
throw new TimeoutException(str);
|
throw new TimeoutException(str);
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,7 +142,7 @@ public final class TaskManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("Created TaskManager for agent(s) on: {}",
|
log.info("Created TaskManager for agent(s) on: {}",
|
||||||
Utils.join(nodeManagers.keySet(), ", "));
|
String.join(", ", nodeManagers.keySet()));
|
||||||
}
|
}
|
||||||
|
|
||||||
class ManagedTask {
|
class ManagedTask {
|
||||||
|
@ -239,7 +239,7 @@ public final class TaskManager {
|
||||||
}
|
}
|
||||||
if (!nonExistentNodeNames.isEmpty()) {
|
if (!nonExistentNodeNames.isEmpty()) {
|
||||||
throw new KafkaException("Unknown node names: " +
|
throw new KafkaException("Unknown node names: " +
|
||||||
Utils.join(nonExistentNodeNames, ", "));
|
String.join(", ", nonExistentNodeNames));
|
||||||
}
|
}
|
||||||
if (validNodeNames.isEmpty()) {
|
if (validNodeNames.isEmpty()) {
|
||||||
throw new KafkaException("No node names specified.");
|
throw new KafkaException("No node names specified.");
|
||||||
|
@ -398,7 +398,7 @@ public final class TaskManager {
|
||||||
task.maybeSetError("Unable to find nodes for task: " + e.getMessage());
|
task.maybeSetError("Unable to find nodes for task: " + e.getMessage());
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", "));
|
log.info("Running task {} on node(s): {}", task.id, String.join(", ", nodeNames));
|
||||||
task.state = TaskStateType.RUNNING;
|
task.state = TaskStateType.RUNNING;
|
||||||
task.startedMs = time.milliseconds();
|
task.startedMs = time.milliseconds();
|
||||||
for (String workerName : nodeNames) {
|
for (String workerName : nodeNames) {
|
||||||
|
@ -597,7 +597,7 @@ public final class TaskManager {
|
||||||
task.doneMs = time.milliseconds();
|
task.doneMs = time.milliseconds();
|
||||||
task.state = TaskStateType.DONE;
|
task.state = TaskStateType.DONE;
|
||||||
log.info("{}: Task {} is now complete on {} with error: {}",
|
log.info("{}: Task {} is now complete on {} with error: {}",
|
||||||
nodeName, task.id, Utils.join(task.workerIds.keySet(), ", "),
|
nodeName, task.id, String.join(", ", task.workerIds.keySet()),
|
||||||
task.error.isEmpty() ? "(none)" : task.error);
|
task.error.isEmpty() ? "(none)" : task.error);
|
||||||
} else if ((task.state == TaskStateType.RUNNING) && (!task.error.isEmpty())) {
|
} else if ((task.state == TaskStateType.RUNNING) && (!task.error.isEmpty())) {
|
||||||
log.info("{}: task {} stopped with error {}. Stopping worker(s): {}",
|
log.info("{}: task {} stopped with error {}. Stopping worker(s): {}",
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.fault;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.node.TextNode;
|
import com.fasterxml.jackson.databind.node.TextNode;
|
||||||
import org.apache.kafka.common.internals.KafkaFutureImpl;
|
import org.apache.kafka.common.internals.KafkaFutureImpl;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.apache.kafka.trogdor.common.Platform;
|
import org.apache.kafka.trogdor.common.Platform;
|
||||||
import org.apache.kafka.trogdor.task.TaskWorker;
|
import org.apache.kafka.trogdor.task.TaskWorker;
|
||||||
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
|
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
|
||||||
|
@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class ProcessStopFaultWorker implements TaskWorker {
|
public class ProcessStopFaultWorker implements TaskWorker {
|
||||||
private static final Logger log = LoggerFactory.getLogger(ProcessStopFaultWorker.class);
|
private static final Logger log = LoggerFactory.getLogger(ProcessStopFaultWorker.class);
|
||||||
|
@ -80,7 +80,7 @@ public class ProcessStopFaultWorker implements TaskWorker {
|
||||||
id, javaProcessName, signalName);
|
id, javaProcessName, signalName);
|
||||||
} else {
|
} else {
|
||||||
log.info("{}: sending {} to {} pid(s) {}",
|
log.info("{}: sending {} to {} pid(s) {}",
|
||||||
id, signalName, javaProcessName, Utils.join(pids, ", "));
|
id, signalName, javaProcessName, pids.stream().map(Object::toString).collect(Collectors.joining(",")));
|
||||||
for (Integer pid : pids) {
|
for (Integer pid : pids) {
|
||||||
platform.runCommand(new String[] {"kill", "-" + signalName, pid.toString()});
|
platform.runCommand(new String[] {"kill", "-" + signalName, pid.toString()});
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
|
||||||
public class RoundTripWorker implements TaskWorker {
|
public class RoundTripWorker implements TaskWorker {
|
||||||
private static final int THROTTLE_PERIOD_MS = 100;
|
private static final int THROTTLE_PERIOD_MS = 100;
|
||||||
|
@ -318,7 +320,7 @@ public class RoundTripWorker implements TaskWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("{}: consumer waiting for {} message(s), starting with: {}",
|
log.info("{}: consumer waiting for {} message(s), starting with: {}",
|
||||||
id, numToReceive, Utils.join(list, ", "));
|
id, numToReceive, list.stream().map(Object::toString).collect(Collectors.joining(", ")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
package org.apache.kafka.trogdor.basic;
|
package org.apache.kafka.trogdor.basic;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.apache.kafka.test.TestUtils;
|
import org.apache.kafka.test.TestUtils;
|
||||||
import org.apache.kafka.trogdor.common.Platform;
|
import org.apache.kafka.trogdor.common.Platform;
|
||||||
|
|
||||||
|
@ -57,7 +55,7 @@ public class BasicPlatformTest {
|
||||||
Platform platform = Platform.Config.parse("bob01", configFile.getPath());
|
Platform platform = Platform.Config.parse("bob01", configFile.getPath());
|
||||||
assertEquals("BasicPlatform", platform.name());
|
assertEquals("BasicPlatform", platform.name());
|
||||||
assertEquals(2, platform.topology().nodes().size());
|
assertEquals(2, platform.topology().nodes().size());
|
||||||
assertEquals("bob01, bob02", Utils.join(platform.topology().nodes().keySet(), ", "));
|
assertEquals("bob01, bob02", String.join(", ", platform.topology().nodes().keySet()));
|
||||||
} finally {
|
} finally {
|
||||||
Files.delete(configFile.toPath());
|
Files.delete(configFile.toPath());
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
|
|
||||||
package org.apache.kafka.trogdor.common;
|
package org.apache.kafka.trogdor.common;
|
||||||
|
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.apache.kafka.trogdor.basic.BasicPlatform;
|
import org.apache.kafka.trogdor.basic.BasicPlatform;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -45,11 +44,11 @@ public class CapturingCommandRunner implements BasicPlatform.CommandRunner {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String run(Node curNode, String[] command) {
|
public String run(Node curNode, String[] command) {
|
||||||
String line = Utils.join(command, " ");
|
String line = String.join(" ", command);
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
getOrCreate(curNode.name()).add(line);
|
getOrCreate(curNode.name()).add(line);
|
||||||
}
|
}
|
||||||
log.debug("RAN {}: {}", curNode, Utils.join(command, " "));
|
log.debug("RAN {}: {}", curNode, String.join(" ", command));
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.common;
|
||||||
|
|
||||||
import org.apache.kafka.common.utils.Scheduler;
|
import org.apache.kafka.common.utils.Scheduler;
|
||||||
import org.apache.kafka.common.utils.ThreadUtils;
|
import org.apache.kafka.common.utils.ThreadUtils;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.apache.kafka.trogdor.agent.Agent;
|
import org.apache.kafka.trogdor.agent.Agent;
|
||||||
import org.apache.kafka.trogdor.agent.AgentClient;
|
import org.apache.kafka.trogdor.agent.AgentClient;
|
||||||
import org.apache.kafka.trogdor.agent.AgentRestResource;
|
import org.apache.kafka.trogdor.agent.AgentRestResource;
|
||||||
|
@ -134,7 +133,7 @@ public class MiniTrogdorCluster implements AutoCloseable {
|
||||||
*/
|
*/
|
||||||
public MiniTrogdorCluster build() throws Exception {
|
public MiniTrogdorCluster build() throws Exception {
|
||||||
log.info("Creating MiniTrogdorCluster with agents: {} and coordinator: {}",
|
log.info("Creating MiniTrogdorCluster with agents: {} and coordinator: {}",
|
||||||
Utils.join(agentNames, ", "), coordinatorName);
|
String.join(", ", agentNames), coordinatorName);
|
||||||
TreeMap<String, NodeData> nodes = new TreeMap<>();
|
TreeMap<String, NodeData> nodes = new TreeMap<>();
|
||||||
for (String agentName : agentNames) {
|
for (String agentName : agentNames) {
|
||||||
NodeData node = getOrCreate(agentName, nodes);
|
NodeData node = getOrCreate(agentName, nodes);
|
||||||
|
|
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.node.TextNode;
|
||||||
import org.apache.kafka.common.utils.MockScheduler;
|
import org.apache.kafka.common.utils.MockScheduler;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.common.utils.Scheduler;
|
import org.apache.kafka.common.utils.Scheduler;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
|
||||||
import org.apache.kafka.test.TestUtils;
|
import org.apache.kafka.test.TestUtils;
|
||||||
import org.apache.kafka.trogdor.agent.AgentClient;
|
import org.apache.kafka.trogdor.agent.AgentClient;
|
||||||
import org.apache.kafka.trogdor.common.CapturingCommandRunner;
|
import org.apache.kafka.trogdor.common.CapturingCommandRunner;
|
||||||
|
@ -361,7 +360,7 @@ public class CoordinatorTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return Utils.join(expectedLines, ", ");
|
return String.join(", ", expectedLines);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|