mirror of https://github.com/apache/kafka.git
MINOR: Cleanup Trogdor Module (#20214)

Now that Kafka supports Java 17, this PR makes some changes to the `trogdor` module. The changes mostly include:

- Collections.emptyList(), Collections.singletonList() and Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced with Map.of()
- Collections.singleton() is replaced with Set.of()

There are also some minor cleanups around the use of enhanced switch blocks and the conversion of classes to record classes.

Reviewers: Ken Huang <s7133700@gmail.com>, Vincent Jiang <vpotucek@me.com>, Chia-Ping Tsai <chia7712@gmail.com>
Now that Kafka support Java 17, this PR makes some changes in `trogdor` module. The changes mostly include: - Collections.emptyList(), Collections.singletonList() and Arrays.asList() are replaced with List.of() - Collections.emptyMap() and Collections.singletonMap() are replaced with Map.of() - Collections.singleton() is replaced with Set.of() Some minor cleanups around use of enhanced switch blocks and conversion of classes to record classes. Reviewers: Ken Huang <s7133700@gmail.com>, Vincent Jiang <vpotucek@me.com>, Chia-Ping Tsai <chia7712@gmail.com>
parent ddab943b0b
commit cfe483b728
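Note on the migration: the Java 9+ collection factories are not drop-in replacements in every case. `List.of`/`Map.of`/`Set.of` and the `copyOf` methods reject null and return unmodifiable collections, whereas the older idioms tolerated nulls or returned mutable or view-based collections. A minimal sketch of the differences (class and variable names here are illustrative, not from the patch):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ImmutableFactoryDemo {
        public static void main(String[] args) {
            // Arrays.asList returns a fixed-size but mutable view; List.of is immutable.
            List<String> legacy = Arrays.asList("a", "b");
            legacy.set(0, "x");                    // allowed
            List<String> modern = List.of("a", "b");
            // modern.set(0, "x");                 // would throw UnsupportedOperationException

            // Collections.unmodifiableMap wraps the source map, so later mutations of the
            // source show through; Map.copyOf takes an immutable snapshot instead.
            Map<String, Integer> source = new HashMap<>(Map.of("k", 1));
            Map<String, Integer> view = Collections.unmodifiableMap(source);
            Map<String, Integer> snapshot = Map.copyOf(source);
            source.put("k2", 2);
            System.out.println(view.size());       // 2: the view reflects the mutation
            System.out.println(snapshot.size());   // 1: the snapshot does not

            // The new factories also reject nulls, which Arrays.asList and
            // Collections.singletonList accepted.
        }
    }

These semantics are why several constructors below move from `Collections.unmodifiableMap(...)` over a caller-supplied map to `Map.copyOf(...)`: the copy severs the alias to the caller's collection.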
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -275,9 +274,7 @@ public class AgentClient {
         System.out.printf("\tStart time: %s%n",
             dateString(status.serverStartMs(), localOffset));
         List<List<String>> lines = new ArrayList<>();
-        List<String> header = new ArrayList<>(
-            Arrays.asList("WORKER_ID", "TASK_ID", "STATE", "TASK_TYPE"));
-        lines.add(header);
+        lines.add(List.of("WORKER_ID", "TASK_ID", "STATE", "TASK_TYPE"));
         for (Map.Entry<Long, WorkerState> entry : status.workers().entrySet()) {
             List<String> cols = new ArrayList<>();
             cols.add(Long.toString(entry.getKey()));
@@ -21,7 +21,6 @@ import org.apache.kafka.trogdor.common.Node;
 
 import com.fasterxml.jackson.databind.JsonNode;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,7 +45,7 @@ public class BasicNode implements Node {
     public BasicNode(String name, JsonNode root) {
         this.name = name;
         String hostname = "localhost";
-        Set<String> tags = Collections.emptySet();
+        Set<String> tags = new HashSet<>();
         Map<String, String> config = new HashMap<>();
         for (Map.Entry<String, JsonNode> entry : root.properties()) {
             String key = entry.getKey();
@@ -102,9 +102,7 @@ public class StringFormatter {
             String val = cols.get(x);
             int minWidth = widths.get(x);
             bld.append(val);
-            for (int i = 0; i < minWidth - val.length(); i++) {
-                bld.append(" ");
-            }
+            bld.append(" ".repeat(Math.max(0, minWidth - val.length())));
         }
         bld.append(String.format("%n"));
     }
@@ -55,7 +55,6 @@ import org.slf4j.LoggerFactory;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -471,9 +470,7 @@ public class CoordinatorClient {
             return "No matching tasks found.";
         }
         List<List<String>> lines = new ArrayList<>();
-        List<String> header = new ArrayList<>(
-            Arrays.asList("ID", "TYPE", "STATE", "INFO"));
-        lines.add(header);
+        lines.add(List.of("ID", "TYPE", "STATE", "INFO"));
         for (Map.Entry<String, TaskState> entry : response.tasks().entrySet()) {
             String taskId = entry.getKey();
             TaskState taskState = entry.getValue();
@@ -256,17 +256,12 @@ public final class TaskManager {
         }
 
         TaskState taskState() {
-            switch (state) {
-                case PENDING:
-                    return new TaskPending(spec);
-                case RUNNING:
-                    return new TaskRunning(spec, startedMs, getCombinedStatus());
-                case STOPPING:
-                    return new TaskStopping(spec, startedMs, getCombinedStatus());
-                case DONE:
-                    return new TaskDone(spec, startedMs, doneMs, error, cancelled, getCombinedStatus());
-            }
-            throw new RuntimeException("unreachable");
+            return switch (state) {
+                case PENDING -> new TaskPending(spec);
+                case RUNNING -> new TaskRunning(spec, startedMs, getCombinedStatus());
+                case STOPPING -> new TaskStopping(spec, startedMs, getCombinedStatus());
+                case DONE -> new TaskDone(spec, startedMs, doneMs, error, cancelled, getCombinedStatus());
+            };
         }
 
         private JsonNode getCombinedStatus() {
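Note on the switch conversion above: because a switch expression over an enum must be exhaustive, the compiler itself proves every constant is handled, so the old trailing `throw new RuntimeException("unreachable")` can simply be deleted. A small sketch of the pattern under Java 17 (the enum and labels here are illustrative, not from the patch):

    enum State { PENDING, RUNNING, STOPPING, DONE }

    static String describe(State state) {
        // The compiler rejects this expression unless every enum constant
        // (or a default arm) is covered, so no unreachable throw is needed.
        return switch (state) {
            case PENDING -> "waiting to start";
            case RUNNING -> "in progress";
            case STOPPING -> "shutting down";
            case DONE -> "finished";
        };
    }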
@@ -24,7 +24,6 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Collections;
 import java.util.Map;
 
 public class DegradedNetworkFaultSpec extends TaskSpec {
@@ -75,7 +74,7 @@ public class DegradedNetworkFaultSpec extends TaskSpec {
                                     @JsonProperty("durationMs") long durationMs,
                                     @JsonProperty("nodeSpecs") Map<String, NodeDegradeSpec> nodeSpecs) {
         super(startMs, durationMs);
-        this.nodeSpecs = nodeSpecs == null ? Collections.emptyMap() : Collections.unmodifiableMap(nodeSpecs);
+        this.nodeSpecs = nodeSpecs == null ? Map.of() : Map.copyOf(nodeSpecs);
     }
 
     @Override
@@ -29,7 +29,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.TreeMap;
@@ -123,7 +122,7 @@ public final class Kibosh {
         private final List<KiboshFaultSpec> faults;
 
         public static final KiboshControlFile EMPTY =
-            new KiboshControlFile(Collections.emptyList());
+            new KiboshControlFile(List.of());
 
         public static KiboshControlFile read(Path controlPath) throws IOException {
             byte[] controlFileBytes = Files.readAllBytes(controlPath);
@@ -72,9 +72,7 @@ public class NetworkPartitionFaultWorker implements TaskWorker {
         TreeSet<String> toBlock = new TreeSet<>();
         for (Set<String> partitionSet : partitionSets) {
             if (!partitionSet.contains(curNode.name())) {
-                for (String nodeName : partitionSet) {
-                    toBlock.add(nodeName);
-                }
+                toBlock.addAll(partitionSet);
             }
         }
         for (String nodeName : toBlock) {
@@ -22,15 +22,10 @@ import org.apache.kafka.trogdor.common.JsonUtil;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Objects;
-
 /**
  * An error response.
  */
-public class ErrorResponse {
-    private final int code;
-    private final String message;
-
+public record ErrorResponse(int code, String message) {
     @JsonCreator
     public ErrorResponse(@JsonProperty("code") int code,
                          @JsonProperty("message") String message) {
@@ -38,30 +33,18 @@ public class ErrorResponse {
         this.message = message;
     }
 
+    @Override
     @JsonProperty
     public int code() {
         return code;
     }
 
+    @Override
     @JsonProperty
     public String message() {
         return message;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        ErrorResponse that = (ErrorResponse) o;
-        return Objects.equals(code, that.code) &&
-            Objects.equals(message, that.message);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(code, message);
-    }
-
     @Override
     public String toString() {
         return JsonUtil.toJsonString(this);
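The record conversion above works because the compiler generates component-wise `equals`, `hashCode`, and accessors, which is exactly what the deleted `Objects.equals`/`Objects.hash` boilerplate implemented by hand; only `toString` keeps an explicit override so it still emits JSON. A quick illustration of the generated behavior (standalone demo, not part of the patch):

    public class RecordEqualityDemo {
        record ErrorResponse(int code, String message) { }

        public static void main(String[] args) {
            // Records compare by component values, like the removed handwritten code.
            ErrorResponse a = new ErrorResponse(404, "not found");
            ErrorResponse b = new ErrorResponse(404, "not found");
            System.out.println(a.equals(b));                  // true
            System.out.println(a.hashCode() == b.hashCode()); // true
            System.out.println(a.code());                     // 404, generated accessor
        }
    }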
@@ -23,16 +23,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 /**
  * The request to /coordinator/tasks/{taskId}
  */
-public class TaskRequest {
-    private final String taskId;
-
+public record TaskRequest(String taskId) {
     @JsonCreator
     public TaskRequest(@JsonProperty("taskId") String taskId) {
         this.taskId = taskId == null ? "" : taskId;
     }
-
-    @JsonProperty
-    public String taskId() {
-        return taskId;
-    }
 }
@@ -21,8 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 
@@ -69,8 +67,7 @@ public class TasksRequest extends Message {
                         @JsonProperty("firstEndMs") long firstEndMs,
                         @JsonProperty("lastEndMs") long lastEndMs,
                         @JsonProperty("state") Optional<TaskStateType> state) {
-        this.taskIds = Collections.unmodifiableSet((taskIds == null) ?
-            new HashSet<>() : new HashSet<>(taskIds));
+        this.taskIds = taskIds == null ? Set.of() : Set.copyOf(taskIds);
         this.firstStartMs = Math.max(0, firstStartMs);
         this.lastStartMs = Math.max(0, lastStartMs);
         this.firstEndMs = Math.max(0, firstEndMs);
@@ -20,7 +20,6 @@ package org.apache.kafka.trogdor.rest;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -32,7 +31,7 @@ public class TasksResponse extends Message {
 
     @JsonCreator
     public TasksResponse(@JsonProperty("tasks") TreeMap<String, TaskState> tasks) {
-        this.tasks = Collections.unmodifiableMap((tasks == null) ? new TreeMap<>() : tasks);
+        this.tasks = tasks == null ? Map.of() : Map.copyOf(tasks);
     }
 
     @JsonProperty
@@ -22,7 +22,6 @@ import org.apache.kafka.trogdor.common.JsonUtil;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 
@@ -112,6 +111,6 @@ public abstract class TaskSpec {
     }
 
     protected static Map<String, String> configOrEmptyMap(Map<String, String> config) {
-        return (config == null) ? Collections.emptyMap() : config;
+        return config == null ? Map.of() : config;
     }
 }
@@ -24,9 +24,9 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * This is the spec to pass in to be able to run the `ConfigurableProducerWorker` workload. This allows for customized
@@ -206,7 +206,7 @@ public final class ConfigurableProducerSpec extends TaskSpec {
 
     @Override
     public TaskController newController(String id) {
-        return topology -> Collections.singleton(producerNode);
+        return topology -> Set.of(producerNode);
     }
 
     @Override
@@ -24,7 +24,6 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
@@ -56,8 +55,7 @@ public final class ConnectionStressSpec extends TaskSpec {
                                 @JsonProperty("numThreads") int numThreads,
                                 @JsonProperty("action") ConnectionStressAction action) {
         super(startMs, durationMs);
-        this.clientNodes = (clientNodes == null) ? Collections.emptyList() :
-            List.copyOf(clientNodes);
+        this.clientNodes = clientNodes == null ? List.of() : List.copyOf(clientNodes);
         this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.targetConnectionsPerSec = targetConnectionsPerSec;
@@ -129,13 +129,10 @@ public class ConnectionStressWorker implements TaskWorker {
 
     interface Stressor extends AutoCloseable {
         static Stressor fromSpec(ConnectionStressSpec spec) {
-            switch (spec.action()) {
-                case CONNECT:
-                    return new ConnectStressor(spec);
-                case FETCH_METADATA:
-                    return new FetchMetadataStressor(spec);
-            }
-            throw new RuntimeException("invalid spec.action " + spec.action());
+            return switch (spec.action()) {
+                case CONNECT -> new ConnectStressor(spec);
+                case FETCH_METADATA -> new FetchMetadataStressor(spec);
+            };
         }
 
         boolean tryConnect();
@@ -29,7 +29,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -189,7 +188,7 @@ public final class ConsumeBenchSpec extends TaskSpec {
 
     @Override
     public TaskController newController(String id) {
-        return topology -> Collections.singleton(consumerNode);
+        return topology -> Set.of(consumerNode);
     }
 
     @Override
@@ -58,7 +58,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 
 public class ConsumeBenchWorker implements TaskWorker {
@@ -132,7 +131,7 @@ public class ConsumeBenchWorker implements TaskWorker {
                 }
             } else {
                 List<TopicPartition> partitions = populatePartitionsByTopic(consumer.consumer(), partitionsByTopic)
-                    .values().stream().flatMap(List::stream).collect(Collectors.toList());
+                    .values().stream().flatMap(List::stream).toList();
                 tasks.add(new ConsumeMessages(consumer, spec.recordProcessor(), partitions));
 
                 for (int i = 0; i < consumerCount - 1; i++) {
@@ -182,7 +181,7 @@ public class ConsumeBenchWorker implements TaskWorker {
             if (partitions.isEmpty()) {
                 List<TopicPartition> fetchedPartitions = consumer.partitionsFor(topicName).stream()
                     .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
                    .collect(Collectors.toList());
+                    .toList();
                 partitions.addAll(fetchedPartitions);
             }
 
@@ -550,7 +549,7 @@ public class ConsumeBenchWorker implements TaskWorker {
            this.consumerLock.lock();
             try {
                 return consumer.assignment().stream()
-                    .map(TopicPartition::toString).collect(Collectors.toList());
+                    .map(TopicPartition::toString).toList();
             } finally {
                 this.consumerLock.unlock();
             }
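One behavioral caveat in the `collect(Collectors.toList())` to `Stream.toList()` swaps above: `Stream.toList()` (Java 16+) returns an unmodifiable list, while `Collectors.toList()` gives no such guarantee and in practice yields a mutable `ArrayList`. Judging from the surrounding context that is harmless here, since the results are copied or only read, but it matters wherever a caller mutates the returned list. A short illustration (standalone, not from the patch):

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToListDemo {
        public static void main(String[] args) {
            List<Integer> mutable = Stream.of(1, 2, 3).collect(Collectors.toList());
            mutable.add(4);  // fine: an ArrayList in practice

            List<Integer> frozen = Stream.of(1, 2, 3).toList();
            // frozen.add(4);  // would throw UnsupportedOperationException
            System.out.println(mutable + " " + frozen);
        }
    }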
@@ -26,10 +26,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.NullNode;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * ExternalCommandSpec describes a task that executes Trogdor tasks with the command.
@@ -78,7 +77,7 @@ public class ExternalCommandSpec extends TaskSpec {
                               @JsonProperty("shutdownGracePeriodMs") Optional<Integer> shutdownGracePeriodMs) {
         super(startMs, durationMs);
         this.commandNode = (commandNode == null) ? "" : commandNode;
-        this.command = (command == null) ? Collections.unmodifiableList(new ArrayList<>()) : command;
+        this.command = (command == null) ? List.of() : command;
         this.workload = (workload == null) ? NullNode.instance : workload;
         this.shutdownGracePeriodMs = shutdownGracePeriodMs;
     }
@@ -105,7 +104,7 @@ public class ExternalCommandSpec extends TaskSpec {
 
     @Override
     public TaskController newController(String id) {
-        return topology -> Collections.singleton(commandNode);
+        return topology -> Set.of(commandNode);
     }
 
     @Override
@@ -18,7 +18,6 @@
 package org.apache.kafka.trogdor.workload;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -175,7 +174,7 @@ public class Histogram {
     private List<PercentileSummary> summarizePercentiles(int[] countsCopy, float[] percentiles,
                                                          long numSamples) {
         if (percentiles.length == 0) {
-            return Collections.emptyList();
+            return List.of();
         }
         List<PercentileSummary> summaries = new ArrayList<>(percentiles.length);
         int i = 0, j = 0;
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,12 +58,12 @@ public class PartitionsSpec extends Message {
                         assignments.add(brokerId == null ? Integer.valueOf(0) : brokerId);
                     }
                 }
-                partMap.put(partition, Collections.unmodifiableList(assignments));
+                partMap.put(partition, List.copyOf(assignments));
             }
         }
-        this.partitionAssignments = Collections.unmodifiableMap(partMap);
+        this.partitionAssignments = Map.copyOf(partMap);
         if (configs == null) {
-            this.configs = Collections.emptyMap();
+            this.configs = Map.of();
         } else {
            this.configs = Map.copyOf(configs);
         }
@@ -24,9 +24,9 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * The specification for a benchmark that produces messages to a set of topics.
@@ -185,7 +185,7 @@ public final class ProduceBenchSpec extends TaskSpec {
 
     @Override
     public TaskController newController(String id) {
-        return topology -> Collections.singleton(producerNode);
+        return topology -> Set.of(producerNode);
     }
 
     @Override
@@ -22,27 +22,13 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
  * Contains a percent value represented as an integer between 1 and 100 and a PayloadGenerator to specify
- * how often that PayloadGenerator should be used. 
+ * how often that PayloadGenerator should be used.
  */
-public class RandomComponent {
-    private final int percent;
-    private final PayloadGenerator component;
-
-
+public record RandomComponent(int percent, PayloadGenerator component) {
     @JsonCreator
     public RandomComponent(@JsonProperty("percent") int percent,
                            @JsonProperty("component") PayloadGenerator component) {
         this.percent = percent;
         this.component = component;
     }
-
-    @JsonProperty
-    public int percent() {
-        return percent;
-    }
-
-    @JsonProperty
-    public PayloadGenerator component() {
-        return component;
-    }
 }
@@ -24,8 +24,8 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * The specification for a workload that sends messages to a broker and then
@@ -123,7 +123,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
 
     @Override
     public TaskController newController(String id) {
-        return topology -> Collections.singleton(clientNode);
+        return topology -> Set.of(clientNode);
     }
 
     @Override
@@ -27,7 +27,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -176,7 +175,7 @@ public final class ShareConsumeBenchSpec extends TaskSpec {
 
     @Override
     public TaskController newController(String id) {
-        return topology -> Collections.singleton(consumerNode);
+        return topology -> Set.of(consumerNode);
     }
 
     @Override
@@ -24,8 +24,8 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * The specification for a benchmark that creates sustained connections.
@@ -187,7 +187,7 @@ public final class SustainedConnectionSpec extends TaskSpec {
     }
 
     public TaskController newController(String id) {
-        return topology -> Collections.singleton(clientNode);
+        return topology -> Set.of(clientNode);
     }
 
     @Override
@@ -343,15 +343,15 @@ public class SustainedConnectionWorker implements TaskWorker {
                 this.consumer = new KafkaConsumer<>(this.props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
                 List<TopicPartition> partitions = this.consumer.partitionsFor(this.topicName).stream()
                     .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
-                    .collect(Collectors.toList());
+                    .toList();
 
                 // Select a random partition and assign it.
                 this.activePartition = partitions.get(this.rand.nextInt(partitions.size()));
-                this.consumer.assign(Collections.singletonList(this.activePartition));
+                this.consumer.assign(List.of(this.activePartition));
             }
 
             // The behavior when passing in an empty list is to seek to the end of all subscribed partitions.
-            this.consumer.seekToEnd(Collections.emptyList());
+            this.consumer.seekToEnd(List.of());
 
             // Poll to keep the connection alive, ignoring any records returned.
             this.consumer.poll(Duration.ofMillis(50));
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonAnyGetter;
 import com.fasterxml.jackson.annotation.JsonAnySetter;
 import com.fasterxml.jackson.annotation.JsonCreator;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -68,9 +67,7 @@ public class TopicsSpec extends Message {
     }
 
     public TopicsSpec immutableCopy() {
-        HashMap<String, PartitionsSpec> mapCopy = new HashMap<>();
-        mapCopy.putAll(map);
-        return new TopicsSpec(Collections.unmodifiableMap(mapCopy));
+        return new TopicsSpec(Map.copyOf(map));
     }
 
     /**
@@ -61,11 +61,12 @@ import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
-import static java.util.Arrays.asList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -80,7 +81,7 @@ public class AgentTest {
         HashMap<String, String> config = new HashMap<>();
         config.put(Platform.Config.TROGDOR_AGENT_PORT, Integer.toString(Agent.DEFAULT_PORT));
         nodes.put("node01", new BasicNode("node01", "localhost",
-            config, Collections.emptySet()));
+            config, Set.of()));
         BasicTopology topology = new BasicTopology(nodes);
         return new BasicPlatform("node01", topology,
             scheduler, new BasicPlatform.ShellCommandRunner());
@@ -153,7 +154,7 @@ public class AgentTest {
             maxTries(10).target("localhost", agent.port()).build();
         AgentStatusResponse status = client.status();
 
-        assertEquals(Collections.emptyMap(), status.workers());
+        assertEquals(Map.of(), status.workers());
         new ExpectedTasks().waitFor(client);
 
         final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 10);
@@ -191,7 +192,7 @@ public class AgentTest {
         AgentClient client = new AgentClient.Builder().
             maxTries(10).target("localhost", agent.port()).build();
         AgentStatusResponse status = client.status();
-        assertEquals(Collections.emptyMap(), status.workers());
+        assertEquals(Map.of(), status.workers());
         new ExpectedTasks().waitFor(client);
 
         final NoOpTaskSpec fooSpec = new NoOpTaskSpec(1000, 600000);
@@ -304,7 +305,7 @@ public class AgentTest {
         new ExpectedTasks().waitFor(client);
 
         SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000,
-            Collections.singletonMap("node01", 1L), "");
+            Map.of("node01", 1L), "");
         client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
@@ -313,7 +314,7 @@ public class AgentTest {
             waitFor(client);
 
         SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000,
-            Collections.singletonMap("node01", 2L), "baz");
+            Map.of("node01", 2L), "baz");
         client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
 
         time.sleep(1);
@@ -373,17 +374,17 @@ public class AgentTest {
         try (MockKibosh mockKibosh = new MockKibosh()) {
             assertEquals(KiboshControlFile.EMPTY, mockKibosh.read());
             FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(0, 900000,
-                Collections.singleton("myAgent"), mockKibosh.tempDir.getPath(), "/foo", 123);
+                Set.of("myAgent"), mockKibosh.tempDir.getPath(), "/foo", 123);
             client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("Added fault foo"))).
                     build()).
                 waitFor(client);
-            assertEquals(new KiboshControlFile(Collections.singletonList(
+            assertEquals(new KiboshControlFile(List.of(
                 new KiboshFilesUnreadableFaultSpec("/foo", 123))), mockKibosh.read());
             FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(0, 900000,
-                Collections.singleton("myAgent"), mockKibosh.tempDir.getPath(), "/bar", 456);
+                Set.of("myAgent"), mockKibosh.tempDir.getPath(), "/bar", 456);
             client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
@@ -391,7 +392,7 @@ public class AgentTest {
                 addTask(new ExpectedTaskBuilder("bar").
                     workerState(new WorkerRunning("bar", barSpec, 0, new TextNode("Added fault bar"))).build()).
                 waitFor(client);
-            assertEquals(new KiboshControlFile(asList(
+            assertEquals(new KiboshControlFile(List.of(
                 new KiboshFilesUnreadableFaultSpec("/foo", 123),
                 new KiboshFilesUnreadableFaultSpec("/bar", 456))
             ), mockKibosh.read());
@@ -403,7 +404,7 @@ public class AgentTest {
                 addTask(new ExpectedTaskBuilder("bar").
                     workerState(new WorkerRunning("bar", barSpec, 0, new TextNode("Added fault bar"))).build()).
                 waitFor(client);
-            assertEquals(new KiboshControlFile(Collections.singletonList(
+            assertEquals(new KiboshControlFile(List.of(
                 new KiboshFilesUnreadableFaultSpec("/bar", 456))), mockKibosh.read());
         }
     }
@@ -476,7 +477,7 @@ public class AgentTest {
    public void testAgentExecWithNormalExit() throws Exception {
         Agent agent = createAgent(Scheduler.SYSTEM);
         SampleTaskSpec spec = new SampleTaskSpec(0, 120000,
-            Collections.singletonMap("node01", 1L), "");
+            Map.of("node01", 1L), "");
         TaskSpec rebasedSpec = agent.rebaseTaskSpecTime(spec);
         testExec(agent,
             String.format("Waiting for completion of task:%s%n",
@@ -35,7 +35,6 @@ import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -63,8 +62,8 @@ public class JsonSerializationTest {
         verify(new TopicsSpec());
         verify(new PartitionsSpec(0, (short) 0, null, null));
         Map<Integer, List<Integer>> partitionAssignments = new HashMap<>();
-        partitionAssignments.put(0, Arrays.asList(1, 2, 3));
-        partitionAssignments.put(1, Arrays.asList(1, 2, 3));
+        partitionAssignments.put(0, List.of(1, 2, 3));
+        partitionAssignments.put(1, List.of(1, 2, 3));
         verify(new PartitionsSpec(0, (short) 0, partitionAssignments, null));
         verify(new PartitionsSpec(0, (short) 0, null, null));
     }
@@ -45,10 +45,7 @@ public class JsonUtilTest {
         assertFalse(JsonUtil.openBraceComesFirst(" blah{}"));
     }
 
-    static final class Foo {
-        @JsonProperty
-        final int bar;
-
+    record Foo(@JsonProperty int bar) {
         @JsonCreator
         Foo(@JsonProperty("bar") int bar) {
             this.bar = bar;
@@ -33,9 +33,9 @@ import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
@@ -160,8 +160,7 @@ public class MiniTrogdorCluster implements AutoCloseable {
                 config.put(Platform.Config.TROGDOR_COORDINATOR_PORT,
                     Integer.toString(node.coordinatorPort));
             }
-            node.node = new BasicNode(entry.getKey(), node.hostname, config,
-                Collections.emptySet());
+            node.node = new BasicNode(entry.getKey(), node.hostname, config, Set.of());
         }
         TreeMap<String, Node> topologyNodes = new TreeMap<>();
         for (Map.Entry<String, NodeData> entry : nodes.entrySet()) {
@@ -20,7 +20,6 @@ package org.apache.kafka.trogdor.common;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.util.Collections;
 import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,9 +29,9 @@ public class StringExpanderTest {
 
     @Test
     public void testNoExpansionNeeded() {
-        assertEquals(Collections.singleton("foo"), StringExpander.expand("foo"));
-        assertEquals(Collections.singleton("bar"), StringExpander.expand("bar"));
-        assertEquals(Collections.singleton(""), StringExpander.expand(""));
+        assertEquals(Set.of("foo"), StringExpander.expand("foo"));
+        assertEquals(Set.of("bar"), StringExpander.expand("bar"));
+        assertEquals(Set.of(""), StringExpander.expand(""));
     }
 
     @Test
@@ -21,7 +21,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.time.ZoneOffset;
-import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.kafka.trogdor.common.StringFormatter.dateString;
 import static org.apache.kafka.trogdor.common.StringFormatter.durationString;
@@ -53,8 +53,8 @@ public class StringFormatterTest {
             "lion 1 12345 %n" +
             "manatee 50 1 %n"),
             StringFormatter.prettyPrintGrid(
-                Arrays.asList(Arrays.asList("ANIMAL", "NUMBER", "INDEX"),
-                    Arrays.asList("lion", "1", "12345"),
-                    Arrays.asList("manatee", "50", "1"))));
+                List.of(List.of("ANIMAL", "NUMBER", "INDEX"),
+                    List.of("lion", "1", "12345"),
+                    List.of("manatee", "50", "1"))));
     }
 }
@@ -33,9 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -53,8 +51,8 @@ public class WorkerUtilsTest {
     private final Node broker1 = new Node(0, "testHost-1", 1234);
     private final Node broker2 = new Node(1, "testHost-2", 1234);
     private final Node broker3 = new Node(1, "testHost-3", 1234);
-    private final List<Node> cluster = Arrays.asList(broker1, broker2, broker3);
-    private final List<Node> singleReplica = Collections.singletonList(broker1);
+    private final List<Node> cluster = List.of(broker1, broker2, broker3);
+    private final List<Node> singleReplica = List.of(broker1);
 
     private static final String TEST_TOPIC = "test-topic-1";
     private static final short TEST_REPLICATION_FACTOR = 1;
@@ -72,17 +70,17 @@ public class WorkerUtilsTest {
 
     @Test
     public void testCreateOneTopic() throws Throwable {
-        Map<String, NewTopic> newTopics = Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC);
+        Map<String, NewTopic> newTopics = Map.of(TEST_TOPIC, NEW_TEST_TOPIC);
 
         WorkerUtils.createTopics(log, adminClient, newTopics, true);
-        assertEquals(Collections.singleton(TEST_TOPIC), adminClient.listTopics().names().get());
+        assertEquals(Set.of(TEST_TOPIC), adminClient.listTopics().names().get());
         assertEquals(
             new TopicDescription(
                 TEST_TOPIC, false,
-                Collections.singletonList(
-                    new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))),
+                List.of(
+                    new TopicPartitionInfo(0, broker1, singleReplica, List.of(), List.of(), List.of()))),
             adminClient.describeTopics(
-                Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
+                Set.of(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
         );
     }
 
@@ -91,21 +89,21 @@ public class WorkerUtilsTest {
         adminClient.timeoutNextRequest(1);
 
         WorkerUtils.createTopics(
-            log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), true);
+            log, adminClient, Map.of(TEST_TOPIC, NEW_TEST_TOPIC), true);
 
         assertEquals(
             new TopicDescription(
                 TEST_TOPIC, false,
-                Collections.singletonList(
-                    new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))),
+                List.of(
+                    new TopicPartitionInfo(0, broker1, singleReplica, List.of(), List.of(), List.of()))),
             adminClient.describeTopics(
-                Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
+                Set.of(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
         );
     }
 
     @Test
     public void testCreateZeroTopicsDoesNothing() throws Throwable {
-        WorkerUtils.createTopics(log, adminClient, Collections.emptyMap(), true);
+        WorkerUtils.createTopics(log, adminClient, Map.of(), true);
         assertEquals(0, adminClient.listTopics().names().get().size());
     }
 
@@ -114,7 +112,7 @@ public class WorkerUtilsTest {
         adminClient.addTopic(
             false,
             TEST_TOPIC,
-            Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())),
+            List.of(new TopicPartitionInfo(0, broker1, singleReplica, List.of())),
             null);
 
         Map<String, NewTopic> newTopics = new HashMap<>();
@@ -130,8 +128,8 @@ public class WorkerUtilsTest {
    @Test
     public void testExistingTopicsMustHaveRequestedNumberOfPartitions() {
         List<TopicPartitionInfo> tpInfo = new ArrayList<>();
-        tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
-        tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.emptyList()));
+        tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, List.of()));
+        tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, List.of()));
         adminClient.addTopic(
             false,
             TEST_TOPIC,
@@ -139,16 +137,16 @@ public class WorkerUtilsTest {
             null);
 
         assertThrows(RuntimeException.class, () -> WorkerUtils.createTopics(
-            log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), false));
+            log, adminClient, Map.of(TEST_TOPIC, NEW_TEST_TOPIC), false));
     }
 
     @Test
     public void testExistingTopicsNotCreated() throws Throwable {
         final String existingTopic = "existing-topic";
         List<TopicPartitionInfo> tpInfo = new ArrayList<>();
-        tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
-        tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.emptyList()));
-        tpInfo.add(new TopicPartitionInfo(2, broker3, singleReplica, Collections.emptyList()));
+        tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, List.of()));
+        tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, List.of()));
+        tpInfo.add(new TopicPartitionInfo(2, broker3, singleReplica, List.of()));
         adminClient.addTopic(
             false,
             existingTopic,
@@ -157,11 +155,11 @@ public class WorkerUtilsTest {
 
         WorkerUtils.createTopics(
             log, adminClient,
-            Collections.singletonMap(
+            Map.of(
                 existingTopic,
                 new NewTopic(existingTopic, tpInfo.size(), TEST_REPLICATION_FACTOR)), false);
 
-        assertEquals(Collections.singleton(existingTopic), adminClient.listTopics().names().get());
+        assertEquals(Set.of(existingTopic), adminClient.listTopics().names().get());
     }
 
     @Test
@@ -170,15 +168,15 @@ public class WorkerUtilsTest {
         assertEquals(0, adminClient.listTopics().names().get().size());
 
         WorkerUtils.createTopics(
-            log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), false);
+            log, adminClient, Map.of(TEST_TOPIC, NEW_TEST_TOPIC), false);
 
-        assertEquals(Collections.singleton(TEST_TOPIC), adminClient.listTopics().names().get());
+        assertEquals(Set.of(TEST_TOPIC), adminClient.listTopics().names().get());
         assertEquals(
             new TopicDescription(
                 TEST_TOPIC, false,
-                Collections.singletonList(
-                    new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))),
-            adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
+                List.of(
+                    new TopicPartitionInfo(0, broker1, singleReplica, List.of(), List.of(), List.of()))),
+            adminClient.describeTopics(Set.of(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
         );
     }
 
@@ -186,8 +184,8 @@ public class WorkerUtilsTest {
     public void testCreatesOneTopicVerifiesOneTopic() throws Throwable {
         final String existingTopic = "existing-topic";
         List<TopicPartitionInfo> tpInfo = new ArrayList<>();
-        tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
-        tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.emptyList()));
+        tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, List.of()));
+        tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, List.of()));
         adminClient.addTopic(
             false,
             existingTopic,
@@ -207,7 +205,7 @@ public class WorkerUtilsTest {
     @Test
     public void testCreateNonExistingTopicsWithZeroTopicsDoesNothing() throws Throwable {
         WorkerUtils.createTopics(
-            log, adminClient, Collections.emptyMap(), false);
+            log, adminClient, Map.of(), false);
         assertEquals(0, adminClient.listTopics().names().get().size());
     }
 
@@ -224,8 +222,8 @@ public class WorkerUtilsTest {
 
         WorkerUtils.addConfigsToProperties(
             props,
-            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, "test-client"),
-            Collections.singletonMap(ProducerConfig.LINGER_MS_CONFIG, "1000"));
+            Map.of(ProducerConfig.CLIENT_ID_CONFIG, "test-client"),
+            Map.of(ProducerConfig.LINGER_MS_CONFIG, "1000"));
         assertEquals(resultProps, props);
     }
 
@@ -242,8 +240,8 @@ public class WorkerUtilsTest {
 
         WorkerUtils.addConfigsToProperties(
             props,
-            Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "1"),
-            Collections.singletonMap(ProducerConfig.LINGER_MS_CONFIG, "1000"));
+            Map.of(ProducerConfig.ACKS_CONFIG, "1"),
+            Map.of(ProducerConfig.LINGER_MS_CONFIG, "1000"));
         assertEquals(resultProps, props);
     }
 
@@ -259,8 +257,8 @@ public class WorkerUtilsTest {
 
         WorkerUtils.addConfigsToProperties(
             props,
-            Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "1"),
-            Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "0"));
+            Map.of(ProducerConfig.ACKS_CONFIG, "1"),
+            Map.of(ProducerConfig.ACKS_CONFIG, "0"));
         assertEquals(resultProps, props);
     }
 
@@ -308,7 +306,7 @@ public class WorkerUtilsTest {
         for (int i = 0; i < numPartitions; ++i) {
             Node broker = cluster.get(brokerIndex);
             tpInfo.add(new TopicPartitionInfo(
-                i, broker, singleReplica, Collections.emptyList()));
+                i, broker, singleReplica, List.of()));
             brokerIndex = (brokerIndex + 1) % cluster.size();
         }
         adminClient.addTopic(
@@ -320,14 +318,14 @@ public class WorkerUtilsTest {
 
     @Test
     public void testVerifyTopics() throws Throwable {
-        Map<String, NewTopic> newTopics = Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC);
+        Map<String, NewTopic> newTopics = Map.of(TEST_TOPIC, NEW_TEST_TOPIC);
         WorkerUtils.createTopics(log, adminClient, newTopics, true);
         adminClient.setFetchesRemainingUntilVisible(TEST_TOPIC, 2);
-        WorkerUtils.verifyTopics(log, adminClient, Collections.singleton(TEST_TOPIC),
-            Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), 3, 1);
+        WorkerUtils.verifyTopics(log, adminClient, Set.of(TEST_TOPIC),
+            Map.of(TEST_TOPIC, NEW_TEST_TOPIC), 3, 1);
         adminClient.setFetchesRemainingUntilVisible(TEST_TOPIC, 100);
         assertThrows(UnknownTopicOrPartitionException.class, () ->
-            WorkerUtils.verifyTopics(log, adminClient, Collections.singleton(TEST_TOPIC),
-                Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), 2, 1));
+            WorkerUtils.verifyTopics(log, adminClient, Set.of(TEST_TOPIC),
+                Map.of(TEST_TOPIC, NEW_TEST_TOPIC), 2, 1));
     }
 }
@@ -57,7 +57,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
@@ -370,7 +369,7 @@ public class CoordinatorTest {
     private static List<List<String>> createPartitionLists(String[][] array) {
         List<List<String>> list = new ArrayList<>();
         for (String[] a : array) {
-            list.add(Arrays.asList(a));
+            list.add(List.of(a));
         }
         return list;
     }
@@ -486,7 +485,7 @@ public class CoordinatorTest {
         assertEquals(0, coordinatorClient.tasks(
             new TasksRequest(null, 10, 0, 10, 0, Optional.empty())).tasks().size());
         TasksResponse resp1 = coordinatorClient.tasks(
-            new TasksRequest(Arrays.asList("foo", "baz"), 0, 0, 0, 0, Optional.empty()));
+            new TasksRequest(List.of("foo", "baz"), 0, 0, 0, 0, Optional.empty()));
         assertTrue(resp1.tasks().containsKey("foo"));
         assertFalse(resp1.tasks().containsKey("bar"));
         assertEquals(1, resp1.tasks().size());
@@ -20,8 +20,6 @@ package org.apache.kafka.trogdor.task;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 public class SampleTaskSpec extends TaskSpec {
@@ -34,8 +32,7 @@ public class SampleTaskSpec extends TaskSpec {
                           @JsonProperty("nodeToExitMs") Map<String, Long> nodeToExitMs,
                           @JsonProperty("error") String error) {
         super(startMs, durationMs);
-        this.nodeToExitMs = nodeToExitMs == null ? new HashMap<>() :
-            Collections.unmodifiableMap(nodeToExitMs);
+        this.nodeToExitMs = nodeToExitMs == null ? Map.of() : Map.copyOf(nodeToExitMs);
         this.error = error == null ? "" : error;
     }
 
@@ -22,13 +22,10 @@ import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,7 +35,7 @@ public class ConsumeBenchSpecTest {
 
     @Test
     public void testMaterializeTopicsWithNoPartitions() {
-        Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]", "secondTopic")).materializeTopics();
+        Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(List.of("topic[1-3]", "secondTopic")).materializeTopics();
         Map<String, List<TopicPartition>> expected = new HashMap<>();
         expected.put("topic1", new ArrayList<>());
         expected.put("topic2", new ArrayList<>());
@@ -50,28 +47,28 @@ public class ConsumeBenchSpecTest {
 
     @Test
     public void testMaterializeTopicsWithSomePartitions() {
-        Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]:[1-5]", "secondTopic", "thirdTopic:1")).materializeTopics();
+        Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(List.of("topic[1-3]:[1-5]", "secondTopic", "thirdTopic:1")).materializeTopics();
         Map<String, List<TopicPartition>> expected = new HashMap<>();
-        expected.put("topic1", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic1", (int) i)).collect(Collectors.toList()));
-        expected.put("topic2", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic2", (int) i)).collect(Collectors.toList()));
-        expected.put("topic3", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic3", (int) i)).collect(Collectors.toList()));
+        expected.put("topic1", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic1", (int) i)).toList());
+        expected.put("topic2", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic2", (int) i)).toList());
+        expected.put("topic3", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic3", (int) i)).toList());
         expected.put("secondTopic", new ArrayList<>());
-        expected.put("thirdTopic", Collections.singletonList(new TopicPartition("thirdTopic", 1)));
+        expected.put("thirdTopic", List.of(new TopicPartition("thirdTopic", 1)));
 
         assertEquals(expected, materializedTopics);
     }
 
     @Test
     public void testInvalidTopicNameRaisesExceptionInMaterialize() {
-        for (String invalidName : Arrays.asList("In:valid", "invalid:", ":invalid", "in:valid:1", "invalid:2:2", "invalid::1", "invalid[1-3]:")) {
-            assertThrows(IllegalArgumentException.class, () -> consumeBenchSpec(Collections.singletonList(invalidName)).materializeTopics());
+        for (String invalidName : List.of("In:valid", "invalid:", ":invalid", "in:valid:1", "invalid:2:2", "invalid::1", "invalid[1-3]:")) {
+            assertThrows(IllegalArgumentException.class, () -> consumeBenchSpec(List.of(invalidName)).materializeTopics());
        }
     }
 
     private ConsumeBenchSpec consumeBenchSpec(List<String> activeTopics) {
         return new ConsumeBenchSpec(0, 0, "node", "localhost",
             123, 1234, "cg-1",
-            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 1,
+            Map.of(), Map.of(), Map.of(), 1,
             Optional.empty(), activeTopics);
     }
 }
@@ -35,7 +35,7 @@ import java.io.File;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
-import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -62,7 +62,7 @@ public class ExternalCommandWorkerTest {
         ExternalCommandSpec spec = new ExternalCommandSpec(0,
             30000,
             "node0",
-            Arrays.asList(command),
+            List.of(command),
             workload,
             Optional.of(shutdownGracePeriodMs));
         return new ExternalCommandWorker(id, spec);
@@ -22,7 +22,6 @@ import org.junit.jupiter.api.Timeout;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
@@ -146,8 +145,8 @@ public class PayloadGeneratorTest {
             new ConstantPayloadGenerator(4, new byte[0]);
         RandomComponent constantConfig = new RandomComponent(25, constantGenerator);
 
-        List<RandomComponent> components1 = new ArrayList<>(Arrays.asList(nullConfig, uniformConfig));
-        List<RandomComponent> components2 = new ArrayList<>(Arrays.asList(sequentialConfig, constantConfig));
+        List<RandomComponent> components1 = List.of(nullConfig, uniformConfig);
+        List<RandomComponent> components2 = List.of(sequentialConfig, constantConfig);
         byte[] expected = new byte[4];
 
         PayloadIterator iter = new PayloadIterator(
@@ -180,8 +179,8 @@ public class PayloadGeneratorTest {
         RandomComponent sequentialConfig2 = new RandomComponent(25, sequentialGenerator);
         RandomComponent nullConfig2 = new RandomComponent(25, nullGenerator);
 
-        List<RandomComponent> components3 = new ArrayList<>(Arrays.asList(sequentialConfig2, uniformConfig2, nullConfig));
-        List<RandomComponent> components4 = new ArrayList<>(Arrays.asList(uniformConfig2, sequentialConfig2, constantConfig, nullConfig2));
+        List<RandomComponent> components3 = List.of(sequentialConfig2, uniformConfig2, nullConfig);
+        List<RandomComponent> components4 = List.of(uniformConfig2, sequentialConfig2, constantConfig, nullConfig2);
 
         testReproducible(new RandomComponentPayloadGenerator(4, components1));
         testReproducible(new RandomComponentPayloadGenerator(123, components2));
@@ -200,12 +199,12 @@ public class PayloadGeneratorTest {
             new ConstantPayloadGenerator(4, new byte[0]);
         RandomComponent constantConfig = new RandomComponent(-25, constantGenerator);
 
-        List<RandomComponent> components1 = new ArrayList<>(Arrays.asList(nullConfig, uniformConfig));
-        List<RandomComponent> components2 = new ArrayList<>(Arrays.asList(
-            nullConfig, constantConfig, uniformConfig, nullConfig, uniformConfig, uniformConfig));
+        List<RandomComponent> components1 = List.of(nullConfig, uniformConfig);
+        List<RandomComponent> components2 = List.of(
+            nullConfig, constantConfig, uniformConfig, nullConfig, uniformConfig, uniformConfig);
 
         assertThrows(IllegalArgumentException.class, () ->
-            new PayloadIterator(new RandomComponentPayloadGenerator(1, new ArrayList<>())));
+            new PayloadIterator(new RandomComponentPayloadGenerator(1, List.of())));
         assertThrows(IllegalArgumentException.class, () ->
             new PayloadIterator(new RandomComponentPayloadGenerator(13, components2)));
         assertThrows(IllegalArgumentException.class, () ->
@@ -18,10 +18,9 @@ package org.apache.kafka.trogdor.workload;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -32,7 +31,7 @@ class ShareConsumeBenchSpecTest {
 
     @Test
     public void testExpandTopicNames() {
-        ShareConsumeBenchSpec shareConsumeBenchSpec = shareConsumeBenchSpec(Arrays.asList("foo[1-3]", "bar"));
+        ShareConsumeBenchSpec shareConsumeBenchSpec = shareConsumeBenchSpec(List.of("foo[1-3]", "bar"));
         Set<String> expectedNames = new HashSet<>();
 
         expectedNames.add("foo1");
@@ -45,15 +44,15 @@ class ShareConsumeBenchSpecTest {
 
     @Test
     public void testInvalidNameRaisesException() {
-        for (String invalidName : Arrays.asList("In:valid", "invalid:", ":invalid[]", "in:valid:", "invalid[1-3]:")) {
-            assertThrows(IllegalArgumentException.class, () -> shareConsumeBenchSpec(Collections.singletonList(invalidName)).expandTopicNames());
+        for (String invalidName : List.of("In:valid", "invalid:", ":invalid[]", "in:valid:", "invalid[1-3]:")) {
+            assertThrows(IllegalArgumentException.class, () -> shareConsumeBenchSpec(List.of(invalidName)).expandTopicNames());
        }
     }
 
     @Test
     public void testDefaultShareGroupName() {
         ShareConsumeBenchSpec shareConsumeBenchSpec = new ShareConsumeBenchSpec(0, 0, "node", "localhost",
-            123, 1234, null, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 1,
+            123, 1234, null, Map.of(), Map.of(), Map.of(), 1,
             Optional.empty(), List.of("abc"));
         assertEquals("share", shareConsumeBenchSpec.shareGroup());
     }
@@ -61,7 +60,7 @@ class ShareConsumeBenchSpecTest {
     private ShareConsumeBenchSpec shareConsumeBenchSpec(List<String> activeTopics) {
         return new ShareConsumeBenchSpec(0, 0, "node", "localhost",
             123, 1234, "sg-1",
-            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 1,
+            Map.of(), Map.of(), Map.of(), 1,
             Optional.empty(), activeTopics);
     }
 
@@ -22,7 +22,6 @@ import org.apache.kafka.trogdor.common.JsonUtil;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,8 +43,8 @@ public class TopicsSpecTest {
         FOO.set("topicA[0-2]", PARTSA);
 
         Map<Integer, List<Integer>> assignmentsB = new HashMap<>();
-        assignmentsB.put(0, Arrays.asList(0, 1, 2));
-        assignmentsB.put(1, Arrays.asList(2, 3, 4));
+        assignmentsB.put(0, List.of(0, 1, 2));
+        assignmentsB.put(1, List.of(2, 3, 4));
         PARTSB = new PartitionsSpec(0, (short) 0, assignmentsB, null);
         FOO.set("topicB", PARTSB);
     }
@@ -67,14 +66,14 @@ public class TopicsSpecTest {
     @Test
     public void testPartitionNumbers() {
         List<Integer> partsANumbers = PARTSA.partitionNumbers();
-        assertEquals(Integer.valueOf(0), partsANumbers.get(0));
-        assertEquals(Integer.valueOf(1), partsANumbers.get(1));
-        assertEquals(Integer.valueOf(2), partsANumbers.get(2));
+        assertTrue(partsANumbers.contains(0));
+        assertTrue(partsANumbers.contains(1));
+        assertTrue(partsANumbers.contains(2));
         assertEquals(3, partsANumbers.size());
 
         List<Integer> partsBNumbers = PARTSB.partitionNumbers();
-        assertEquals(Integer.valueOf(0), partsBNumbers.get(0));
-        assertEquals(Integer.valueOf(1), partsBNumbers.get(1));
+        assertTrue(partsBNumbers.contains(0));
+        assertTrue(partsBNumbers.contains(1));
         assertEquals(2, partsBNumbers.size());
     }
 