mirror of https://github.com/apache/kafka.git
KAFKA-16654 Refactor kafka.test.annotation.Type and ClusterTestExtensions (#15916)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
1dcdccf736
commit
89083520ef
|
@ -35,13 +35,14 @@ import org.apache.kafka.server.common.MetadataVersion;
|
|||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
|
||||
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
|
||||
|
@ -62,10 +63,10 @@ public class ClusterTestExtensionsTest {
|
|||
}
|
||||
|
||||
// Static methods can generate cluster configurations
|
||||
static void generate1(ClusterGenerator clusterGenerator) {
|
||||
static List<ClusterConfig> generate1() {
|
||||
Map<String, String> serverProperties = new HashMap<>();
|
||||
serverProperties.put("foo", "bar");
|
||||
clusterGenerator.accept(ClusterConfig.defaultBuilder()
|
||||
return Collections.singletonList(ClusterConfig.defaultBuilder()
|
||||
.setTypes(Collections.singleton(Type.ZK))
|
||||
.setServerProperties(serverProperties)
|
||||
.setTags(Collections.singletonList("Generated Test"))
|
||||
|
|
|
@ -22,7 +22,6 @@ import kafka.test.junit.RaftClusterInvocationContext;
|
|||
import kafka.test.junit.ZkClusterInvocationContext;
|
||||
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* The type of cluster config being requested. Used by {@link kafka.test.ClusterConfig} and the test annotations.
|
||||
|
@ -30,22 +29,22 @@ import java.util.function.Consumer;
|
|||
public enum Type {
|
||||
KRAFT {
|
||||
@Override
|
||||
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
|
||||
invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, false));
|
||||
public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
|
||||
return new RaftClusterInvocationContext(baseDisplayName, config, false);
|
||||
}
|
||||
},
|
||||
CO_KRAFT {
|
||||
@Override
|
||||
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
|
||||
invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, true));
|
||||
public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
|
||||
return new RaftClusterInvocationContext(baseDisplayName, config, true);
|
||||
}
|
||||
},
|
||||
ZK {
|
||||
@Override
|
||||
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
|
||||
invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config));
|
||||
public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
|
||||
return new ZkClusterInvocationContext(baseDisplayName, config);
|
||||
}
|
||||
};
|
||||
|
||||
public abstract void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer);
|
||||
public abstract TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config);
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package kafka.test.junit;
|
||||
|
||||
import kafka.test.ClusterConfig;
|
||||
import kafka.test.ClusterGenerator;
|
||||
import kafka.test.annotation.ClusterTest;
|
||||
import kafka.test.annotation.ClusterTestDefaults;
|
||||
import kafka.test.annotation.ClusterTests;
|
||||
|
@ -32,7 +31,6 @@ import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
|
|||
import org.junit.platform.commons.util.ReflectionUtils;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
|
@ -94,24 +92,19 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
|
|||
// Process the @ClusterTemplate annotation
|
||||
ClusterTemplate clusterTemplateAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class);
|
||||
if (clusterTemplateAnnot != null) {
|
||||
processClusterTemplate(context, clusterTemplateAnnot, generatedContexts::add);
|
||||
if (generatedContexts.isEmpty()) {
|
||||
throw new IllegalStateException("ClusterConfig generator method should provide at least one config");
|
||||
}
|
||||
generatedContexts.addAll(processClusterTemplate(context, clusterTemplateAnnot));
|
||||
}
|
||||
|
||||
// Process single @ClusterTest annotation
|
||||
ClusterTest clusterTestAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTest.class);
|
||||
if (clusterTestAnnot != null) {
|
||||
processClusterTest(context, clusterTestAnnot, defaults, generatedContexts::add);
|
||||
generatedContexts.addAll(processClusterTest(context, clusterTestAnnot, defaults));
|
||||
}
|
||||
|
||||
// Process multiple @ClusterTest annotation within @ClusterTests
|
||||
ClusterTests clusterTestsAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTests.class);
|
||||
if (clusterTestsAnnot != null) {
|
||||
for (ClusterTest annot : clusterTestsAnnot.value()) {
|
||||
processClusterTest(context, annot, defaults, generatedContexts::add);
|
||||
}
|
||||
generatedContexts.addAll(processClusterTests(context, clusterTestsAnnot, defaults));
|
||||
}
|
||||
|
||||
if (generatedContexts.isEmpty()) {
|
||||
|
@ -122,31 +115,54 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
|
|||
return generatedContexts.stream();
|
||||
}
|
||||
|
||||
void processClusterTemplate(ExtensionContext context, ClusterTemplate annot,
|
||||
Consumer<TestTemplateInvocationContext> testInvocations) {
|
||||
// If specified, call cluster config generated method (must be static)
|
||||
List<ClusterConfig> generatedClusterConfigs = new ArrayList<>();
|
||||
|
||||
|
||||
List<TestTemplateInvocationContext> processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
|
||||
if (annot.value().trim().isEmpty()) {
|
||||
throw new IllegalStateException("ClusterTemplate value can't be empty string.");
|
||||
}
|
||||
generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add);
|
||||
|
||||
String baseDisplayName = context.getRequiredTestMethod().getName();
|
||||
generatedClusterConfigs.forEach(config -> {
|
||||
for (Type type: config.clusterTypes()) {
|
||||
type.invocationContexts(baseDisplayName, config, testInvocations);
|
||||
}
|
||||
});
|
||||
List<TestTemplateInvocationContext> contexts = generateClusterConfigurations(context, annot.value())
|
||||
.stream().flatMap(config -> config.clusterTypes().stream()
|
||||
.map(type -> type.invocationContexts(baseDisplayName, config))).collect(Collectors.toList());
|
||||
|
||||
if (contexts.isEmpty()) {
|
||||
throw new IllegalStateException("ClusterConfig generator method should provide at least one config");
|
||||
}
|
||||
|
||||
return contexts;
|
||||
}
|
||||
|
||||
private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) {
|
||||
@SuppressWarnings("unchecked")
|
||||
private List<ClusterConfig> generateClusterConfigurations(ExtensionContext context, String generateClustersMethods) {
|
||||
Object testInstance = context.getTestInstance().orElse(null);
|
||||
Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods, ClusterGenerator.class);
|
||||
ReflectionUtils.invokeMethod(method, testInstance, generator);
|
||||
Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods);
|
||||
return (List<ClusterConfig>) ReflectionUtils.invokeMethod(method, testInstance);
|
||||
}
|
||||
|
||||
private void processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults,
|
||||
Consumer<TestTemplateInvocationContext> testInvocations) {
|
||||
private List<TestTemplateInvocationContext> processClusterTests(ExtensionContext context, ClusterTests annots, ClusterTestDefaults defaults) {
|
||||
|
||||
List<TestTemplateInvocationContext> ret = Arrays.stream(annots.value())
|
||||
.flatMap(annot -> processClusterTestInternal(context, annot, defaults).stream()).collect(Collectors.toList());
|
||||
|
||||
if (ret.isEmpty()) {
|
||||
throw new IllegalStateException("processClusterTests method should provide at least one config");
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
private List<TestTemplateInvocationContext> processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults) {
|
||||
List<TestTemplateInvocationContext> ret = processClusterTestInternal(context, annot, defaults);
|
||||
|
||||
if (ret.isEmpty()) {
|
||||
throw new IllegalStateException("processClusterTest method should provide at least one config");
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
private List<TestTemplateInvocationContext> processClusterTestInternal(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults) {
|
||||
Type[] types = annot.types().length == 0 ? defaults.types() : annot.types();
|
||||
Map<String, String> serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties()))
|
||||
.filter(e -> e.id() == -1)
|
||||
|
@ -169,9 +185,9 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
|
|||
.setMetadataVersion(annot.metadataVersion())
|
||||
.setTags(Arrays.asList(annot.tags()))
|
||||
.build();
|
||||
for (Type type : types) {
|
||||
type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations);
|
||||
}
|
||||
|
||||
return Arrays.stream(types).map(type -> type.invocationContexts(context.getRequiredTestMethod().getName(), config))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private ClusterTestDefaults getClusterTestDefaults(Class<?> testClass) {
|
||||
|
|
|
@ -22,27 +22,24 @@ import kafka.test.annotation.ClusterTemplate;
|
|||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
|
||||
import java.util.function.Consumer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class ClusterTestExtensionsUnitTest {
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void testProcessClusterTemplate() {
|
||||
ClusterTestExtensions ext = new ClusterTestExtensions();
|
||||
ExtensionContext context = mock(ExtensionContext.class);
|
||||
Consumer<TestTemplateInvocationContext> testInvocations = mock(Consumer.class);
|
||||
|
||||
ClusterTemplate annot = mock(ClusterTemplate.class);
|
||||
when(annot.value()).thenReturn("").thenReturn(" ");
|
||||
|
||||
Assertions.assertThrows(IllegalStateException.class, () ->
|
||||
ext.processClusterTemplate(context, annot, testInvocations)
|
||||
ext.processClusterTemplate(context, annot)
|
||||
);
|
||||
|
||||
Assertions.assertThrows(IllegalStateException.class, () ->
|
||||
ext.processClusterTemplate(context, annot, testInvocations)
|
||||
ext.processClusterTemplate(context, annot)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,24 +42,27 @@ annotation takes a single string value which references a static method on the t
|
|||
produce any number of test configurations using a fluent builder style API.
|
||||
|
||||
```java
|
||||
@ClusterTemplate("generateConfigs")
|
||||
void testSomething() { ... }
|
||||
import java.util.Arrays;
|
||||
|
||||
static void generateConfigs(ClusterGenerator clusterGenerator) {
|
||||
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
|
||||
.name("Generated Test 1")
|
||||
.serverProperties(props1)
|
||||
.ibp("2.7-IV1")
|
||||
.build());
|
||||
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
|
||||
.name("Generated Test 2")
|
||||
.serverProperties(props2)
|
||||
.ibp("2.7-IV2")
|
||||
.build());
|
||||
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
|
||||
.name("Generated Test 3")
|
||||
.serverProperties(props3)
|
||||
.build());
|
||||
@ClusterTemplate("generateConfigs")
|
||||
void testSomething() { ...}
|
||||
|
||||
static List<ClusterConfig> generateConfigs() {
|
||||
ClusterConfig config1 = ClusterConfig.defaultClusterBuilder()
|
||||
.name("Generated Test 1")
|
||||
.serverProperties(props1)
|
||||
.ibp("2.7-IV1")
|
||||
.build();
|
||||
ClusterConfig config2 = ClusterConfig.defaultClusterBuilder()
|
||||
.name("Generated Test 2")
|
||||
.serverProperties(props2)
|
||||
.ibp("2.7-IV2")
|
||||
.build();
|
||||
ClusterConfig config3 = ClusterConfig.defaultClusterBuilder()
|
||||
.name("Generated Test 3")
|
||||
.serverProperties(props3)
|
||||
.build();
|
||||
return Arrays.asList(config1, config2, config3);
|
||||
}
|
||||
```
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ package kafka.coordinator.transaction
|
|||
|
||||
import kafka.network.SocketServer
|
||||
import kafka.server.IntegrationTestUtils
|
||||
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
|
||||
import kafka.test.{ClusterConfig, ClusterInstance}
|
||||
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
|
||||
import kafka.test.junit.ClusterTestExtensions
|
||||
import org.apache.kafka.common.message.InitProducerIdRequestData
|
||||
|
@ -38,19 +38,19 @@ import scala.concurrent.duration.DurationInt
|
|||
import scala.jdk.CollectionConverters._
|
||||
|
||||
object ProducerIdsIntegrationTest {
|
||||
def uniqueProducerIdsBumpIBP(clusterGenerator: ClusterGenerator): Unit = {
|
||||
def uniqueProducerIdsBumpIBP(): java.util.List[ClusterConfig] = {
|
||||
val serverProperties = java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
|
||||
val perBrokerProperties: java.util.Map[Integer, java.util.Map[String, String]] =
|
||||
java.util.Collections.singletonMap(0,
|
||||
java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0-IV0"))
|
||||
|
||||
clusterGenerator.accept(ClusterConfig.defaultBuilder()
|
||||
List(ClusterConfig.defaultBuilder()
|
||||
.setTypes(Set(Type.ZK).asJava)
|
||||
.setBrokers(3)
|
||||
.setAutoStart(false)
|
||||
.setServerProperties(serverProperties)
|
||||
.setPerServerProperties(perBrokerProperties)
|
||||
.build())
|
||||
.build()).asJava
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
package kafka.zk
|
||||
|
||||
import kafka.server.KRaftCachedControllerId
|
||||
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
|
||||
import kafka.test.{ClusterConfig, ClusterInstance}
|
||||
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
|
||||
import kafka.test.junit.ClusterTestExtensions
|
||||
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
|
||||
|
@ -63,8 +63,7 @@ import scala.collection.Seq
|
|||
import scala.jdk.CollectionConverters._
|
||||
|
||||
object ZkMigrationIntegrationTest {
|
||||
|
||||
def zkClustersForAllMigrationVersions(clusterGenerator: ClusterGenerator): Unit = {
|
||||
def zkClustersForAllMigrationVersions(): java.util.List[ClusterConfig] = {
|
||||
Seq(
|
||||
MetadataVersion.IBP_3_4_IV0,
|
||||
MetadataVersion.IBP_3_5_IV2,
|
||||
|
@ -74,19 +73,19 @@ object ZkMigrationIntegrationTest {
|
|||
MetadataVersion.IBP_3_7_IV2,
|
||||
MetadataVersion.IBP_3_7_IV4,
|
||||
MetadataVersion.IBP_3_8_IV0
|
||||
).foreach { mv =>
|
||||
).map { mv =>
|
||||
val serverProperties = new util.HashMap[String, String]()
|
||||
serverProperties.put("inter.broker.listener.name", "EXTERNAL")
|
||||
serverProperties.put("listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
|
||||
serverProperties.put("advertised.listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
|
||||
serverProperties.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
clusterGenerator.accept(ClusterConfig.defaultBuilder()
|
||||
ClusterConfig.defaultBuilder()
|
||||
.setMetadataVersion(mv)
|
||||
.setBrokers(3)
|
||||
.setServerProperties(serverProperties)
|
||||
.setTypes(Set(Type.ZK).asJava)
|
||||
.build())
|
||||
}
|
||||
.build()
|
||||
}.asJava
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package kafka.server
|
||||
|
||||
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
|
||||
import kafka.test.{ClusterConfig, ClusterInstance}
|
||||
import org.apache.kafka.common.message.ApiVersionsRequestData
|
||||
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
||||
import org.apache.kafka.common.requests.ApiVersionsRequest
|
||||
|
@ -26,6 +26,7 @@ import kafka.test.junit.ClusterTestExtensions
|
|||
import org.apache.kafka.server.common.MetadataVersion
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.extension.ExtendWith
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
object ApiVersionsRequestTest {
|
||||
|
||||
|
@ -39,42 +40,42 @@ object ApiVersionsRequestTest {
|
|||
serverProperties
|
||||
}
|
||||
|
||||
def testApiVersionsRequestTemplate(clusterGenerator: ClusterGenerator): Unit = {
|
||||
def testApiVersionsRequestTemplate(): java.util.List[ClusterConfig] = {
|
||||
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
|
||||
serverProperties.put("unstable.api.versions.enable", "false")
|
||||
serverProperties.put("unstable.metadata.versions.enable", "true")
|
||||
clusterGenerator.accept(ClusterConfig.defaultBuilder()
|
||||
List(ClusterConfig.defaultBuilder()
|
||||
.setTypes(java.util.Collections.singleton(Type.ZK))
|
||||
.setServerProperties(serverProperties)
|
||||
.setMetadataVersion(MetadataVersion.IBP_3_8_IV0)
|
||||
.build())
|
||||
.build()).asJava
|
||||
}
|
||||
|
||||
def testApiVersionsRequestIncludesUnreleasedApisTemplate(clusterGenerator: ClusterGenerator): Unit = {
|
||||
def testApiVersionsRequestIncludesUnreleasedApisTemplate(): java.util.List[ClusterConfig] = {
|
||||
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
|
||||
serverProperties.put("unstable.api.versions.enable", "true")
|
||||
serverProperties.put("unstable.metadata.versions.enable", "true")
|
||||
clusterGenerator.accept(ClusterConfig.defaultBuilder()
|
||||
List(ClusterConfig.defaultBuilder()
|
||||
.setTypes(java.util.Collections.singleton(Type.ZK))
|
||||
.setServerProperties(serverProperties)
|
||||
.build())
|
||||
.build()).asJava
|
||||
}
|
||||
|
||||
def testApiVersionsRequestValidationV0Template(clusterGenerator: ClusterGenerator): Unit = {
|
||||
def testApiVersionsRequestValidationV0Template(): java.util.List[ClusterConfig] = {
|
||||
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
|
||||
serverProperties.put("unstable.api.versions.enable", "false")
|
||||
serverProperties.put("unstable.metadata.versions.enable", "false")
|
||||
clusterGenerator.accept(ClusterConfig.defaultBuilder()
|
||||
List(ClusterConfig.defaultBuilder()
|
||||
.setTypes(java.util.Collections.singleton(Type.ZK))
|
||||
.setMetadataVersion(MetadataVersion.IBP_3_7_IV4)
|
||||
.build())
|
||||
.build()).asJava
|
||||
}
|
||||
|
||||
def zkApiVersionsRequest(clusterGenerator: ClusterGenerator): Unit = {
|
||||
clusterGenerator.accept(ClusterConfig.defaultBuilder()
|
||||
def zkApiVersionsRequest(): java.util.List[ClusterConfig] = {
|
||||
List(ClusterConfig.defaultBuilder()
|
||||
.setTypes(java.util.Collections.singleton(Type.ZK))
|
||||
.setServerProperties(controlPlaneListenerProperties())
|
||||
.build())
|
||||
.build()).asJava
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ import kafka.api.{KafkaSasl, SaslSetup}
|
|||
import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
|
||||
import kafka.test.annotation.{ClusterTemplate, Type}
|
||||
import kafka.test.junit.ClusterTestExtensions
|
||||
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
|
||||
import kafka.test.{ClusterConfig, ClusterInstance}
|
||||
import kafka.utils.JaasTestUtils
|
||||
import org.apache.kafka.common.config.SaslConfigs
|
||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
||||
|
@ -44,7 +44,7 @@ object SaslApiVersionsRequestTest {
|
|||
val controlPlaneListenerName = "CONTROL_PLANE"
|
||||
val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
|
||||
|
||||
def saslApiVersionsRequestClusterConfig(clusterGenerator: ClusterGenerator): Unit = {
|
||||
def saslApiVersionsRequestClusterConfig(): java.util.List[ClusterConfig] = {
|
||||
val saslServerProperties = new java.util.HashMap[String, String]()
|
||||
saslServerProperties.put(KafkaSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism)
|
||||
saslServerProperties.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
|
||||
|
@ -59,13 +59,13 @@ object SaslApiVersionsRequestTest {
|
|||
serverProperties.put("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
|
||||
serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
|
||||
|
||||
clusterGenerator.accept(ClusterConfig.defaultBuilder
|
||||
List(ClusterConfig.defaultBuilder
|
||||
.setSecurityProtocol(securityProtocol)
|
||||
.setTypes(Set(Type.ZK).asJava)
|
||||
.setSaslServerProperties(saslServerProperties)
|
||||
.setSaslClientProperties(saslClientProperties)
|
||||
.setServerProperties(serverProperties)
|
||||
.build())
|
||||
.build()).asJava
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.kafka.tools.consumer.group;
|
||||
|
||||
import kafka.test.ClusterConfig;
|
||||
import kafka.test.ClusterGenerator;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.errors.WakeupException;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
@ -29,6 +28,7 @@ import java.util.HashMap;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Collections;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -50,7 +50,7 @@ class ConsumerGroupCommandTestUtils {
|
|||
private ConsumerGroupCommandTestUtils() {
|
||||
}
|
||||
|
||||
static void generator(ClusterGenerator clusterGenerator) {
|
||||
static List<ClusterConfig> generator() {
|
||||
Map<String, String> serverProperties = new HashMap<>();
|
||||
serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
|
||||
serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
|
||||
|
@ -61,7 +61,6 @@ class ConsumerGroupCommandTestUtils {
|
|||
.setServerProperties(serverProperties)
|
||||
.setTags(Collections.singletonList("classicGroupCoordinator"))
|
||||
.build();
|
||||
clusterGenerator.accept(classicGroupCoordinator);
|
||||
|
||||
// Following are test case config with new group coordinator
|
||||
serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
|
||||
|
@ -71,7 +70,7 @@ class ConsumerGroupCommandTestUtils {
|
|||
.setServerProperties(serverProperties)
|
||||
.setTags(Collections.singletonList("newGroupCoordinator"))
|
||||
.build();
|
||||
clusterGenerator.accept(consumerGroupCoordinator);
|
||||
return Arrays.asList(classicGroupCoordinator, consumerGroupCoordinator);
|
||||
}
|
||||
|
||||
static <T> AutoCloseable buildConsumers(int numberOfConsumers,
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
package org.apache.kafka.tools.consumer.group;
|
||||
|
||||
import joptsimple.OptionException;
|
||||
import kafka.test.ClusterGenerator;
|
||||
import kafka.test.ClusterConfig;
|
||||
import kafka.test.ClusterInstance;
|
||||
import kafka.test.annotation.ClusterTemplate;
|
||||
import kafka.test.junit.ClusterTestExtensions;
|
||||
|
@ -36,6 +36,7 @@ import org.junit.jupiter.api.Test;
|
|||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
|
@ -67,8 +68,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
@ExtendWith(value = ClusterTestExtensions.class)
|
||||
public class DeleteConsumerGroupsTest {
|
||||
|
||||
private static void generator(ClusterGenerator clusterGenerator) {
|
||||
ConsumerGroupCommandTestUtils.generator(clusterGenerator);
|
||||
private static List<ClusterConfig> generator() {
|
||||
return ConsumerGroupCommandTestUtils.generator();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -16,8 +16,8 @@
|
|||
*/
|
||||
package org.apache.kafka.tools.consumer.group;
|
||||
|
||||
import kafka.test.ClusterConfig;
|
||||
import kafka.test.ClusterInstance;
|
||||
import kafka.test.ClusterGenerator;
|
||||
import kafka.test.annotation.ClusterTemplate;
|
||||
import kafka.test.junit.ClusterTestExtensions;
|
||||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
|
@ -44,6 +44,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
|||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Properties;
|
||||
|
@ -64,8 +65,8 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
|
|||
this.clusterInstance = clusterInstance;
|
||||
}
|
||||
|
||||
private static void generator(ClusterGenerator clusterGenerator) {
|
||||
ConsumerGroupCommandTestUtils.generator(clusterGenerator);
|
||||
private static List<ClusterConfig> generator() {
|
||||
return ConsumerGroupCommandTestUtils.generator();
|
||||
}
|
||||
|
||||
@ClusterTemplate("generator")
|
||||
|
|
Loading…
Reference in New Issue