MINOR: Cleanups in Test Common Module (#19775)
CI / build (push) Waiting to run Details

Now that Kafka Brokers support Java 17, this PR makes some changes in
test-common module. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Sanskar Jhajharia 2025-05-26 00:37:30 +05:30 committed by GitHub
parent 651f86b77e
commit 2fe447a8a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 94 additions and 111 deletions

View File

@ -207,10 +207,10 @@ public class ClusterConfig {
private ListenerName controllerListenerName; private ListenerName controllerListenerName;
private File trustStoreFile; private File trustStoreFile;
private MetadataVersion metadataVersion; private MetadataVersion metadataVersion;
private Map<String, String> serverProperties = Collections.emptyMap(); private Map<String, String> serverProperties = Map.of();
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap(); private Map<Integer, Map<String, String>> perServerProperties = Map.of();
private List<String> tags = Collections.emptyList(); private List<String> tags = List.of();
private Map<Feature, Short> features = Collections.emptyMap(); private Map<Feature, Short> features = Map.of();
private Builder() {} private Builder() {}

View File

@ -44,7 +44,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
* Usage looks something like this: * Usage looks something like this:
* <pre>{@code * <pre>{@code
* private static List<ClusterConfig> generator() { * private static List<ClusterConfig> generator() {
* return Collections.singletonList(ClusterConfig.defaultBuilder().build()); * return List.of(ClusterConfig.defaultBuilder().build());
* } * }
* *
* @ClusterTemplate("generator") * @ClusterTemplate("generator")

View File

@ -79,7 +79,7 @@ references a static method on the test class. This method is used to produce any
number of test configurations using a fluent builder style API. number of test configurations using a fluent builder style API.
```java ```java
import java.util.Arrays; import java.util.List;
@ClusterTemplate("generateConfigs") @ClusterTemplate("generateConfigs")
void testSomething() { ... } void testSomething() { ... }
@ -99,7 +99,7 @@ static List<ClusterConfig> generateConfigs() {
.name("Generated Test 3") .name("Generated Test 3")
.serverProperties(props3) .serverProperties(props3)
.build(); .build();
return Arrays.asList(config1, config2, config3); return List.of(config1, config2, config3);
} }
``` ```

View File

@ -29,7 +29,6 @@ import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -55,20 +54,20 @@ public class ClusterConfigTest {
trustStoreFile.deleteOnExit(); trustStoreFile.deleteOnExit();
ClusterConfig clusterConfig = ClusterConfig.builder() ClusterConfig clusterConfig = ClusterConfig.builder()
.setTypes(Collections.singleton(Type.KRAFT)) .setTypes(Set.of(Type.KRAFT))
.setBrokers(3) .setBrokers(3)
.setControllers(2) .setControllers(2)
.setDisksPerBroker(1) .setDisksPerBroker(1)
.setAutoStart(true) .setAutoStart(true)
.setTags(Arrays.asList("name", "Generated Test")) .setTags(List.of("name", "Generated Test"))
.setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT) .setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
.setBrokerListenerName(ListenerName.normalised("EXTERNAL")) .setBrokerListenerName(ListenerName.normalised("EXTERNAL"))
.setControllerSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT) .setControllerSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)
.setControllerListenerName(ListenerName.normalised("CONTROLLER")) .setControllerListenerName(ListenerName.normalised("CONTROLLER"))
.setTrustStoreFile(trustStoreFile) .setTrustStoreFile(trustStoreFile)
.setMetadataVersion(MetadataVersion.MINIMUM_VERSION) .setMetadataVersion(MetadataVersion.MINIMUM_VERSION)
.setServerProperties(Collections.singletonMap("broker", "broker_value")) .setServerProperties(Map.of("broker", "broker_value"))
.setPerServerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value"))) .setPerServerProperties(Map.of(0, Map.of("broker_0", "broker_0_value")))
.build(); .build();
Map<String, Object> clusterConfigFields = fields(clusterConfig); Map<String, Object> clusterConfigFields = fields(clusterConfig);
@ -105,7 +104,7 @@ public class ClusterConfigTest {
@Test @Test
public void testDisplayTags() { public void testDisplayTags() {
List<String> tags = Arrays.asList("tag 1", "tag 2", "tag 3"); List<String> tags = List.of("tag 1", "tag 2", "tag 3");
ClusterConfig clusterConfig = ClusterConfig.defaultBuilder().setTags(tags).build(); ClusterConfig clusterConfig = ClusterConfig.defaultBuilder().setTags(tags).build();
Set<String> expectedDisplayTags = clusterConfig.displayTags(); Set<String> expectedDisplayTags = clusterConfig.displayTags();

View File

@ -57,7 +57,6 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -242,7 +241,7 @@ public interface ClusterInstance {
if (brokers().values().stream().allMatch(b -> b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) { if (brokers().values().stream().allMatch(b -> b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) {
return Set.of(CLASSIC, CONSUMER); return Set.of(CLASSIC, CONSUMER);
} else { } else {
return Collections.singleton(CLASSIC); return Set.of(CLASSIC);
} }
} }

View File

@ -60,7 +60,6 @@ import java.nio.file.Paths;
import java.util.AbstractMap.SimpleImmutableEntry; import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -251,7 +250,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
socketFactoryManager.getOrCreatePortForListener(node.id(), brokerListenerName); socketFactoryManager.getOrCreatePortForListener(node.id(), brokerListenerName);
} }
for (TestKitNode node : nodes.controllerNodes().values()) { for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList()); setupNodeDirectories(baseDirectory, node.metadataDirectory(), List.of());
KafkaConfig config = createNodeConfig(node); KafkaConfig config = createNodeConfig(node);
SharedServer sharedServer = new SharedServer( SharedServer sharedServer = new SharedServer(
config, config,
@ -259,7 +258,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
Time.SYSTEM, Time.SYSTEM,
new Metrics(), new Metrics(),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())), CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
Collections.emptyList(), List.of(),
faultHandlerFactory, faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id()) socketFactoryManager.getOrCreateSocketFactory(node.id())
); );
@ -287,7 +286,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
Time.SYSTEM, Time.SYSTEM,
new Metrics(), new Metrics(),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())), CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
Collections.emptyList(), List.of(),
faultHandlerFactory, faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id()) socketFactoryManager.getOrCreateSocketFactory(node.id())
); );

View File

@ -70,7 +70,6 @@ import org.apache.kafka.server.authorizer.AclDeleteResult;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -388,7 +387,7 @@ public class MockController implements Controller {
configs.computeIfAbsent(resource, __ -> new HashMap<>()).put(key, value); configs.computeIfAbsent(resource, __ -> new HashMap<>()).put(key, value);
break; break;
case DELETE: case DELETE:
configs.getOrDefault(resource, Collections.emptyMap()).remove(key); configs.getOrDefault(resource, Map.of()).remove(key);
break; break;
default: default:
break; break;

View File

@ -23,7 +23,6 @@ import org.apache.kafka.server.ServerSocketFactory;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -174,7 +173,7 @@ public class PreboundSocketFactoryManager implements AutoCloseable {
// SocketServer.) // SocketServer.)
for (Entry<Integer, Map<String, ServerSocketChannel>> socketsEntry : sockets.entrySet()) { for (Entry<Integer, Map<String, ServerSocketChannel>> socketsEntry : sockets.entrySet()) {
Set<String> usedListeners = usedSockets.getOrDefault( Set<String> usedListeners = usedSockets.getOrDefault(
socketsEntry.getKey(), Collections.emptySet()); socketsEntry.getKey(), Set.of());
for (Entry<String, ServerSocketChannel> entry : socketsEntry.getValue().entrySet()) { for (Entry<String, ServerSocketChannel> entry : socketsEntry.getValue().entrySet()) {
if (!usedListeners.contains(entry.getKey())) { if (!usedListeners.contains(entry.getKey())) {
Utils.closeQuietly(entry.getValue(), "serverSocketChannel"); Utils.closeQuietly(entry.getValue(), "serverSocketChannel");

View File

@ -52,7 +52,7 @@ public class TestKitNodes {
private int numControllerNodes; private int numControllerNodes;
private int numBrokerNodes; private int numBrokerNodes;
private int numDisksPerBroker = 1; private int numDisksPerBroker = 1;
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap(); private Map<Integer, Map<String, String>> perServerProperties = Map.of();
private BootstrapMetadata bootstrapMetadata; private BootstrapMetadata bootstrapMetadata;
public Builder() { public Builder() {
@ -201,7 +201,7 @@ public class TestKitNodes {
baseDirectory.toFile().getAbsolutePath(), baseDirectory.toFile().getAbsolutePath(),
clusterId, clusterId,
brokerNodeIds.contains(id), brokerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Collections.emptyMap()) perServerProperties.getOrDefault(id, Map.of())
); );
controllerNodes.put(id, controllerNode); controllerNodes.put(id, controllerNode);
} }
@ -213,7 +213,7 @@ public class TestKitNodes {
baseDirectory.toFile().getAbsolutePath(), baseDirectory.toFile().getAbsolutePath(),
clusterId, clusterId,
controllerNodeIds.contains(id), controllerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Collections.emptyMap()), perServerProperties.getOrDefault(id, Map.of()),
numDisksPerBroker numDisksPerBroker
); );
brokerNodes.put(id, brokerNode); brokerNodes.put(id, brokerNode);

View File

@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.Collections; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -180,7 +180,7 @@ public class TestUtils {
} }
private static Integer getLeaderFromAdmin(Admin admin, String topic, int partition) throws Exception { private static Integer getLeaderFromAdmin(Admin admin, String topic, int partition) throws Exception {
TopicDescription topicDescription = admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic); TopicDescription topicDescription = admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
return topicDescription.partitions().stream() return topicDescription.partitions().stream()
.filter(partitionInfo -> partitionInfo.partition() == partition) .filter(partitionInfo -> partitionInfo.partition() == partition)
.findFirst() .findFirst()

View File

@ -42,7 +42,6 @@ import org.junit.platform.commons.util.ReflectionUtils;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -268,7 +267,7 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
.collect(Collectors.toMap(ClusterFeature::feature, ClusterFeature::version)); .collect(Collectors.toMap(ClusterFeature::feature, ClusterFeature::version));
ClusterConfig config = ClusterConfig.builder() ClusterConfig config = ClusterConfig.builder()
.setTypes(new HashSet<>(Arrays.asList(types))) .setTypes(Set.of(types))
.setBrokers(clusterTest.brokers() == 0 ? defaults.brokers() : clusterTest.brokers()) .setBrokers(clusterTest.brokers() == 0 ? defaults.brokers() : clusterTest.brokers())
.setControllers(clusterTest.controllers() == 0 ? defaults.controllers() : clusterTest.controllers()) .setControllers(clusterTest.controllers() == 0 ? defaults.controllers() : clusterTest.controllers())
.setDisksPerBroker(clusterTest.disksPerBroker() == 0 ? defaults.disksPerBroker() : clusterTest.disksPerBroker()) .setDisksPerBroker(clusterTest.disksPerBroker() == 0 ? defaults.disksPerBroker() : clusterTest.disksPerBroker())
@ -280,7 +279,7 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
.setServerProperties(serverProperties) .setServerProperties(serverProperties)
.setPerServerProperties(perServerProperties) .setPerServerProperties(perServerProperties)
.setMetadataVersion(clusterTest.metadataVersion()) .setMetadataVersion(clusterTest.metadataVersion())
.setTags(Arrays.asList(clusterTest.tags())) .setTags(List.of(clusterTest.tags()))
.setFeatures(features) .setFeatures(features)
.build(); .build();

View File

@ -42,7 +42,6 @@ import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.Extension; import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext; import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -55,8 +54,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
* class is provided with a configuration for the cluster. * class is provided with a configuration for the cluster.
@ -88,7 +85,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override @Override
public List<Extension> getAdditionalExtensions() { public List<Extension> getAdditionalExtensions() {
RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterConfig, isCombined); RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterConfig, isCombined);
return Arrays.asList( return List.of(
(BeforeEachCallback) context -> { (BeforeEachCallback) context -> {
clusterInstance.format(); clusterInstance.format();
if (clusterConfig.isAutoStart()) { if (clusterConfig.isAutoStart()) {

View File

@ -23,10 +23,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -76,8 +73,8 @@ public class KafkaClusterTestKitTest {
@Test @Test
public void testCreateClusterWithBadPerServerProperties() { public void testCreateClusterWithBadPerServerProperties() {
Map<Integer, Map<String, String>> perServerProperties = new HashMap<>(); Map<Integer, Map<String, String>> perServerProperties = new HashMap<>();
perServerProperties.put(100, Collections.singletonMap("foo", "foo1")); perServerProperties.put(100, Map.of("foo", "foo1"));
perServerProperties.put(200, Collections.singletonMap("bar", "bar1")); perServerProperties.put(200, Map.of("bar", "bar1"));
IllegalArgumentException e = assertThrowsExactly(IllegalArgumentException.class, () -> new KafkaClusterTestKit.Builder( IllegalArgumentException e = assertThrowsExactly(IllegalArgumentException.class, () -> new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder() new TestKitNodes.Builder()
@ -105,9 +102,9 @@ public class KafkaClusterTestKitTest {
nodes.brokerNodes().forEach((brokerId, node) -> { nodes.brokerNodes().forEach((brokerId, node) -> {
assertEquals(2, node.logDataDirectories().size()); assertEquals(2, node.logDataDirectories().size());
Set<String> expected = new HashSet<>(Arrays.asList(String.format("broker_%d_data0", brokerId), String.format("broker_%d_data1", brokerId))); Set<String> expected = Set.of(String.format("broker_%d_data0", brokerId), String.format("broker_%d_data1", brokerId));
if (nodes.isCombined(node.id())) { if (nodes.isCombined(node.id())) {
expected = new HashSet<>(Arrays.asList(String.format("combined_%d_0", brokerId), String.format("combined_%d_1", brokerId))); expected = Set.of(String.format("combined_%d_0", brokerId), String.format("combined_%d_1", brokerId));
} }
assertEquals( assertEquals(
expected, expected,
@ -140,6 +137,7 @@ public class KafkaClusterTestKitTest {
assertTrue(Paths.get(broker.metadataDirectory()).startsWith(baseDirectory))); assertTrue(Paths.get(broker.metadataDirectory()).startsWith(baseDirectory)));
} }
} }
@Test @Test
public void testExposedFaultHandlers() { public void testExposedFaultHandlers() {
TestKitNodes nodes = new TestKitNodes.Builder() TestKitNodes nodes = new TestKitNodes.Builder()

View File

@ -62,8 +62,6 @@ import org.junit.jupiter.api.Assertions;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -73,8 +71,6 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
@ -105,10 +101,10 @@ public class ClusterTestExtensionsTest {
static List<ClusterConfig> generate1() { static List<ClusterConfig> generate1() {
Map<String, String> serverProperties = new HashMap<>(); Map<String, String> serverProperties = new HashMap<>();
serverProperties.put("foo", "bar"); serverProperties.put("foo", "bar");
return singletonList(ClusterConfig.defaultBuilder() return List.of(ClusterConfig.defaultBuilder()
.setTypes(singleton(Type.KRAFT)) .setTypes(Set.of(Type.KRAFT))
.setServerProperties(serverProperties) .setServerProperties(serverProperties)
.setTags(singletonList("Generated Test")) .setTags(List.of("Generated Test"))
.build()); .build());
} }
@ -126,7 +122,7 @@ public class ClusterTestExtensionsTest {
assertEquals(Type.KRAFT, clusterInstance.type(), assertEquals(Type.KRAFT, clusterInstance.type(),
"generate1 provided a KRAFT cluster, so we should see that here"); "generate1 provided a KRAFT cluster, so we should see that here");
assertEquals("bar", clusterInstance.config().serverProperties().get("foo")); assertEquals("bar", clusterInstance.config().serverProperties().get("foo"));
assertEquals(singletonList("Generated Test"), clusterInstance.config().tags()); assertEquals(List.of("Generated Test"), clusterInstance.config().tags());
} }
// Multiple @ClusterTest can be used with @ClusterTests // Multiple @ClusterTest can be used with @ClusterTests
@ -157,22 +153,22 @@ public class ClusterTestExtensionsTest {
assertEquals("baz", clusterInstance.config().serverProperties().get("foo")); assertEquals("baz", clusterInstance.config().serverProperties().get("foo"));
assertEquals("eggs", clusterInstance.config().serverProperties().get("spam")); assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key")); assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key"));
assertEquals(Arrays.asList("default.display.key1", "default.display.key2"), clusterInstance.config().tags()); assertEquals(List.of("default.display.key1", "default.display.key2"), clusterInstance.config().tags());
// assert broker server 0 contains property queued.max.requests 200 from ClusterTest which overrides // assert broker server 0 contains property queued.max.requests 200 from ClusterTest which overrides
// the value 100 in server property in ClusterTestDefaults // the value 100 in server property in ClusterTestDefaults
try (Admin admin = clusterInstance.admin()) { try (Admin admin = clusterInstance.admin()) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
Map<ConfigResource, Config> configs = admin.describeConfigs(singletonList(configResource)).all().get(); Map<ConfigResource, Config> configs = admin.describeConfigs(List.of(configResource)).all().get();
assertEquals(1, configs.size()); assertEquals(1, configs.size());
assertEquals("200", configs.get(configResource).get("queued.max.requests").value()); assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
} }
// In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300 // In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300
if (clusterInstance.type() == Type.KRAFT) { if (clusterInstance.type() == Type.KRAFT) {
try (Admin admin = Admin.create(Collections.singletonMap( try (Admin admin = Admin.create(Map.of(
AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) { AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000"); ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
Map<ConfigResource, Config> configs = admin.describeConfigs(singletonList(configResource)).all().get(); Map<ConfigResource, Config> configs = admin.describeConfigs(List.of(configResource)).all().get();
assertEquals(1, configs.size()); assertEquals(1, configs.size());
assertEquals("300", configs.get(configResource).get("queued.max.requests").value()); assertEquals("300", configs.get(configResource).get("queued.max.requests").value());
} }
@ -217,7 +213,7 @@ public class ClusterTestExtensionsTest {
}) })
}) })
public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) { public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
assertEquals(singleton(CLASSIC), clusterInstance.supportedGroupProtocols()); assertEquals(Set.of(CLASSIC), clusterInstance.supportedGroupProtocols());
} }
@ -231,7 +227,7 @@ public class ClusterTestExtensionsTest {
try (Admin admin = clusterInstance.admin()) { try (Admin admin = clusterInstance.admin()) {
Assertions.assertTrue(admin.listTopics().listings().get().stream().anyMatch(s -> s.name().equals(topicName))); Assertions.assertTrue(admin.listTopics().listings().get().stream().anyMatch(s -> s.name().equals(topicName)));
List<TopicPartitionInfo> partitions = admin.describeTopics(singleton(topicName)).allTopicNames().get() List<TopicPartitionInfo> partitions = admin.describeTopics(Set.of(topicName)).allTopicNames().get()
.get(topicName).partitions(); .get(topicName).partitions();
assertEquals(numPartition, partitions.size()); assertEquals(numPartition, partitions.size());
Assertions.assertTrue(partitions.stream().allMatch(partition -> partition.replicas().size() == numReplicas)); Assertions.assertTrue(partitions.stream().allMatch(partition -> partition.replicas().size() == numReplicas));
@ -275,9 +271,9 @@ public class ClusterTestExtensionsTest {
public void testVerifyTopicDeletion(ClusterInstance clusterInstance) throws Exception { public void testVerifyTopicDeletion(ClusterInstance clusterInstance) throws Exception {
try (Admin admin = clusterInstance.admin()) { try (Admin admin = clusterInstance.admin()) {
String testTopic = "testTopic"; String testTopic = "testTopic";
admin.createTopics(singletonList(new NewTopic(testTopic, 1, (short) 1))); admin.createTopics(List.of(new NewTopic(testTopic, 1, (short) 1)));
clusterInstance.waitForTopic(testTopic, 1); clusterInstance.waitForTopic(testTopic, 1);
admin.deleteTopics(singletonList(testTopic)); admin.deleteTopics(List.of(testTopic));
clusterInstance.waitTopicDeletion(testTopic); clusterInstance.waitTopicDeletion(testTopic);
Assertions.assertTrue(admin.listTopics().listings().get().stream().noneMatch( Assertions.assertTrue(admin.listTopics().listings().get().stream().noneMatch(
topic -> topic.name().equals(testTopic) topic -> topic.name().equals(testTopic)
@ -299,12 +295,12 @@ public class ClusterTestExtensionsTest {
KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())) VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()))
) { ) {
adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1))); adminClient.createTopics(Set.of(new NewTopic(topic, 1, (short) 1)));
assertNotNull(producer); assertNotNull(producer);
assertNotNull(consumer); assertNotNull(consumer);
producer.send(new ProducerRecord<>(topic, key, value)); producer.send(new ProducerRecord<>(topic, key, value));
producer.flush(); producer.flush();
consumer.subscribe(singletonList(topic)); consumer.subscribe(List.of(topic));
List<ConsumerRecord<String, String>> records = new ArrayList<>(); List<ConsumerRecord<String, String>> records = new ArrayList<>();
TestUtils.waitForCondition(() -> { TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)).forEach(records::add); consumer.poll(Duration.ofMillis(100)).forEach(records::add);
@ -327,12 +323,12 @@ public class ClusterTestExtensionsTest {
Producer<byte[], byte[]> producer = cluster.producer(); Producer<byte[], byte[]> producer = cluster.producer();
Consumer<byte[], byte[]> consumer = cluster.consumer() Consumer<byte[], byte[]> consumer = cluster.consumer()
) { ) {
adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1))); adminClient.createTopics(Set.of(new NewTopic(topic, 1, (short) 1)));
assertNotNull(producer); assertNotNull(producer);
assertNotNull(consumer); assertNotNull(consumer);
producer.send(new ProducerRecord<>(topic, key, value)); producer.send(new ProducerRecord<>(topic, key, value));
producer.flush(); producer.flush();
consumer.subscribe(singletonList(topic)); consumer.subscribe(List.of(topic));
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
TestUtils.waitForCondition(() -> { TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)).forEach(records::add); consumer.poll(Duration.ofMillis(100)).forEach(records::add);

View File

@ -25,7 +25,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ExtensionContext;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List; import java.util.List;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -34,7 +33,7 @@ import static org.mockito.Mockito.when;
public class ClusterTestExtensionsUnitTest { public class ClusterTestExtensionsUnitTest {
static List<ClusterConfig> cfgEmpty() { static List<ClusterConfig> cfgEmpty() {
return Collections.emptyList(); return List.of();
} }
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})

View File

@ -26,7 +26,6 @@ import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -56,7 +55,7 @@ public class CatalogTestFilterTest {
@Test @Test
public void testEmptyCatalog(@TempDir Path tempDir) throws IOException { public void testEmptyCatalog(@TempDir Path tempDir) throws IOException {
Path catalog = tempDir.resolve("catalog.txt"); Path catalog = tempDir.resolve("catalog.txt");
Files.write(catalog, Collections.emptyList()); Files.write(catalog, List.of());
Filter<TestDescriptor> filter = CatalogTestFilter.create(catalog.toString()); Filter<TestDescriptor> filter = CatalogTestFilter.create(catalog.toString());
assertTrue(filter.apply(descriptor("o.a.k.Foo", "testBar1")).excluded()); assertTrue(filter.apply(descriptor("o.a.k.Foo", "testBar1")).excluded());