MINOR: Cleanup Tools Module (1/n) (#20091)

Now that Kafka support Java 17, this PR makes some changes in tools
module. The changes in this PR are limited to only some files. A future
PR(s) shall follow.
The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()

Sub modules targeted: tools/src/main

Reviewers: Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Sanskar Jhajharia 2025-07-22 15:44:25 +05:30 committed by GitHub
parent dfd996e51e
commit 5cf6a9d80d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 261 additions and 319 deletions

View File

@ -40,9 +40,7 @@ import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -177,7 +175,7 @@ public class AclCommand {
private static void removeAcls(Admin adminClient, Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws ExecutionException, InterruptedException {
if (acls.isEmpty()) {
adminClient.deleteAcls(Collections.singletonList(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
adminClient.deleteAcls(List.of(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
} else {
List<AclBindingFilter> aclBindingFilters = acls.stream().map(acl -> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList());
adminClient.deleteAcls(aclBindingFilters).all().get();
@ -249,8 +247,8 @@ public class AclCommand {
Set<ResourcePatternFilter> transactionalIds = filters.stream().filter(f -> f.resourceType() == ResourceType.TRANSACTIONAL_ID).collect(Collectors.toSet());
boolean enableIdempotence = opts.options.has(opts.idempotentOpt);
Set<AccessControlEntry> topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE)));
Set<AccessControlEntry> transactionalIdAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE)));
Set<AccessControlEntry> topicAcls = getAcl(opts, Set.of(WRITE, DESCRIBE, CREATE));
Set<AccessControlEntry> transactionalIdAcls = getAcl(opts, Set.of(WRITE, DESCRIBE));
//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new HashMap<>();
@ -261,7 +259,7 @@ public class AclCommand {
result.put(transactionalId, transactionalIdAcls);
}
if (enableIdempotence) {
result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, Collections.singleton(IDEMPOTENT_WRITE)));
result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, Set.of(IDEMPOTENT_WRITE)));
}
return result;
}
@ -272,8 +270,8 @@ public class AclCommand {
Set<ResourcePatternFilter> groups = filters.stream().filter(f -> f.resourceType() == ResourceType.GROUP).collect(Collectors.toSet());
//Read, Describe on topic, Read on consumerGroup
Set<AccessControlEntry> topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(READ, DESCRIBE)));
Set<AccessControlEntry> groupAcls = getAcl(opts, Collections.singleton(READ));
Set<AccessControlEntry> topicAcls = getAcl(opts, Set.of(READ, DESCRIBE));
Set<AccessControlEntry> groupAcls = getAcl(opts, Set.of(READ));
Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new HashMap<>();
for (ResourcePatternFilter topic : topics) {
@ -333,9 +331,9 @@ public class AclCommand {
if (opts.options.has(hostOptionSpec)) {
return opts.options.valuesOf(hostOptionSpec).stream().map(String::trim).collect(Collectors.toSet());
} else if (opts.options.has(principalOptionSpec)) {
return Collections.singleton(AclEntry.WILDCARD_HOST);
return Set.of(AclEntry.WILDCARD_HOST);
} else {
return Collections.emptySet();
return Set.of();
}
}
@ -345,7 +343,7 @@ public class AclCommand {
.map(s -> SecurityUtils.parseKafkaPrincipal(s.trim()))
.collect(Collectors.toSet());
} else {
return Collections.emptySet();
return Set.of();
}
}
@ -547,7 +545,7 @@ public class AclCommand {
if (!options.has(bootstrapServerOpt) && !options.has(bootstrapControllerOpt)) {
CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server or --bootstrap-controller must be specified");
}
List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = Arrays.asList(addOpt, removeOpt, listOpt);
List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = List.of(addOpt, removeOpt, listOpt);
long mutuallyExclusiveOptionsCount = mutuallyExclusiveOptions.stream()
.filter(abstractOptionSpec -> options.has(abstractOptionSpec))
.count();
@ -592,10 +590,10 @@ public class AclCommand {
@Override
public String valuePattern() {
List<PatternType> values = Arrays.asList(PatternType.values());
List<PatternType> values = List.of(PatternType.values());
List<PatternType> filteredValues = values.stream()
.filter(type -> type != PatternType.UNKNOWN)
.collect(Collectors.toList());
.toList();
return filteredValues.stream()
.map(Object::toString)
.collect(Collectors.joining("|"));

View File

@ -59,7 +59,6 @@ import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@ -67,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -291,13 +291,13 @@ public class ClientCompatibilityTest {
tryFeature("createTopics", testConfig.createTopicsSupported,
() -> {
try {
client.createTopics(Collections.singleton(
client.createTopics(Set.of(
new NewTopic("newtopic", 1, (short) 1))).all().get();
} catch (ExecutionException e) {
throw e.getCause();
}
},
() -> createTopicsResultTest(client, Collections.singleton("newtopic"))
() -> createTopicsResultTest(client, Set.of("newtopic"))
);
while (true) {
@ -337,7 +337,7 @@ public class ClientCompatibilityTest {
);
Map<ConfigResource, Config> brokerConfig =
client.describeConfigs(Collections.singleton(configResource)).all().get();
client.describeConfigs(Set.of(configResource)).all().get();
if (brokerConfig.get(configResource).entries().isEmpty()) {
throw new KafkaException("Expected to see config entries, but got zero entries");

View File

@ -37,10 +37,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@ -91,11 +91,7 @@ public class ClientMetricsCommand {
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
printException(cause);
} else {
printException(e);
}
printException(Objects.requireNonNullElse(cause, e));
exitCode = 1;
} catch (Throwable t) {
printException(t);
@ -130,8 +126,8 @@ public class ClientMetricsCommand {
Collection<AlterConfigOp> alterEntries = configsToBeSet.entrySet().stream()
.map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()),
entry.getValue().isEmpty() ? AlterConfigOp.OpType.DELETE : AlterConfigOp.OpType.SET))
.collect(Collectors.toList());
adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterEntries), alterOptions).all()
.toList();
adminClient.incrementalAlterConfigs(Map.of(configResource, alterEntries), alterOptions).all()
.get(30, TimeUnit.SECONDS);
System.out.println("Altered client metrics config for " + entityName + ".");
@ -144,8 +140,8 @@ public class ClientMetricsCommand {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
Collection<AlterConfigOp> alterEntries = oldConfigs.stream()
.map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.DELETE)).collect(Collectors.toList());
adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterEntries), alterOptions)
.map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.DELETE)).toList();
adminClient.incrementalAlterConfigs(Map.of(configResource, alterEntries), alterOptions)
.all().get(30, TimeUnit.SECONDS);
System.out.println("Deleted client metrics config for " + entityName + ".");
@ -162,7 +158,7 @@ public class ClientMetricsCommand {
System.out.println("The client metric resource " + entityNameOpt.get() + " doesn't exist and doesn't have dynamic config.");
return;
}
entities = Collections.singletonList(entityNameOpt.get());
entities = List.of(entityNameOpt.get());
} else {
Collection<ConfigResource> resources = adminClient
.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
@ -189,7 +185,7 @@ public class ClientMetricsCommand {
private Collection<ConfigEntry> getClientMetricsConfig(String entityName) throws Exception {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
Map<ConfigResource, Config> result = adminClient.describeConfigs(Collections.singleton(configResource))
Map<ConfigResource, Config> result = adminClient.describeConfigs(Set.of(configResource))
.all().get(30, TimeUnit.SECONDS);
return result.get(configResource).entries();
}

View File

@ -32,8 +32,8 @@ import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@ -74,7 +74,7 @@ public class ClusterTool {
.help("Unregister a broker.");
Subparser listEndpoints = subparsers.addParser("list-endpoints")
.help("List endpoints");
for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser, listEndpoints)) {
for (Subparser subpparser : List.of(clusterIdParser, unregisterParser, listEndpoints)) {
MutuallyExclusiveGroup connectionOptions = subpparser.addMutuallyExclusiveGroup().required(true);
connectionOptions.addArgument("--bootstrap-server", "-b")
.action(store())
@ -162,7 +162,7 @@ public class ClusterTool {
Collection<Node> nodes = adminClient.describeCluster(option).nodes().get();
String maxHostLength = String.valueOf(nodes.stream().map(node -> node.host().length()).max(Integer::compareTo).orElse(100));
String maxRackLength = String.valueOf(nodes.stream().filter(node -> node.hasRack()).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10));
String maxRackLength = String.valueOf(nodes.stream().filter(Node::hasRack).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10));
if (listControllerEndpoints) {
String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-15s%n";

View File

@ -42,7 +42,6 @@ import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@ -149,14 +148,12 @@ public class ConnectPluginPath {
if (subcommand == null) {
throw new ArgumentParserException("No subcommand specified", parser);
}
switch (subcommand) {
case "list":
return new Config(Command.LIST, locations, false, false, out, err);
case "sync-manifests":
return new Config(Command.SYNC_MANIFESTS, locations, namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out, err);
default:
throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser);
}
return switch (subcommand) {
case "list" -> new Config(Command.LIST, locations, false, false, out, err);
case "sync-manifests" ->
new Config(Command.SYNC_MANIFESTS, locations, namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out, err);
default -> throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser);
};
}
private static Set<Path> parseLocations(ArgumentParser parser, Namespace namespace) throws ArgumentParserException, TerseException {
@ -197,7 +194,7 @@ public class ConnectPluginPath {
}
enum Command {
LIST, SYNC_MANIFESTS;
LIST, SYNC_MANIFESTS
}
private static class Config {
@ -326,11 +323,12 @@ public class ConnectPluginPath {
rowAliases.add(PluginUtils.prunedName(pluginDesc));
rows.add(newRow(workspace, pluginDesc.className(), new ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true));
// If a corresponding manifest exists, mark it as loadable by removing it from the map.
// TODO: The use of Collections here shall be fixed with KAFKA-19524.
nonLoadableManifests.getOrDefault(pluginDesc.className(), Collections.emptySet()).remove(pluginDesc.type());
});
nonLoadableManifests.forEach((className, types) -> types.forEach(type -> {
// All manifests which remain in the map are not loadable
rows.add(newRow(workspace, className, Collections.emptyList(), type, PluginDesc.UNDEFINED_VERSION, false));
rows.add(newRow(workspace, className, List.of(), type, PluginDesc.UNDEFINED_VERSION, false));
}));
return rows;
}
@ -436,8 +434,8 @@ public class ConnectPluginPath {
}
private static PluginScanResult discoverPlugins(PluginSource source, ReflectionScanner reflectionScanner, ServiceLoaderScanner serviceLoaderScanner) {
PluginScanResult serviceLoadResult = serviceLoaderScanner.discoverPlugins(Collections.singleton(source));
PluginScanResult reflectiveResult = reflectionScanner.discoverPlugins(Collections.singleton(source));
return new PluginScanResult(Arrays.asList(serviceLoadResult, reflectiveResult));
PluginScanResult serviceLoadResult = serviceLoaderScanner.discoverPlugins(Set.of(source));
PluginScanResult reflectiveResult = reflectionScanner.discoverPlugins(Set.of(source));
return new PluginScanResult(List.of(serviceLoadResult, reflectiveResult));
}
}

View File

@ -37,7 +37,6 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
@ -221,8 +220,8 @@ public class ConsumerPerformance {
}
public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
private AtomicLong joinTimeMs;
private AtomicLong joinTimeMsInSingleRound;
private final AtomicLong joinTimeMs;
private final AtomicLong joinTimeMsInSingleRound;
private long joinStartMs;
public ConsumerPerfRebListener(AtomicLong joinTimeMs, long joinStartMs, AtomicLong joinTimeMsInSingleRound) {
@ -355,7 +354,7 @@ public class ConsumerPerformance {
}
public Set<String> topic() {
return Collections.singleton(options.valueOf(topicOpt));
return Set.of(options.valueOf(topicOpt));
}
public long numMessages() {

View File

@ -38,7 +38,6 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@ -106,7 +105,7 @@ public class DelegationTokenCommand {
CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
DelegationToken token = createResult.delegationToken().get();
System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
printToken(Collections.singletonList(token));
printToken(List.of(token));
return token;
}

View File

@ -34,11 +34,11 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@ -90,7 +90,7 @@ public class EndToEndLatency {
int messageSizeBytes = Integer.parseInt(args[4]);
Optional<String> propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
if (!Arrays.asList("1", "all").contains(acks)) {
if (!List.of("1", "all").contains(acks)) {
throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
}
@ -186,7 +186,7 @@ public class EndToEndLatency {
Admin adminClient = Admin.create(adminProps);
NewTopic newTopic = new NewTopic(topic, DEFAULT_NUM_PARTITIONS, DEFAULT_REPLICATION_FACTOR);
try {
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
adminClient.createTopics(Set.of(newTopic)).all().get();
} catch (ExecutionException | InterruptedException e) {
System.out.printf("Creation of topic %s failed%n", topic);
throw new RuntimeException(e);

View File

@ -39,7 +39,6 @@ import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import net.sourceforge.argparse4j.internal.HelpScreenException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -240,7 +239,7 @@ public class FeatureCommand {
}
static String metadataVersionsToString(MetadataVersion first, MetadataVersion last) {
List<MetadataVersion> versions = Arrays.asList(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
List<MetadataVersion> versions = List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
return versions.stream()
.map(String::valueOf)
.collect(Collectors.joining(", "));

View File

@ -42,7 +42,6 @@ import org.apache.kafka.tools.filter.TopicPartitionFilter.TopicFilterAndPartitio
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@ -312,7 +311,7 @@ public class GetOffsetShell {
* PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
*/
public TopicPartitionFilter createTopicPartitionFilterWithPatternList(String topicPartitions) {
List<String> ruleSpecs = Arrays.asList(topicPartitions.split(","));
List<String> ruleSpecs = List.of(topicPartitions.split(","));
List<TopicPartitionFilter> rules = ruleSpecs.stream().map(ruleSpec -> {
try {
return parseRuleSpec(ruleSpec);
@ -338,7 +337,7 @@ public class GetOffsetShell {
Set<Integer> partitions;
if (partitionsString == null || partitionsString.isEmpty()) {
partitions = Collections.emptySet();
partitions = Set.of();
} else {
try {
partitions = Arrays.stream(partitionsString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());

View File

@ -33,6 +33,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@ -75,11 +76,7 @@ public class GroupsCommand {
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
printException(cause);
} else {
printException(e);
}
printException(Objects.requireNonNullElse(cause, e));
exitCode = 1;
} catch (Throwable t) {
printException(t);

View File

@ -96,7 +96,7 @@ public class JmxTool {
while (keepGoing) {
long start = System.currentTimeMillis();
Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
attributes.put("time", dateFormat.map(format -> format.format(new Date())).orElseGet(() -> String.valueOf(System.currentTimeMillis())));
maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
if (options.isOneTime()) {
keepGoing = false;
@ -225,7 +225,7 @@ public class JmxTool {
AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
List<ObjectName> expectedAttributes = new ArrayList<>();
attributes.asList().forEach(attribute -> {
if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
if (List.of(attributesInclude.get()).contains(attribute.getName())) {
expectedAttributes.add(objectName);
}
});
@ -254,10 +254,10 @@ public class JmxTool {
for (ObjectName objectName : objectNames) {
MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
AttributeList attributes = conn.getAttributes(objectName,
Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
Arrays.stream(beanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new));
for (Attribute attribute : attributes.asList()) {
if (attributesInclude.isPresent()) {
if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
if (List.of(attributesInclude.get()).contains(attribute.getName())) {
result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
attribute.getValue());
}
@ -395,7 +395,7 @@ public class JmxTool {
private String parseFormat() {
String reportFormat = options.valueOf(reportFormatOpt).toLowerCase(Locale.ROOT);
return Arrays.asList("properties", "csv", "tsv").contains(reportFormat) ? reportFormat : "original";
return List.of("properties", "csv", "tsv").contains(reportFormat) ? reportFormat : "original";
}
public boolean hasJmxAuthPropOpt() {

View File

@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -93,7 +92,7 @@ public class LeaderElectionCommand {
Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
final Optional<Set<TopicPartition>> singleTopicPartition =
(topicOption.isPresent() && partitionOption.isPresent()) ?
Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
Optional.of(Set.of(new TopicPartition(topicOption.get(), partitionOption.get()))) :
Optional.empty();
/* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
@ -346,7 +345,7 @@ public class LeaderElectionCommand {
}
// One and only one is required: --topic, --all-topic-partitions or --path-to-json-file
List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = Arrays.asList(
List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = List.of(
topic,
allTopicPartitions,
pathToJsonFile

View File

@ -45,7 +45,6 @@ import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
@ -83,23 +82,13 @@ public class ManifestWorkspace {
}
public SourceWorkspace<?> forSource(PluginSource source) throws IOException {
SourceWorkspace<?> sourceWorkspace;
switch (source.type()) {
case CLASSPATH:
sourceWorkspace = new ClasspathWorkspace(source);
break;
case MULTI_JAR:
sourceWorkspace = new MultiJarWorkspace(source);
break;
case SINGLE_JAR:
sourceWorkspace = new SingleJarWorkspace(source);
break;
case CLASS_HIERARCHY:
sourceWorkspace = new ClassHierarchyWorkspace(source);
break;
default:
throw new IllegalStateException("Unknown source type " + source.type());
}
SourceWorkspace<?> sourceWorkspace = switch (source.type()) {
case CLASSPATH -> new ClasspathWorkspace(source);
case MULTI_JAR -> new MultiJarWorkspace(source);
case SINGLE_JAR -> new SingleJarWorkspace(source);
case CLASS_HIERARCHY -> new ClassHierarchyWorkspace(source);
default -> throw new IllegalStateException("Unknown source type " + source.type());
};
workspaces.add(sourceWorkspace);
return sourceWorkspace;
}
@ -390,7 +379,7 @@ public class ManifestWorkspace {
}
try (FileSystem jar = FileSystems.newFileSystem(
new URI("jar", writableJar.toUri().toString(), ""),
Collections.emptyMap()
Map.of()
)) {
Path zipRoot = jar.getRootDirectories().iterator().next();
// Set dryRun to false because this jar file is already a writable copy.

View File

@ -50,7 +50,6 @@ import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
@ -127,35 +126,36 @@ public class MetadataQuorumCommand {
Optional.ofNullable(namespace.getString("bootstrap_controller")));
admin = Admin.create(props);
if (command.equals("describe")) {
if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
throw new TerseException("Only one of --status or --replication should be specified with describe sub-command");
} else if (namespace.getBoolean("replication")) {
boolean humanReadable = Optional.of(namespace.getBoolean("human_readable")).orElse(false);
handleDescribeReplication(admin, humanReadable);
} else if (namespace.getBoolean("status")) {
if (namespace.getBoolean("human_readable")) {
throw new TerseException("The option --human-readable is only supported along with --replication");
switch (command) {
case "describe" -> {
if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
throw new TerseException("Only one of --status or --replication should be specified with describe sub-command");
} else if (namespace.getBoolean("replication")) {
boolean humanReadable = Optional.of(namespace.getBoolean("human_readable")).orElse(false);
handleDescribeReplication(admin, humanReadable);
} else if (namespace.getBoolean("status")) {
if (namespace.getBoolean("human_readable")) {
throw new TerseException("The option --human-readable is only supported along with --replication");
}
handleDescribeStatus(admin);
} else {
throw new TerseException("One of --status or --replication must be specified with describe sub-command");
}
handleDescribeStatus(admin);
} else {
throw new TerseException("One of --status or --replication must be specified with describe sub-command");
}
} else if (command.equals("add-controller")) {
if (optionalCommandConfig == null) {
throw new TerseException("You must supply the configuration file of the controller you are " +
"adding when using add-controller.");
case "add-controller" -> {
if (optionalCommandConfig == null) {
throw new TerseException("You must supply the configuration file of the controller you are " +
"adding when using add-controller.");
}
handleAddController(admin,
namespace.getBoolean("dry_run"),
props);
}
handleAddController(admin,
namespace.getBoolean("dry_run"),
props);
} else if (command.equals("remove-controller")) {
handleRemoveController(admin,
case "remove-controller" -> handleRemoveController(admin,
namespace.getInt("controller_id"),
namespace.getString("controller_directory_id"),
namespace.getBoolean("dry_run"));
} else {
throw new IllegalStateException(format("Unknown command: %s", command));
default -> throw new IllegalStateException(format("Unknown command: %s", command));
}
} finally {
if (admin != null)
@ -231,7 +231,7 @@ public class MetadataQuorumCommand {
lastFetchTimestamp,
lastCaughtUpTimestamp,
status
).map(r -> r.toString()).collect(Collectors.toList());
).map(Object::toString).collect(Collectors.toList());
}).collect(Collectors.toList());
}
@ -253,7 +253,7 @@ public class MetadataQuorumCommand {
QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
int leaderId = quorumInfo.leaderId();
QuorumInfo.ReplicaState leader = quorumInfo.voters().stream().filter(voter -> voter.replicaId() == leaderId).findFirst().get();
QuorumInfo.ReplicaState maxLagFollower = quorumInfo.voters().stream().min(Comparator.comparingLong(qi -> qi.logEndOffset())).get();
QuorumInfo.ReplicaState maxLagFollower = quorumInfo.voters().stream().min(Comparator.comparingLong(QuorumInfo.ReplicaState::logEndOffset)).get();
long maxFollowerLag = leader.logEndOffset() - maxLagFollower.logEndOffset();
long maxFollowerLagTimeMs;
@ -292,7 +292,7 @@ public class MetadataQuorumCommand {
List<Node> currentVoterList = replicas.stream().map(voter -> new Node(
voter.replicaId(),
voter.replicaDirectoryId(),
getEndpoints(quorumInfo.nodes().get(voter.replicaId())))).collect(Collectors.toList());
getEndpoints(quorumInfo.nodes().get(voter.replicaId())))).toList();
return currentVoterList.stream().map(Objects::toString).collect(Collectors.joining(", ", "[", "]"));
}
@ -378,7 +378,7 @@ public class MetadataQuorumCommand {
static Uuid getMetadataDirectoryId(String metadataDirectory) throws Exception {
MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
addLogDirs(Collections.singletonList(metadataDirectory)).
addLogDirs(List.of(metadataDirectory)).
addMetadataLogDir(metadataDirectory).
load();
MetaProperties metaProperties = ensemble.logDirProps().get(metadataDirectory);

View File

@ -42,7 +42,6 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -367,7 +366,7 @@ public class OffsetsUtils {
if (resetPlanForGroup == null) {
printError("No reset plan for group " + groupId + " found", Optional.empty());
return Collections.<TopicPartition, OffsetAndMetadata>emptyMap();
return Map.<TopicPartition, OffsetAndMetadata>of();
}
Map<TopicPartition, Long> requestedOffsets = resetPlanForGroup.keySet().stream().collect(Collectors.toMap(
@ -376,7 +375,7 @@ public class OffsetsUtils {
return checkOffsetsRange(requestedOffsets).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
}).orElseGet(Collections::emptyMap);
}).orElseGet(Map::of);
}
public Map<TopicPartition, OffsetAndMetadata> resetToCurrent(Collection<TopicPartition> partitionsToReset, Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets) {

View File

@ -127,7 +127,7 @@ public class ReplicaVerificationTool {
List<TopicDescription> filteredTopicMetadata = topicsMetadata.stream().filter(
topicMetadata -> options.topicsIncludeFilter().isTopicAllowed(topicMetadata.name(), false)
).collect(Collectors.toList());
).toList();
if (filteredTopicMetadata.isEmpty()) {
LOG.error("No topics found. {} if specified, is either filtering out all topics or there is no topic.", options.topicsIncludeOpt);
@ -196,7 +196,7 @@ public class ReplicaVerificationTool {
counter.incrementAndGet()
);
})
.collect(Collectors.toList());
.toList();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("Stopping all fetchers");

View File

@ -37,7 +37,6 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -396,7 +395,7 @@ public class ShareConsumerPerformance {
}
public Set<String> topic() {
return Collections.singleton(options.valueOf(topicOpt));
return Set.of(options.valueOf(topicOpt));
}
public long numMessages() {

View File

@ -44,7 +44,6 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@ -178,7 +177,7 @@ public class StreamsResetter {
final StreamsResetterOptions options)
throws ExecutionException, InterruptedException {
final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(
Collections.singleton(groupId),
Set.of(groupId),
new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
try {
final List<MemberDescription> members =
@ -212,15 +211,15 @@ public class StreamsResetter {
final List<String> notFoundInputTopics = new ArrayList<>();
final List<String> notFoundIntermediateTopics = new ArrayList<>();
if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
if (inputTopics.isEmpty() && intermediateTopics.isEmpty()) {
System.out.println("No input or intermediate topics specified. Skipping seek.");
return EXIT_CODE_SUCCESS;
}
if (inputTopics.size() != 0) {
if (!inputTopics.isEmpty()) {
System.out.println("Reset-offsets for input topics " + inputTopics);
}
if (intermediateTopics.size() != 0) {
if (!intermediateTopics.isEmpty()) {
System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
}
@ -313,7 +312,7 @@ public class StreamsResetter {
public void maybeSeekToEnd(final String groupId,
final Consumer<byte[], byte[]> client,
final Set<TopicPartition> intermediateTopicPartitions) {
if (intermediateTopicPartitions.size() > 0) {
if (!intermediateTopicPartitions.isEmpty()) {
System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")");
for (final TopicPartition topicPartition : intermediateTopicPartitions) {
if (allTopics.contains(topicPartition.topic())) {
@ -328,7 +327,7 @@ public class StreamsResetter {
final Set<TopicPartition> inputTopicPartitions,
final StreamsResetterOptions options)
throws IOException, ParseException {
if (inputTopicPartitions.size() > 0) {
if (!inputTopicPartitions.isEmpty()) {
System.out.println("Following input topics offsets will be reset to (for consumer group " + options.applicationId() + ")");
if (options.hasToOffset()) {
resetOffsetsTo(client, inputTopicPartitions, options.toOffset());
@ -405,7 +404,7 @@ public class StreamsResetter {
if (partitionOffset.isPresent()) {
client.seek(topicPartition, partitionOffset.get());
} else {
client.seekToEnd(Collections.singletonList(topicPartition));
client.seekToEnd(List.of(topicPartition));
System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() +
" is empty, without a committed record. Falling back to latest known offset.");
}
@ -508,7 +507,7 @@ public class StreamsResetter {
final List<String> topicsToDelete;
if (!specifiedInternalTopics.isEmpty()) {
if (!inferredInternalTopics.containsAll(specifiedInternalTopics)) {
if (!new HashSet<>(inferredInternalTopics).containsAll(specifiedInternalTopics)) {
throw new IllegalArgumentException("Invalid topic specified in the "
+ "--internal-topics option. "
+ "Ensure that the topics specified are all internal topics. "

View File

@ -59,18 +59,19 @@ import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionSpec;
@ -112,11 +113,7 @@ public abstract class TopicCommand {
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
printException(cause);
} else {
printException(e);
}
printException(Objects.requireNonNullElse(cause, e));
exitCode = 1;
} catch (Throwable e) {
printException(e);
@ -159,10 +156,10 @@ public abstract class TopicCommand {
@SuppressWarnings("deprecation")
private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions opts) {
List<List<String>> configsToBeAdded = opts.topicConfig().orElse(Collections.emptyList())
List<List<String>> configsToBeAdded = opts.topicConfig().orElse(List.of())
.stream()
.map(s -> Arrays.asList(s.split("\\s*=\\s*")))
.collect(Collectors.toList());
.map(s -> List.of(s.split("\\s*=\\s*")))
.toList();
if (!configsToBeAdded.stream().allMatch(config -> config.size() == 2)) {
throw new IllegalArgumentException("requirement failed: Invalid topic config: all configs to be added must be in the format \"key=val\".");
@ -256,7 +253,7 @@ public abstract class TopicCommand {
name = options.topic().get();
partitions = options.partitions();
replicationFactor = options.replicationFactor();
replicaAssignment = options.replicaAssignment().orElse(Collections.emptyMap());
replicaAssignment = options.replicaAssignment().orElse(Map.of());
configsToAdd = parseTopicConfigsToBeAdded(options);
}
@ -357,10 +354,10 @@ public abstract class TopicCommand {
.collect(Collectors.joining(",")));
if (reassignment != null) {
System.out.print("\tAdding Replicas: " + reassignment.addingReplicas().stream()
.map(node -> node.toString())
.map(Object::toString)
.collect(Collectors.joining(",")));
System.out.print("\tRemoving Replicas: " + reassignment.removingReplicas().stream()
.map(node -> node.toString())
.map(Object::toString)
.collect(Collectors.joining(",")));
}
@ -443,9 +440,7 @@ public abstract class TopicCommand {
}
private static Admin createAdminClient(Properties commandConfig, Optional<String> bootstrapServer) {
if (bootstrapServer.isPresent()) {
commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer.get());
}
bootstrapServer.ifPresent(s -> commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s));
return Admin.create(commandConfig);
}
@ -475,10 +470,10 @@ public abstract class TopicCommand {
}
Map<String, String> configsMap = topic.configsToAdd.stringPropertyNames().stream()
.collect(Collectors.toMap(name -> name, name -> topic.configsToAdd.getProperty(name)));
.collect(Collectors.toMap(name -> name, topic.configsToAdd::getProperty));
newTopic.configs(configsMap);
CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic),
CreateTopicsResult createResult = adminClient.createTopics(Set.of(newTopic),
new CreateTopicsOptions().retryOnQuotaViolation(false));
createResult.all().get();
System.out.println("Created topic " + topic.name + ".");
@ -493,9 +488,7 @@ public abstract class TopicCommand {
}
public void listTopics(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
String results = getTopics(opts.topic(), opts.excludeInternalTopics())
.stream()
.collect(Collectors.joining("\n"));
String results = String.join("\n", getTopics(opts.topic(), opts.excludeInternalTopics()));
System.out.println(results);
}
@ -539,7 +532,7 @@ public abstract class TopicCommand {
Throwable cause = e.getCause();
if (cause instanceof UnsupportedVersionException || cause instanceof ClusterAuthorizationException) {
LOG.debug("Couldn't query reassignments through the AdminClient API: " + cause.getMessage(), cause);
return Collections.emptyMap();
return Map.of();
} else {
throw new RuntimeException(e);
}
@ -558,9 +551,9 @@ public abstract class TopicCommand {
List<String> topics;
if (useTopicId) {
topicIds = getTopicIds(inputTopicId.get(), opts.excludeInternalTopics());
topics = Collections.emptyList();
topics = List.of();
} else {
topicIds = Collections.emptyList();
topicIds = List.of();
topics = getTopics(opts.topic(), opts.excludeInternalTopics());
}
@ -588,7 +581,7 @@ public abstract class TopicCommand {
List<String> topicNames = topicDescriptions.stream()
.map(org.apache.kafka.clients.admin.TopicDescription::name)
.collect(Collectors.toList());
.toList();
Map<ConfigResource, KafkaFuture<Config>> allConfigs = adminClient.describeConfigs(
topicNames.stream()
.map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name))
@ -596,7 +589,7 @@ public abstract class TopicCommand {
).values();
List<Integer> liveBrokers = adminClient.describeCluster().nodes().get().stream()
.map(Node::id)
.collect(Collectors.toList());
.toList();
DescribeOptions describeOptions = new DescribeOptions(opts, new HashSet<>(liveBrokers));
Set<TopicPartition> topicPartitions = topicDescriptions
.stream()
@ -647,7 +640,7 @@ public abstract class TopicCommand {
public void deleteTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
List<String> topics = getTopics(opts.topic(), opts.excludeInternalTopics());
ensureTopicExists(topics, opts.topic(), !opts.ifExists());
adminClient.deleteTopics(Collections.unmodifiableList(topics),
adminClient.deleteTopics(List.copyOf(topics),
new DeleteTopicsOptions().retryOnQuotaViolation(false)
).all().get();
}
@ -668,10 +661,10 @@ public abstract class TopicCommand {
List<Uuid> allTopicIds = allTopics.listings().get().stream()
.map(TopicListing::topicId)
.sorted()
.collect(Collectors.toList());
.toList();
return allTopicIds.contains(topicIdIncludeList) ?
Collections.singletonList(topicIdIncludeList) :
Collections.emptyList();
List.of(topicIdIncludeList) :
List.of();
}
@Override
@ -835,7 +828,7 @@ public abstract class TopicCommand {
}
public <A> Optional<List<A>> valuesAsOption(OptionSpec<A> option) {
return valuesAsOption(option, Collections.emptyList());
return valuesAsOption(option, List.of());
}
public <A> Optional<A> valueAsOption(OptionSpec<A> option, Optional<A> defaultValue) {
@ -953,8 +946,7 @@ public abstract class TopicCommand {
// should have exactly one action
long actions =
Arrays.asList(createOpt, listOpt, alterOpt, describeOpt, deleteOpt)
.stream().filter(options::has)
Stream.of(createOpt, listOpt, alterOpt, describeOpt, deleteOpt).filter(options::has)
.count();
if (actions != 1)
CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete");
@ -989,29 +981,29 @@ public abstract class TopicCommand {
private void checkInvalidArgs() {
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, invalidOptions(Arrays.asList(alterOpt, createOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, invalidOptions(Arrays.asList(alterOpt, createOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, invalidOptions(Arrays.asList(createOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, invalidOptions(Arrays.asList(alterOpt, createOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, invalidOptions(List.of(alterOpt, createOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, invalidOptions(List.of(alterOpt, createOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, invalidOptions(List.of(createOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, invalidOptions(List.of(alterOpt, createOpt)));
if (options.has(createOpt)) {
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, partitionsOpt, replicationFactorOpt);
}
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
invalidOptions(Collections.singleton(topicsWithOverridesOpt), Arrays.asList(describeOpt, reportUnderReplicatedPartitionsOpt)));
invalidOptions(Set.of(topicsWithOverridesOpt), List.of(describeOpt, reportUnderReplicatedPartitionsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderMinIsrPartitionsOpt,
invalidOptions(Collections.singleton(topicsWithOverridesOpt), Arrays.asList(describeOpt, reportUnderMinIsrPartitionsOpt)));
invalidOptions(Set.of(topicsWithOverridesOpt), List.of(describeOpt, reportUnderMinIsrPartitionsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, reportAtMinIsrPartitionsOpt,
invalidOptions(Collections.singleton(topicsWithOverridesOpt), Arrays.asList(describeOpt, reportAtMinIsrPartitionsOpt)));
invalidOptions(Set.of(topicsWithOverridesOpt), List.of(describeOpt, reportAtMinIsrPartitionsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
invalidOptions(Collections.singleton(topicsWithOverridesOpt), Arrays.asList(describeOpt, reportUnavailablePartitionsOpt)));
invalidOptions(Set.of(topicsWithOverridesOpt), List.of(describeOpt, reportUnavailablePartitionsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
invalidOptions(new HashSet<>(allReplicationReportOpts), Arrays.asList(describeOpt)));
invalidOptions(new HashSet<>(allReplicationReportOpts), List.of(describeOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt,
invalidOptions(Arrays.asList(alterOpt, deleteOpt, describeOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, invalidOptions(Arrays.asList(createOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, invalidOptions(Arrays.asList(listOpt, describeOpt)));
invalidOptions(List.of(alterOpt, deleteOpt, describeOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, invalidOptions(List.of(createOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, invalidOptions(List.of(listOpt, describeOpt)));
}
private Set<OptionSpec<?>> invalidOptions(List<OptionSpec<?>> removeOptions) {

View File

@ -47,17 +47,16 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.singleton;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@ -240,13 +239,13 @@ public class TransactionalMessageCopier {
if (offsetAndMetadata != null)
consumer.seek(tp, offsetAndMetadata.offset());
else
consumer.seekToBeginning(singleton(tp));
consumer.seekToBeginning(Set.of(tp));
});
}
private static long messagesRemaining(KafkaConsumer<String, String> consumer, TopicPartition partition) {
long currentPosition = consumer.position(partition);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(singleton(partition));
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Set.of(partition));
if (endOffsets.containsKey(partition)) {
return endOffsets.get(partition) - currentPosition;
}
@ -319,7 +318,7 @@ public class TransactionalMessageCopier {
final AtomicLong numMessagesProcessedSinceLastRebalance = new AtomicLong(0);
final AtomicLong totalMessageProcessed = new AtomicLong(0);
if (groupMode) {
consumer.subscribe(Collections.singleton(topicName), new ConsumerRebalanceListener() {
consumer.subscribe(Set.of(topicName), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@ -341,7 +340,7 @@ public class TransactionalMessageCopier {
});
} else {
TopicPartition inputPartition = new TopicPartition(topicName, parsedArgs.getInt("inputPartition"));
consumer.assign(singleton(inputPartition));
consumer.assign(Set.of(inputPartition));
remainingMessages.set(Math.min(messagesRemaining(consumer, inputPartition), remainingMessages.get()));
}

View File

@ -50,7 +50,6 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -65,8 +64,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static net.sourceforge.argparse4j.impl.Arguments.store;
public abstract class TransactionsCommand {
@ -159,7 +156,7 @@ public abstract class TransactionsCommand {
) throws Exception {
final DescribeProducersResult.PartitionProducerState result;
try {
result = admin.describeProducers(singleton(topicPartition))
result = admin.describeProducers(Set.of(topicPartition))
.partitionResult(topicPartition)
.get();
} catch (ExecutionException e) {
@ -345,7 +342,7 @@ public abstract class TransactionsCommand {
final DescribeProducersResult.PartitionProducerState result;
try {
result = admin.describeProducers(singleton(topicPartition), options)
result = admin.describeProducers(Set.of(topicPartition), options)
.partitionResult(topicPartition)
.get();
} catch (ExecutionException e) {
@ -418,7 +415,7 @@ public abstract class TransactionsCommand {
final TransactionDescription result;
try {
result = admin.describeTransactions(singleton(transactionalId))
result = admin.describeTransactions(Set.of(transactionalId))
.description(transactionalId)
.get();
} catch (ExecutionException e) {
@ -451,7 +448,7 @@ public abstract class TransactionsCommand {
result.topicPartitions().stream().map(TopicPartition::toString).collect(Collectors.joining(","))
);
ToolsUtils.prettyPrintTable(HEADERS, singletonList(row), out);
ToolsUtils.prettyPrintTable(HEADERS, List.of(row), out);
}
}
@ -615,7 +612,7 @@ public abstract class TransactionsCommand {
);
if (candidates.isEmpty()) {
printHangingTransactions(Collections.emptyList(), out);
printHangingTransactions(List.of(), out);
} else {
Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
@ -649,9 +646,9 @@ public abstract class TransactionsCommand {
if (topic.isPresent()) {
if (partition.isPresent()) {
return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
return List.of(new TopicPartition(topic.get(), partition.get()));
} else {
topics = Collections.singletonList(topic.get());
topics = List.of(topic.get());
}
} else {
topics = listTopics(admin);
@ -752,7 +749,7 @@ public abstract class TransactionsCommand {
} catch (ExecutionException e) {
printErrorAndExit("Failed to describe " + transactionalIds.size()
+ " transactions", e.getCause());
return Collections.emptyMap();
return Map.of();
}
}
@ -778,7 +775,7 @@ public abstract class TransactionsCommand {
return new ArrayList<>(admin.listTopics(listOptions).names().get());
} catch (ExecutionException e) {
printErrorAndExit("Failed to list topics", e.getCause());
return Collections.emptyList();
return List.of();
}
}
@ -788,14 +785,14 @@ public abstract class TransactionsCommand {
List<String> topics
) throws Exception {
List<TopicPartition> topicPartitions = new ArrayList<>();
consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
consumeInBatches(topics, MAX_BATCH_SIZE, batch ->
findTopicPartitions(
admin,
brokerId,
batch,
topicPartitions
);
});
)
);
return topicPartitions;
}
@ -807,13 +804,13 @@ public abstract class TransactionsCommand {
) throws Exception {
try {
Map<String, TopicDescription> topicDescriptions = admin.describeTopics(topics).allTopicNames().get();
topicDescriptions.forEach((topic, description) -> {
topicDescriptions.forEach((topic, description) ->
description.partitions().forEach(partitionInfo -> {
if (brokerId.isEmpty() || hasReplica(brokerId.get(), partitionInfo)) {
topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
}
});
});
})
);
} catch (ExecutionException e) {
printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
}
@ -838,15 +835,15 @@ public abstract class TransactionsCommand {
List<OpenTransaction> candidateTransactions = new ArrayList<>();
consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch ->
collectCandidateOpenTransactions(
admin,
brokerId,
maxTransactionTimeoutMs,
batch,
candidateTransactions
);
});
)
);
return candidateTransactions;
}
@ -880,7 +877,7 @@ public abstract class TransactionsCommand {
long currentTimeMs = time.milliseconds();
producersByPartition.forEach((topicPartition, producersStates) -> {
producersByPartition.forEach((topicPartition, producersStates) ->
producersStates.activeProducers().forEach(activeProducer -> {
if (activeProducer.currentTransactionStartOffset().isPresent()) {
long transactionDurationMs = currentTimeMs - activeProducer.lastTimestamp();
@ -891,8 +888,8 @@ public abstract class TransactionsCommand {
));
}
}
});
});
})
);
} catch (ExecutionException e) {
printErrorAndExit("Failed to describe producers for " + topicPartitions.size() +
" partitions on broker " + brokerId, e.getCause());
@ -928,7 +925,7 @@ public abstract class TransactionsCommand {
} catch (ExecutionException e) {
printErrorAndExit("Failed to list transactions for " + producerIds.size() +
" producers", e.getCause());
return Collections.emptyMap();
return Map.of();
}
}

View File

@ -58,7 +58,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -231,7 +230,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
public void run() {
try {
printJson(new StartupComplete());
consumer.subscribe(Collections.singletonList(topic), this);
consumer.subscribe(List.of(topic), this);
while (!isFinished()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
@ -623,7 +622,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
boolean useAutoCommit = res.getBoolean("useAutoCommit");
String configFile = res.getString("consumer.config");
String brokerHostandPort = res.getString("bootstrapServer");
String brokerHostAndPort = res.getString("bootstrapServer");
Properties consumerProps = new Properties();
if (configFile != null) {
@ -664,7 +663,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostAndPort);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));

View File

@ -61,7 +61,6 @@ import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@ -421,7 +420,7 @@ public class VerifiableShareConsumer implements Closeable, AcknowledgementCommit
printJson(new OffsetResetStrategySet(offsetResetStrategy.type().toString()));
}
consumer.subscribe(Collections.singleton(this.topic));
consumer.subscribe(Set.of(this.topic));
consumer.setAcknowledgementCommitCallback(this);
while (!(maxMessages >= 0 && totalAcknowledged >= maxMessages)) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));

View File

@ -37,6 +37,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
@ -160,7 +161,7 @@ public class ConsoleConsumer {
if (opts.partitionArg().isPresent()) {
seek(topic.get(), opts.partitionArg().getAsInt(), opts.offsetArg());
} else {
consumer.subscribe(Collections.singletonList(topic.get()));
consumer.subscribe(List.of(topic.get()));
}
} else {
opts.includedTopicsArg().ifPresent(topics -> consumer.subscribe(Pattern.compile(topics)));
@ -169,11 +170,11 @@ public class ConsoleConsumer {
private void seek(String topic, int partitionId, long offset) {
TopicPartition topicPartition = new TopicPartition(topic, partitionId);
consumer.assign(Collections.singletonList(topicPartition));
consumer.assign(List.of(topicPartition));
if (offset == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
consumer.seekToBeginning(Collections.singletonList(topicPartition));
consumer.seekToBeginning(List.of(topicPartition));
} else if (offset == ListOffsetsRequest.LATEST_TIMESTAMP) {
consumer.seekToEnd(Collections.singletonList(topicPartition));
consumer.seekToEnd(List.of(topicPartition));
} else {
consumer.seek(topicPartition, offset);
}

View File

@ -26,7 +26,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -186,9 +185,9 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
}
private void checkRequiredArgs() {
List<Optional<String>> topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
topicOrFilterArgs.removeIf(arg -> arg.isEmpty());
// user need to specify value for either --topic or --include options)
List<Optional<String>> topicOrFilterArgs = new ArrayList<>(List.of(topicArg(), includedTopicsArg()));
topicOrFilterArgs.removeIf(Optional::isEmpty);
// user need to specify value for either --topic or --include options
if (topicOrFilterArgs.size() != 1) {
CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. ");
}

View File

@ -36,6 +36,7 @@ import java.io.PrintStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@ -166,7 +167,7 @@ public class ConsoleShareConsumer {
this.consumer = consumer;
this.timeoutMs = timeoutMs;
consumer.subscribe(Collections.singletonList(topic));
consumer.subscribe(List.of(topic));
}
ConsumerRecord<byte[], byte[]> receive() {

View File

@ -58,7 +58,6 @@ import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -121,7 +120,7 @@ public class ConsumerGroupCommand {
return;
}
try (ConsumerGroupService consumerGroupService = new ConsumerGroupService(opts, Collections.emptyMap())) {
try (ConsumerGroupService consumerGroupService = new ConsumerGroupService(opts, Map.of())) {
if (opts.options.has(opts.listOpt))
consumerGroupService.listGroups();
else if (opts.options.has(opts.describeOpt))
@ -242,14 +241,14 @@ public class ConsumerGroupCommand {
private Set<GroupState> stateValues() {
String stateValue = opts.options.valueOf(opts.stateOpt);
return (stateValue == null || stateValue.isEmpty())
? Collections.emptySet()
? Set.of()
: groupStatesFromString(stateValue);
}
private Set<GroupType> typeValues() {
String typeValue = opts.options.valueOf(opts.typeOpt);
return (typeValue == null || typeValue.isEmpty())
? Collections.emptySet()
? Set.of()
: consumerGroupTypesFromString(typeValue);
}
@ -585,7 +584,7 @@ public class ConsumerGroupCommand {
Optional<String> clientIdOpt
) {
if (topicPartitions.isEmpty()) {
return Collections.singleton(
return Set.of(
new PartitionAssignmentState(group, coordinator, Optional.empty(), Optional.empty(), Optional.empty(),
getLag(Optional.empty(), Optional.empty()), consumerIdOpt, hostOpt, clientIdOpt, Optional.empty(), Optional.empty())
);
@ -684,7 +683,7 @@ public class ConsumerGroupCommand {
break;
default:
printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
result.put(groupId, Collections.emptyMap());
result.put(groupId, Map.of());
}
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
@ -855,7 +854,7 @@ public class ConsumerGroupCommand {
* Returns the state of the specified consumer group and partition assignment states
*/
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String groupId) throws Exception {
return collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, new SimpleImmutableEntry<>(Optional.empty(), Optional.empty()));
return collectGroupsOffsets(List.of(groupId)).getOrDefault(groupId, new SimpleImmutableEntry<>(Optional.empty(), Optional.empty()));
}
/**
@ -899,7 +898,7 @@ public class ConsumerGroupCommand {
Optional.of(MISSING_COLUMN_VALUE),
Optional.of(MISSING_COLUMN_VALUE),
Optional.of(MISSING_COLUMN_VALUE))
: Collections.emptyList();
: List.of();
rowsWithConsumer.addAll(rowsWithoutConsumer);
@ -910,7 +909,7 @@ public class ConsumerGroupCommand {
}
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String groupId) throws Exception {
return collectGroupsMembers(Collections.singleton(groupId)).get(groupId);
return collectGroupsMembers(Set.of(groupId)).get(groupId);
}
TreeMap<String, Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>>> collectGroupsMembers(Collection<String> groupIds) throws Exception {
@ -928,7 +927,7 @@ public class ConsumerGroupCommand {
consumer.groupInstanceId().orElse(""),
consumer.assignment().topicPartitions().size(),
consumer.assignment().topicPartitions().stream().toList(),
consumer.targetAssignment().map(a -> a.topicPartitions().stream().toList()).orElse(Collections.emptyList()),
consumer.targetAssignment().map(a -> a.topicPartitions().stream().toList()).orElse(List.of()),
consumer.memberEpoch(),
consumerGroup.targetAssignmentEpoch(),
consumer.upgraded()
@ -939,7 +938,7 @@ public class ConsumerGroupCommand {
}
GroupInformation collectGroupState(String groupId) throws Exception {
return collectGroupsState(Collections.singleton(groupId)).get(groupId);
return collectGroupsState(Set.of(groupId)).get(groupId);
}
TreeMap<String, GroupInformation> collectGroupsState(Collection<String> groupIds) throws Exception {
@ -986,14 +985,14 @@ public class ConsumerGroupCommand {
if (!opts.options.has(opts.resetFromFileOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.");
return Collections.emptyList();
return List.of();
}
}
private Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
try {
return adminClient.listConsumerGroupOffsets(
Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec()),
Map.of(groupId, new ListConsumerGroupOffsetsSpec()),
withTimeoutMs(new ListConsumerGroupOffsetsOptions())
).partitionsToOffsetAndMetadata(groupId).get();
} catch (InterruptedException | ExecutionException e) {

View File

@ -22,8 +22,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -203,11 +201,11 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
.describedAs("regex")
.ofType(String.class);
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt));
allConsumerGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt));
allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToOffsetOpt, resetShiftByOpt,
resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt));
allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, topicOpt));
allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
allConsumerGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt, resetOffsetsOpt);
allResetOffsetScenarioOpts = Set.of(resetToOffsetOpt, resetShiftByOpt,
resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt);
allDeleteOffsetsOpts = Set.of(groupOpt, topicOpt);
options = parser.parse(args);
}
@ -224,7 +222,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt, offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));

View File

@ -387,7 +387,7 @@ public class ShareGroupCommand {
TreeMap<String, ShareGroupDescription> collectGroupsDescription(Collection<String> groupIds) throws ExecutionException, InterruptedException {
Map<String, ShareGroupDescription> shareGroups = describeShareGroups(groupIds);
TreeMap<String, ShareGroupDescription> res = new TreeMap<>();
shareGroups.forEach(res::put);
res.putAll(shareGroups);
return res;
}

View File

@ -22,8 +22,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -144,10 +142,10 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions {
verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
.availableIf(describeOpt);
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt));
allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt));
allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt, resetToLatestOpt));
allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, topicOpt));
allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
allShareGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt, resetOffsetsOpt);
allResetOffsetScenarioOpts = Set.of(resetToDatetimeOpt, resetToEarliestOpt, resetToLatestOpt);
allDeleteOffsetsOpts = Set.of(groupOpt, topicOpt);
options = parser.parse(args);
}
@ -162,7 +160,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt, offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));

View File

@ -226,7 +226,7 @@ public class ReassignPartitionsCommand {
if (!partsOngoing && !movesOngoing && !preserveThrottles) {
// If the partition assignments and replica assignments are done, clear any throttles
// that were set. We have to clear all throttles, because we don't have enough
// information to know all of the source brokers that might have been involved in the
// information to know all the source brokers that might have been involved in the
// previous reassignments.
clearAllThrottles(adminClient, targetParts);
}
@ -849,7 +849,7 @@ public class ReassignPartitionsCommand {
.map(Object::toString)
.collect(Collectors.joining(","))));
} else {
// If a replica has been moved to a new host and we also specified a particular
// If a replica has been moved to a new host, and we also specified a particular
// log directory, we will have to keep retrying the alterReplicaLogDirs
// call. It can't take effect until the replica is moved to that host.
time.sleep(100);
@ -1342,21 +1342,18 @@ public class ReassignPartitionsCommand {
}
private static List<String> parseTopicsData(int version, JsonValue js) throws JsonMappingException {
switch (version) {
case 1:
List<String> results = new ArrayList<>();
Optional<JsonValue> partitionsSeq = js.asJsonObject().get("topics");
if (partitionsSeq.isPresent()) {
Iterator<JsonValue> iter = partitionsSeq.get().asJsonArray().iterator();
while (iter.hasNext()) {
results.add(iter.next().asJsonObject().apply("topic").to(STRING));
}
if (version == 1) {
List<String> results = new ArrayList<>();
Optional<JsonValue> partitionsSeq = js.asJsonObject().get("topics");
if (partitionsSeq.isPresent()) {
Iterator<JsonValue> iter = partitionsSeq.get().asJsonArray().iterator();
while (iter.hasNext()) {
results.add(iter.next().asJsonObject().apply("topic").to(STRING));
}
return results;
default:
throw new AdminOperationException("Not supported version field value " + version);
}
return results;
}
throw new AdminOperationException("Not supported version field value " + version);
}
private static Entry<List<Entry<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>> parsePartitionReassignmentData(
@ -1376,46 +1373,43 @@ public class ReassignPartitionsCommand {
private static Entry<List<Entry<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>> parsePartitionReassignmentData(
int version, JsonValue jsonData
) throws JsonMappingException {
switch (version) {
case 1:
List<Entry<TopicPartition, List<Integer>>> partitionAssignment = new ArrayList<>();
Map<TopicPartitionReplica, String> replicaAssignment = new HashMap<>();
if (version == 1) {
List<Entry<TopicPartition, List<Integer>>> partitionAssignment = new ArrayList<>();
Map<TopicPartitionReplica, String> replicaAssignment = new HashMap<>();
Optional<JsonValue> partitionsSeq = jsonData.asJsonObject().get("partitions");
if (partitionsSeq.isPresent()) {
Iterator<JsonValue> iter = partitionsSeq.get().asJsonArray().iterator();
while (iter.hasNext()) {
JsonObject partitionFields = iter.next().asJsonObject();
String topic = partitionFields.apply("topic").to(STRING);
int partition = partitionFields.apply("partition").to(INT);
List<Integer> newReplicas = partitionFields.apply("replicas").to(INT_LIST);
Optional<JsonValue> logDirsOpts = partitionFields.get("log_dirs");
List<String> newLogDirs;
if (logDirsOpts.isPresent())
newLogDirs = logDirsOpts.get().to(STRING_LIST);
else
newLogDirs = newReplicas.stream().map(r -> ANY_LOG_DIR).collect(Collectors.toList());
if (newReplicas.size() != newLogDirs.size())
throw new AdminCommandFailedException("Size of replicas list " + newReplicas + " is different from " +
"size of log dirs list " + newLogDirs + " for partition " + new TopicPartition(topic, partition));
partitionAssignment.add(Map.entry(new TopicPartition(topic, partition), newReplicas));
for (int i = 0; i < newLogDirs.size(); i++) {
Integer replica = newReplicas.get(i);
String logDir = newLogDirs.get(i);
Optional<JsonValue> partitionsSeq = jsonData.asJsonObject().get("partitions");
if (partitionsSeq.isPresent()) {
Iterator<JsonValue> iter = partitionsSeq.get().asJsonArray().iterator();
while (iter.hasNext()) {
JsonObject partitionFields = iter.next().asJsonObject();
String topic = partitionFields.apply("topic").to(STRING);
int partition = partitionFields.apply("partition").to(INT);
List<Integer> newReplicas = partitionFields.apply("replicas").to(INT_LIST);
Optional<JsonValue> logDirsOpts = partitionFields.get("log_dirs");
List<String> newLogDirs;
if (logDirsOpts.isPresent())
newLogDirs = logDirsOpts.get().to(STRING_LIST);
else
newLogDirs = newReplicas.stream().map(r -> ANY_LOG_DIR).collect(Collectors.toList());
if (newReplicas.size() != newLogDirs.size())
throw new AdminCommandFailedException("Size of replicas list " + newReplicas + " is different from " +
"size of log dirs list " + newLogDirs + " for partition " + new TopicPartition(topic, partition));
partitionAssignment.add(Map.entry(new TopicPartition(topic, partition), newReplicas));
for (int i = 0; i < newLogDirs.size(); i++) {
Integer replica = newReplicas.get(i);
String logDir = newLogDirs.get(i);
if (logDir.equals(ANY_LOG_DIR))
continue;
if (logDir.equals(ANY_LOG_DIR))
continue;
replicaAssignment.put(new TopicPartitionReplica(topic, partition, replica), logDir);
}
replicaAssignment.put(new TopicPartitionReplica(topic, partition, replica), logDir);
}
}
}
return Map.entry(partitionAssignment, replicaAssignment);
default:
throw new AdminOperationException("Not supported version field value " + version);
return Map.entry(partitionAssignment, replicaAssignment);
}
throw new AdminOperationException("Not supported version field value " + version);
}
static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) {

View File

@ -20,7 +20,6 @@ package org.apache.kafka.tools.reassign;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@ -34,7 +33,7 @@ public final class VerifyAssignmentResult {
public final boolean movesOngoing;
public VerifyAssignmentResult(Map<TopicPartition, PartitionReassignmentState> partStates) {
this(partStates, false, Collections.emptyMap(), false);
this(partStates, false, Map.of(), false);
}
/**

View File

@ -61,7 +61,6 @@ import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -457,7 +456,7 @@ public class StreamsGroupCommand {
boolean allPresent = topicPartitions.stream().allMatch(allTopicPartitions::containsKey);
if (!allPresent) {
printError("One or more topics are not part of the group '" + groupId + "'.", Optional.empty());
return Collections.emptyList();
return List.of();
}
return topicPartitions;
} catch (InterruptedException | ExecutionException e) {
@ -508,7 +507,7 @@ public class StreamsGroupCommand {
break;
default:
printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
result.put(groupId, Collections.emptyMap());
result.put(groupId, Map.of());
}
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
@ -889,7 +888,7 @@ public class StreamsGroupCommand {
if (!opts.options.has(opts.resetFromFileOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.");
return Collections.emptyList();
return List.of();
}
}

View File

@ -22,8 +22,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -200,12 +198,12 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
exportOpt = parser.accepts("export", EXPORT_DOC);
options = parser.parse(args);
allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToOffsetOpt, resetShiftByOpt,
resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt));
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt));
allStreamsGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt));
allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(inputTopicOpt, allInputTopicsOpt));
allDeleteInternalGroupsOpts = new HashSet<>(Arrays.asList(resetOffsetsOpt, deleteOpt));
allResetOffsetScenarioOpts = Set.of(resetToOffsetOpt, resetShiftByOpt,
resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt);
allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
allStreamsGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt);
allDeleteOffsetsOpts = Set.of(inputTopicOpt, allInputTopicsOpt);
allDeleteInternalGroupsOpts = Set.of(resetOffsetsOpt, deleteOpt);
}
@SuppressWarnings("NPathComplexity")
@ -256,7 +254,7 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt, offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));