asClass) {
+ return asClass.cast(getUnderlying());
+ }
+
+ Admin createAdminClient(Properties configOverrides);
+
+ default Admin createAdminClient() {
+ return createAdminClient(new Properties());
+ }
+
+ void start();
+
+ void stop();
+}
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
new file mode 100644
index 00000000000..6818e4332ff
--- /dev/null
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.AutoStart;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.ClusterTests;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+
+@ClusterTestDefaults(clusterType = Type.ZK) // Set defaults for a few params in @ClusterTest(s)
+@ExtendWith(ClusterTestExtensions.class)
+public class ClusterTestExtensionsTest {
+
+ private final ClusterInstance clusterInstance;
+ private final ClusterConfig config;
+
+ ClusterTestExtensionsTest(ClusterInstance clusterInstance, ClusterConfig config) { // Constructor injections
+ this.clusterInstance = clusterInstance;
+ this.config = config;
+ }
+
+ // Static methods can generate cluster configurations
+ static void generate1(ClusterGenerator clusterGenerator) {
+ clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated Test").build());
+ }
+
+ // BeforeEach run after class construction, but before cluster initialization and test invocation
+ @BeforeEach
+ public void beforeEach(ClusterConfig config) {
+ Assertions.assertSame(this.config, config, "Injected objects should be the same");
+ config.serverProperties().put("before", "each");
+ }
+
+ // AfterEach runs after test invocation and cluster teardown
+ @AfterEach
+ public void afterEach(ClusterConfig config) {
+ Assertions.assertSame(this.config, config, "Injected objects should be the same");
+ }
+
+ // With no params, configuration comes from the annotation defaults as well as @ClusterTestDefaults (if present)
+ @ClusterTest
+ public void testClusterTest(ClusterConfig config, ClusterInstance clusterInstance) {
+ Assertions.assertSame(this.config, config, "Injected objects should be the same");
+ Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected objects should be the same");
+ Assertions.assertEquals(clusterInstance.clusterType(), ClusterInstance.ClusterType.ZK); // From the class level default
+ Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("before"), "each");
+ }
+
+ // generate1 is a template method which generates any number of cluster configs
+ @ClusterTemplate("generate1")
+ public void testClusterTemplate() {
+ Assertions.assertEquals(clusterInstance.clusterType(), ClusterInstance.ClusterType.ZK,
+ "generate1 provided a Zk cluster, so we should see that here");
+ Assertions.assertEquals(clusterInstance.config().name().orElse(""), "Generated Test",
+ "generate 1 named this cluster config, so we should see that here");
+ Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("before"), "each");
+ }
+
+ // Multiple @ClusterTest can be used with @ClusterTests
+ @ClusterTests({
+ @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = {
+ @ClusterConfigProperty(key = "foo", value = "bar"),
+ @ClusterConfigProperty(key = "spam", value = "eggs")
+ }),
+ @ClusterTest(name = "cluster-tests-2", clusterType = Type.ZK, serverProperties = {
+ @ClusterConfigProperty(key = "foo", value = "baz"),
+ @ClusterConfigProperty(key = "spam", value = "eggz")
+ })
+ })
+ public void testClusterTests() {
+ if (clusterInstance.config().name().filter(name -> name.equals("cluster-tests-1")).isPresent()) {
+ Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("foo"), "bar");
+ Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("spam"), "eggs");
+ } else if (clusterInstance.config().name().filter(name -> name.equals("cluster-tests-2")).isPresent()) {
+ Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("foo"), "baz");
+ Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("spam"), "eggz");
+ } else {
+ Assertions.fail("Unknown cluster config " + clusterInstance.config().name());
+ }
+ }
+
+ @ClusterTest(autoStart = AutoStart.NO)
+ public void testNoAutoStart() {
+ Assertions.assertThrows(RuntimeException.class, clusterInstance::anyBrokerSocketServer);
+ clusterInstance.start();
+ Assertions.assertNotNull(clusterInstance.anyBrokerSocketServer());
+ }
+}
diff --git a/core/src/test/java/kafka/test/annotation/AutoStart.java b/core/src/test/java/kafka/test/annotation/AutoStart.java
new file mode 100644
index 00000000000..24fdedfb22b
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/AutoStart.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+public enum AutoStart {
+ YES,
+ NO,
+ DEFAULT
+}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java b/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
new file mode 100644
index 00000000000..eb1434d3b05
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Documented
+@Target({ElementType.ANNOTATION_TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ClusterConfigProperty {
+ String key();
+ String value();
+}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTemplate.java b/core/src/test/java/kafka/test/annotation/ClusterTemplate.java
new file mode 100644
index 00000000000..f776b4e8322
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/ClusterTemplate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Used to indicate that a test should call the method given by {@link #value()} to generate a number of
+ * cluster configurations. The method specified by the value should accept a single argument of the type
+ * {@link ClusterGenerator}. Any return value from the method is ignore. A test invocation
+ * will be generated for each {@link ClusterConfig} provided to the ClusterGenerator instance.
+ *
+ * The method given here must be static since it is invoked before any tests are actually run. Each test generated
+ * by this annotation will run as if it was defined as a separate test method with its own
+ * {@link org.junit.jupiter.api.Test}. That is to say, each generated test invocation will have a separate lifecycle.
+ *
+ * This annotation may be used in conjunction with {@link ClusterTest} and {@link ClusterTests} which also yield
+ * ClusterConfig instances.
+ *
+ * For Scala tests, the method should be defined in a companion object with the same name as the test class.
+ */
+@Documented
+@Target({METHOD})
+@Retention(RUNTIME)
+@TestTemplate
+public @interface ClusterTemplate {
+ /**
+ * Specify the static method used for generating cluster configs
+ */
+ String value();
+}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
new file mode 100644
index 00000000000..687255c3c47
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Documented
+@Target({METHOD})
+@Retention(RUNTIME)
+@TestTemplate
+public @interface ClusterTest {
+ Type clusterType() default Type.DEFAULT;
+ int brokers() default 0;
+ int controllers() default 0;
+ AutoStart autoStart() default AutoStart.DEFAULT;
+
+ String name() default "";
+ SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
+ String listener() default "";
+ ClusterConfigProperty[] serverProperties() default {};
+}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
new file mode 100644
index 00000000000..cd8a66dfda0
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import kafka.test.junit.ClusterTestExtensions;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Used to set class level defaults for any test template methods annotated with {@link ClusterTest} or
+ * {@link ClusterTests}. The default values here are also used as the source for defaults in
+ * {@link ClusterTestExtensions}.
+ */
+@Documented
+@Target({TYPE})
+@Retention(RUNTIME)
+public @interface ClusterTestDefaults {
+ Type clusterType() default Type.ZK;
+ int brokers() default 1;
+ int controllers() default 1;
+ boolean autoStart() default true;
+}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTests.java b/core/src/test/java/kafka/test/annotation/ClusterTests.java
new file mode 100644
index 00000000000..64905f8810b
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/ClusterTests.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import org.junit.jupiter.api.TestTemplate;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Documented
+@Target({METHOD})
+@Retention(RUNTIME)
+@TestTemplate
+public @interface ClusterTests {
+ ClusterTest[] value();
+}
diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java
new file mode 100644
index 00000000000..8e8f23627c3
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/Type.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+/**
+ * The type of cluster config being requested. Used by {@link kafka.test.ClusterConfig} and the test annotations.
+ */
+public enum Type {
+ // RAFT,
+ ZK,
+ BOTH,
+ DEFAULT
+}
diff --git a/core/src/test/java/kafka/test/junit/ClusterInstanceParameterResolver.java b/core/src/test/java/kafka/test/junit/ClusterInstanceParameterResolver.java
new file mode 100644
index 00000000000..3329e328b59
--- /dev/null
+++ b/core/src/test/java/kafka/test/junit/ClusterInstanceParameterResolver.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.test.ClusterInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+import java.lang.reflect.Executable;
+
+import static org.junit.platform.commons.util.AnnotationUtils.isAnnotated;
+
+/**
+ * This resolver provides an instance of {@link ClusterInstance} to a test invocation. The instance represents the
+ * underlying cluster being run for the current test. It can be injected into test methods or into the class
+ * constructor.
+ *
+ * N.B., if injected into the class constructor, the instance will not be fully initialized until the actual test method
+ * is being invoked. This is because the cluster is not started until after class construction and after "before"
+ * lifecycle methods have been run. Constructor injection is meant for convenience so helper methods can be defined on
+ * the test which can rely on a class member rather than an argument for ClusterInstance.
+ */
+public class ClusterInstanceParameterResolver implements ParameterResolver {
+ private final ClusterInstance clusterInstance;
+
+ ClusterInstanceParameterResolver(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ }
+
+ @Override
+ public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
+ if (!parameterContext.getParameter().getType().equals(ClusterInstance.class)) {
+ return false;
+ }
+
+ if (!extensionContext.getTestMethod().isPresent()) {
+ // Allow this to be injected into the class
+ extensionContext.getRequiredTestClass();
+ return true;
+ } else {
+ // If we're injecting into a method, make sure it's a test method and not a lifecycle method
+ Executable parameterizedMethod = parameterContext.getParameter().getDeclaringExecutable();
+ return isAnnotated(parameterizedMethod, TestTemplate.class);
+ }
+ }
+
+ @Override
+ public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
+ return clusterInstance;
+ }
+}
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
new file mode 100644
index 00000000000..872b669e21f
--- /dev/null
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTests;
+import kafka.test.annotation.Type;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.junit.platform.commons.util.ReflectionUtils;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+/**
+ * This class is a custom JUnit extension that will generate some number of test invocations depending on the processing
+ * of a few custom annotations. These annotations are placed on so-called test template methods. Template methods look
+ * like normal JUnit test methods, but instead of being invoked directly, they are used as templates for generating
+ * multiple test invocations.
+ *
+ * Test class that use this extension should use one of the following annotations on each template method:
+ *
+ *
+ * - {@link ClusterTest}, define a single cluster configuration
+ * - {@link ClusterTests}, provide multiple instances of @ClusterTest
+ * - {@link ClusterTemplate}, define a static method that generates cluster configurations
+ *
+ *
+ * Any combination of these annotations may be used on a given test template method. If no test invocations are
+ * generated after processing the annotations, an error is thrown.
+ *
+ * Depending on which annotations are used, and what values are given, different {@link ClusterConfig} will be
+ * generated. Each ClusterConfig is used to create an underlying Kafka cluster that is used for the actual test
+ * invocation.
+ *
+ * For example:
+ *
+ *
+ * @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+ * class SomeIntegrationTest {
+ * @ClusterTest(brokers = 1, controllers = 1, clusterType = ClusterType.Both)
+ * def someTest(): Unit = {
+ * assertTrue(condition)
+ * }
+ * }
+ *
+ *
+ * will generate two invocations of "someTest" (since ClusterType.Both was given). For each invocation, the test class
+ * SomeIntegrationTest will be instantiated, lifecycle methods (before/after) will be run, and "someTest" will be invoked.
+ *
+ **/
+public class ClusterTestExtensions implements TestTemplateInvocationContextProvider {
+ @Override
+ public boolean supportsTestTemplate(ExtensionContext context) {
+ return true;
+ }
+
+ @Override
+ public Stream provideTestTemplateInvocationContexts(ExtensionContext context) {
+ ClusterTestDefaults defaults = getClusterTestDefaults(context.getRequiredTestClass());
+ List generatedContexts = new ArrayList<>();
+
+ // Process the @ClusterTemplate annotation
+ ClusterTemplate clusterTemplateAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class);
+ if (clusterTemplateAnnot != null) {
+ processClusterTemplate(context, clusterTemplateAnnot, generatedContexts::add);
+ if (generatedContexts.size() == 0) {
+ throw new IllegalStateException("ClusterConfig generator method should provide at least one config");
+ }
+ }
+
+ // Process single @ClusterTest annotation
+ ClusterTest clusterTestAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTest.class);
+ if (clusterTestAnnot != null) {
+ processClusterTest(clusterTestAnnot, defaults, generatedContexts::add);
+ }
+
+ // Process multiple @ClusterTest annotation within @ClusterTests
+ ClusterTests clusterTestsAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTests.class);
+ if (clusterTestsAnnot != null) {
+ for (ClusterTest annot : clusterTestsAnnot.value()) {
+ processClusterTest(annot, defaults, generatedContexts::add);
+ }
+ }
+
+ if (generatedContexts.size() == 0) {
+ throw new IllegalStateException("Please annotate test methods with @ClusterTemplate, @ClusterTest, or " +
+ "@ClusterTests when using the ClusterTestExtensions provider");
+ }
+
+ return generatedContexts.stream();
+ }
+
+ private void processClusterTemplate(ExtensionContext context, ClusterTemplate annot,
+ Consumer testInvocations) {
+ // If specified, call cluster config generated method (must be static)
+ List generatedClusterConfigs = new ArrayList<>();
+ if (!annot.value().isEmpty()) {
+ generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add);
+ } else {
+ // Ensure we have at least one cluster config
+ generatedClusterConfigs.add(ClusterConfig.defaultClusterBuilder().build());
+ }
+
+ generatedClusterConfigs.forEach(config -> {
+ if (config.clusterType() == Type.ZK) {
+ testInvocations.accept(new ZkClusterInvocationContext(config.copyOf()));
+ } else {
+ throw new IllegalStateException("Unknown cluster type " + config.clusterType());
+ }
+ });
+ }
+
+ private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) {
+ Object testInstance = context.getTestInstance().orElse(null);
+ Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods, ClusterGenerator.class);
+ ReflectionUtils.invokeMethod(method, testInstance, generator);
+ }
+
+ private void processClusterTest(ClusterTest annot, ClusterTestDefaults defaults,
+ Consumer testInvocations) {
+ final Type type;
+ if (annot.clusterType() == Type.DEFAULT) {
+ type = defaults.clusterType();
+ } else {
+ type = annot.clusterType();
+ }
+
+ final int brokers;
+ if (annot.brokers() == 0) {
+ brokers = defaults.brokers();
+ } else {
+ brokers = annot.brokers();
+ }
+
+ final int controllers;
+ if (annot.controllers() == 0) {
+ controllers = defaults.controllers();
+ } else {
+ controllers = annot.controllers();
+ }
+
+ if (brokers <= 0 || controllers <= 0) {
+ throw new IllegalArgumentException("Number of brokers/controllers must be greater than zero.");
+ }
+
+ final boolean autoStart;
+ switch (annot.autoStart()) {
+ case YES:
+ autoStart = true;
+ break;
+ case NO:
+ autoStart = false;
+ break;
+ case DEFAULT:
+ autoStart = defaults.autoStart();
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+
+ ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart, annot.securityProtocol());
+ if (!annot.name().isEmpty()) {
+ builder.name(annot.name());
+ }
+ if (!annot.listener().isEmpty()) {
+ builder.listenerName(annot.listener());
+ }
+
+ Properties properties = new Properties();
+ for (ClusterConfigProperty property : annot.serverProperties()) {
+ properties.put(property.key(), property.value());
+ }
+
+ switch (type) {
+ case ZK:
+ case BOTH:
+ ClusterConfig config = builder.build();
+ config.serverProperties().putAll(properties);
+ testInvocations.accept(new ZkClusterInvocationContext(config));
+ break;
+ }
+ }
+
+ private ClusterTestDefaults getClusterTestDefaults(Class> testClass) {
+ return Optional.ofNullable(testClass.getDeclaredAnnotation(ClusterTestDefaults.class))
+ .orElseGet(() -> EmptyClass.class.getDeclaredAnnotation(ClusterTestDefaults.class));
+ }
+
+ @ClusterTestDefaults
+ private final static class EmptyClass {
+ // Just used as a convenience to get default values from the annotation
+ }
+}
diff --git a/core/src/test/java/kafka/test/junit/GenericParameterResolver.java b/core/src/test/java/kafka/test/junit/GenericParameterResolver.java
new file mode 100644
index 00000000000..70387e1680d
--- /dev/null
+++ b/core/src/test/java/kafka/test/junit/GenericParameterResolver.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+/**
+ * This resolver is used for supplying any type of object to the test invocation. It does not restrict where the given
+ * type can be injected, it simply checks if the requested injection type matches the type given in the constructor. If
+ * it matches, the given object is returned.
+ *
+ * This is useful for injecting helper objects and objects which can be fully initialized before the test lifecycle
+ * begins.
+ */
+public class GenericParameterResolver implements ParameterResolver {
+
+ private final T instance;
+ private final Class clazz;
+
+ GenericParameterResolver(T instance, Class clazz) {
+ this.instance = instance;
+ this.clazz = clazz;
+ }
+
+ @Override
+ public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
+ return parameterContext.getParameter().getType().equals(clazz);
+ }
+
+ @Override
+ public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
+ return instance;
+ }
+}
diff --git a/core/src/test/java/kafka/test/junit/README.md b/core/src/test/java/kafka/test/junit/README.md
new file mode 100644
index 00000000000..dbd2bf408c9
--- /dev/null
+++ b/core/src/test/java/kafka/test/junit/README.md
@@ -0,0 +1,139 @@
+This document describes a custom JUnit extension which allows for running the same JUnit tests against multiple Kafka
+cluster configurations.
+
+# Annotations
+
+A new `@ClusterTest` annotation is introduced which allows for a test to declaratively configure an underlying Kafka cluster.
+
+```scala
+@ClusterTest
+def testSomething(): Unit = { ... }
+```
+
+This annotation has fields for cluster type and number of brokers, as well as commonly parameterized configurations.
+Arbitrary server properties can also be provided in the annotation:
+
+```java
+@ClusterTest(clusterType = Type.Zk, securityProtocol = "PLAINTEXT", properties = {
+ @ClusterProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
+ @ClusterProperty(key = "socket.send.buffer.bytes", value = "10240"),
+})
+void testSomething() { ... }
+```
+
+Multiple `@ClusterTest` annotations can be given to generate more than one test invocation for the annotated method.
+
+```scala
+@ClusterTests(Array(
+ @ClusterTest(securityProtocol = "PLAINTEXT"),
+ @ClusterTest(securityProtocol = "SASL_PLAINTEXT")
+))
+def testSomething(): Unit = { ... }
+```
+
+A class-level `@ClusterTestDefaults` annotation is added to provide default values for `@ClusterTest` defined within
+the class. The intention here is to reduce repetitive annotation declarations and also make changing defaults easier
+for a class with many test cases.
+
+# Dynamic Configuration
+
+In order to allow for more flexible cluster configuration, a `@ClusterTemplate` annotation is also introduced. This
+annotation takes a single string value which references a static method on the test class. This method is used to
+produce any number of test configurations using a fluent builder style API.
+
+```java
+@ClusterTemplate("generateConfigs")
+void testSomething() { ... }
+
+static void generateConfigs(ClusterGenerator clusterGenerator) {
+ clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
+ .name("Generated Test 1")
+ .serverProperties(props1)
+ .ibp("2.7-IV1")
+ .build());
+ clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
+ .name("Generated Test 2")
+ .serverProperties(props2)
+ .ibp("2.7-IV2")
+ .build());
+ clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
+ .name("Generated Test 3")
+ .serverProperties(props3)
+ .build());
+}
+```
+
+This "escape hatch" from the simple declarative style configuration makes it easy to dynamically configure clusters.
+
+
+# JUnit Extension
+
+One thing to note is that our "test*" methods are no longer _tests_, but rather they are test templates. We have added
+a JUnit extension called `ClusterTestExtensions` which knows how to process these annotations in order to generate test
+invocations. Test classes that wish to make use of these annotations need to explicitly register this extension:
+
+```scala
+import kafka.test.junit.ClusterTestExtensions
+
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class ApiVersionsRequestTest {
+ ...
+}
+```
+
+# JUnit Lifecycle
+
+The lifecycle of a test class that is extended with `ClusterTestExtensions` follows:
+
+* JUnit discovers test template methods that are annotated with `@ClusterTest`, `@ClusterTests`, or `@ClusterTemplate`
+* `ClusterTestExtensions` is called for each of these template methods in order to generate some number of test invocations
+
+For each generated invocation:
+* Static `@BeforeAll` methods are called
+* Test class is instantiated
+* Non-static `@BeforeEach` methods are called
+* Kafka Cluster is started
+* Test method is invoked
+* Kafka Cluster is stopped
+* Non-static `@AfterEach` methods are called
+* Static `@AfterAll` methods are called
+
+`@BeforeEach` methods give an opportunity to setup additional test dependencies before the cluster is started.
+
+# Dependency Injection
+
+A few classes are introduced to provide context to the underlying cluster and to provide reusable functionality that was
+previously garnered from the test hierarchy.
+
+* ClusterConfig: a mutable cluster configuration, includes cluster type, number of brokers, properties, etc
+* ClusterInstance: a shim to the underlying class that actually runs the cluster, provides access to things like SocketServers
+* IntegrationTestHelper: connection related functions taken from IntegrationTestHarness and BaseRequestTest
+
+In order to have one of these objects injected, simply add it as a parameter to your test class, `@BeforeEach` method, or test method.
+
+| Injection | Class | BeforeEach | Test | Notes
+| --- | --- | --- | --- | --- |
+| ClusterConfig | yes | yes | yes* | Once in the test, changing config has no effect |
+| ClusterInstance | yes* | no | yes | Injectable at class level for convenience, can only be accessed inside test |
+| IntegrationTestHelper | yes | yes | yes | - |
+
+```scala
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class SomeTestClass(helper: IntegrationTestHelper) {
+
+ @BeforeEach
+ def setup(config: ClusterConfig): Unit = {
+ config.serverProperties().put("foo", "bar")
+ }
+
+ @ClusterTest
+ def testSomething(cluster: ClusterInstance): Unit = {
+ val topics = cluster.createAdminClient().listTopics()
+ }
+}
+```
+
+# Gotchas
+* Test methods annotated with JUnit's `@Test` will still be run, but no cluster will be started and no dependency
+ injection will happen. This is generally not what you want
+* Even though ClusterConfig is accessible and mutable inside the test method, changing it will have no affect on the cluster
\ No newline at end of file
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
new file mode 100644
index 00000000000..62cc80df277
--- /dev/null
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.api.IntegrationTestHarness;
+import kafka.network.SocketServer;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.compat.java8.OptionConverters;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test invocation. Each instance of this
+ * class is provided with a configuration for the cluster.
+ *
+ * This context also provides parameter resolvers for:
+ *
+ *
+ * - ClusterConfig (the same instance passed to the constructor)
+ * - ClusterInstance (includes methods to expose underlying SocketServer-s)
+ * - IntegrationTestHelper (helper methods)
+ *
+ */
+public class ZkClusterInvocationContext implements TestTemplateInvocationContext {
+
+ private final ClusterConfig clusterConfig;
+ private final AtomicReference clusterReference;
+
+ public ZkClusterInvocationContext(ClusterConfig clusterConfig) {
+ this.clusterConfig = clusterConfig;
+ this.clusterReference = new AtomicReference<>();
+ }
+
+ @Override
+ public String getDisplayName(int invocationIndex) {
+ String clusterDesc = clusterConfig.nameTags().entrySet().stream()
+ .map(Object::toString)
+ .collect(Collectors.joining(", "));
+ return String.format("[Zk %d] %s", invocationIndex, clusterDesc);
+ }
+
+ @Override
+ public List getAdditionalExtensions() {
+ if (clusterConfig.numControllers() != 1) {
+ throw new IllegalArgumentException("For ZK clusters, please specify exactly 1 controller.");
+ }
+ ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, clusterReference);
+ return Arrays.asList(
+ (BeforeTestExecutionCallback) context -> {
+ // We have to wait to actually create the underlying cluster until after our @BeforeEach methods
+ // have run. This allows tests to set up external dependencies like ZK, MiniKDC, etc.
+ // However, since we cannot create this instance until we are inside the test invocation, we have
+ // to use a container class (AtomicReference) to provide this cluster object to the test itself
+
+ // This is what tests normally extend from to start a cluster, here we create it anonymously and
+ // configure the cluster using values from ClusterConfig
+ IntegrationTestHarness cluster = new IntegrationTestHarness() {
+ @Override
+ public Properties serverConfig() {
+ return clusterConfig.serverProperties();
+ }
+
+ @Override
+ public Properties adminClientConfig() {
+ return clusterConfig.adminClientProperties();
+ }
+
+ @Override
+ public Properties consumerConfig() {
+ return clusterConfig.consumerProperties();
+ }
+
+ @Override
+ public Properties producerConfig() {
+ return clusterConfig.producerProperties();
+ }
+
+ @Override
+ public SecurityProtocol securityProtocol() {
+ return clusterConfig.securityProtocol();
+ }
+
+ @Override
+ public ListenerName listenerName() {
+ return clusterConfig.listenerName().map(ListenerName::normalised)
+ .orElseGet(() -> ListenerName.forSecurityProtocol(securityProtocol()));
+ }
+
+ @Override
+ public Option serverSaslProperties() {
+ if (clusterConfig.saslServerProperties().isEmpty()) {
+ return Option.empty();
+ } else {
+ return Option.apply(clusterConfig.saslServerProperties());
+ }
+ }
+
+ @Override
+ public Option clientSaslProperties() {
+ if (clusterConfig.saslClientProperties().isEmpty()) {
+ return Option.empty();
+ } else {
+ return Option.apply(clusterConfig.saslClientProperties());
+ }
+ }
+
+ @Override
+ public int brokerCount() {
+ // Controllers are also brokers in zk mode, so just use broker count
+ return clusterConfig.numBrokers();
+ }
+
+ @Override
+ public Option trustStoreFile() {
+ return OptionConverters.toScala(clusterConfig.trustStoreFile());
+ }
+ };
+
+ clusterReference.set(cluster);
+ if (clusterConfig.isAutoStart()) {
+ clusterShim.start();
+ }
+ },
+ (AfterTestExecutionCallback) context -> clusterShim.stop(),
+ new ClusterInstanceParameterResolver(clusterShim),
+ new GenericParameterResolver<>(clusterConfig, ClusterConfig.class)
+ );
+ }
+
+ public static class ZkClusterInstance implements ClusterInstance {
+
+ final AtomicReference clusterReference;
+ final ClusterConfig config;
+ final AtomicBoolean started = new AtomicBoolean(false);
+ final AtomicBoolean stopped = new AtomicBoolean(false);
+
+ ZkClusterInstance(ClusterConfig config, AtomicReference clusterReference) {
+ this.config = config;
+ this.clusterReference = clusterReference;
+ }
+
+ @Override
+ public String bootstrapServers() {
+ return TestUtils.bootstrapServers(clusterReference.get().servers(), clusterReference.get().listenerName());
+ }
+
+ @Override
+ public Collection brokerSocketServers() {
+ return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+ .map(KafkaServer::socketServer)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public ListenerName clientListener() {
+ return clusterReference.get().listenerName();
+ }
+
+ @Override
+ public Collection controllerSocketServers() {
+ return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+ .filter(broker -> broker.kafkaController().isActive())
+ .map(KafkaServer::socketServer)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public SocketServer anyBrokerSocketServer() {
+ return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+ .map(KafkaServer::socketServer)
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+ }
+
+ @Override
+ public SocketServer anyControllerSocketServer() {
+ return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+ .filter(broker -> broker.kafkaController().isActive())
+ .map(KafkaServer::socketServer)
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+ }
+
+ @Override
+ public ClusterType clusterType() {
+ return ClusterType.ZK;
+ }
+
+ @Override
+ public ClusterConfig config() {
+ return config;
+ }
+
+ @Override
+ public IntegrationTestHarness getUnderlying() {
+ return clusterReference.get();
+ }
+
+ @Override
+ public Admin createAdminClient(Properties configOverrides) {
+ return clusterReference.get().createAdminClient(configOverrides);
+ }
+
+ @Override
+ public void start() {
+ if (started.compareAndSet(false, true)) {
+ clusterReference.get().setUp();
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (stopped.compareAndSet(false, true)) {
+ clusterReference.get().tearDown();
+ }
+ }
+ }
+}
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 2bd55b855a9..92370113d13 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -97,7 +97,7 @@ trait SaslSetup {
(serverKeytabFile.get, clientKeytabFile.get)
}
- protected def jaasSections(kafkaServerSaslMechanisms: Seq[String],
+ def jaasSections(kafkaServerSaslMechanisms: Seq[String],
kafkaClientSaslMechanism: Option[String],
mode: SaslSetupMode = Both,
kafkaServerEntryName: String = JaasTestUtils.KafkaServerContextName): Seq[JaasSection] = {
diff --git a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
new file mode 100644
index 00000000000..c960e35c590
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import kafka.network.SocketServer
+import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, RequestTestUtils, ResponseHeader}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Utils
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.Properties
+import scala.annotation.nowarn
+import scala.reflect.ClassTag
+
+object IntegrationTestUtils {
+
+ private def sendRequest(socket: Socket, request: Array[Byte]): Unit = {
+ val outgoing = new DataOutputStream(socket.getOutputStream)
+ outgoing.writeInt(request.length)
+ outgoing.write(request)
+ outgoing.flush()
+ }
+
+ def sendWithHeader(request: AbstractRequest, header: RequestHeader, socket: Socket): Unit = {
+ val serializedBytes = Utils.toArray(RequestTestUtils.serializeRequestWithHeader(header, request))
+ sendRequest(socket, serializedBytes)
+ }
+
+ def nextRequestHeader[T <: AbstractResponse](apiKey: ApiKeys,
+ apiVersion: Short,
+ clientId: String = "client-id",
+ correlationIdOpt: Option[Int] = None): RequestHeader = {
+ val correlationId = correlationIdOpt.getOrElse {
+ this.correlationId += 1
+ this.correlationId
+ }
+ new RequestHeader(apiKey, apiVersion, clientId, correlationId)
+ }
+
+ def send(request: AbstractRequest,
+ socket: Socket,
+ clientId: String = "client-id",
+ correlationId: Option[Int] = None): Unit = {
+ val header = nextRequestHeader(request.apiKey, request.version, clientId, correlationId)
+ sendWithHeader(request, header, socket)
+ }
+
+ def receive[T <: AbstractResponse](socket: Socket, apiKey: ApiKeys, version: Short)
+ (implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = {
+ val incoming = new DataInputStream(socket.getInputStream)
+ val len = incoming.readInt()
+
+ val responseBytes = new Array[Byte](len)
+ incoming.readFully(responseBytes)
+
+ val responseBuffer = ByteBuffer.wrap(responseBytes)
+ ResponseHeader.parse(responseBuffer, apiKey.responseHeaderVersion(version))
+
+ AbstractResponse.parseResponse(apiKey, responseBuffer, version) match {
+ case response: T => response
+ case response =>
+ throw new ClassCastException(s"Expected response with type ${classTag.runtimeClass}, but found ${response.getClass}")
+ }
+ }
+
+ def sendAndReceive[T <: AbstractResponse](request: AbstractRequest,
+ socket: Socket,
+ clientId: String = "client-id",
+ correlationId: Option[Int] = None)
+ (implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
+ send(request, socket, clientId, correlationId)
+ receive[T](socket, request.apiKey, request.version)
+ }
+
+ def connectAndReceive[T <: AbstractResponse](request: AbstractRequest,
+ destination: SocketServer,
+ listenerName: ListenerName)
+ (implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
+ val socket = connect(destination, listenerName)
+ try sendAndReceive[T](request, socket)
+ finally socket.close()
+ }
+
+ protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
+ private var correlationId = 0
+
+ def connect(socketServer: SocketServer,
+ listenerName: ListenerName): Socket = {
+ new Socket("localhost", socketServer.boundPort(listenerName))
+ }
+
+ def clientSecurityProps(certAlias: String): Properties = {
+ TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, None, certAlias, TestUtils.SslCertificateCn, None) // TODO use real trust store and client SASL properties
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index d71764e433b..f7163cad10f 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -16,21 +16,28 @@
*/
package kafka.server
-import java.util.Properties
+import integration.kafka.server.IntegrationTestUtils
+import kafka.test.ClusterInstance
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
import org.junit.jupiter.api.Assertions._
+import java.util.Properties
import scala.jdk.CollectionConverters._
-abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
+abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
+
+ def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
+ IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, cluster.brokerSocketServers().asScala.head, listenerName)
+ }
def controlPlaneListenerName = new ListenerName("CONTROLLER")
// Configure control plane listener to make sure we have separate listeners for testing.
- override def brokerPropertyOverrides(properties: Properties): Unit = {
+ def brokerPropertyOverrides(properties: Properties): Unit = {
+ val securityProtocol = cluster.config().securityProtocol()
properties.setProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListenerName.value())
properties.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${controlPlaneListenerName.value()}:$securityProtocol,$securityProtocol:$securityProtocol")
properties.setProperty("listeners", s"$securityProtocol://localhost:0,${controlPlaneListenerName.value()}://localhost:0")
@@ -38,15 +45,15 @@ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
}
def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
- val overrideHeader = nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
- val socket = connect(anySocketServer)
+ val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
+ val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
- sendWithHeader(request, overrideHeader, socket)
- receive[ApiVersionsResponse](socket, ApiKeys.API_VERSIONS, 0.toShort)
+ IntegrationTestUtils.sendWithHeader(request, overrideHeader, socket)
+ IntegrationTestUtils.receive[ApiVersionsResponse](socket, ApiKeys.API_VERSIONS, 0.toShort)
} finally socket.close()
}
- def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = interBrokerListenerName): Unit = {
+ def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName): Unit = {
val expectedApis = ApiKeys.brokerApis()
if (listenerName == controlPlaneListenerName) {
expectedApis.add(ApiKeys.ENVELOPE)
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 0cdae57b9b5..dc35bae4ab0 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -17,32 +17,40 @@
package kafka.server
+import kafka.test.{ClusterConfig, ClusterInstance}
import org.apache.kafka.common.message.ApiVersionsRequestData
-import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
+import org.apache.kafka.common.requests.ApiVersionsRequest
+import kafka.test.annotation.ClusterTest
+import kafka.test.junit.ClusterTestExtensions
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.extension.ExtendWith
-class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest {
- override def brokerCount: Int = 1
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
- @Test
- def testApiVersionsRequest(): Unit = {
- val request = new ApiVersionsRequest.Builder().build()
- val apiVersionsResponse = sendApiVersionsRequest(request)
- validateApiVersionsResponse(apiVersionsResponse)
+ @BeforeEach
+ def setup(config: ClusterConfig): Unit = {
+ super.brokerPropertyOverrides(config.serverProperties())
}
- @Test
+ @ClusterTest
+ def testApiVersionsRequest(): Unit = {
+ val request = new ApiVersionsRequest.Builder().build()
+ val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
+ validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
+ }
+
+ @ClusterTest
def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, super.controlPlaneListenerName)
validateApiVersionsResponse(apiVersionsResponse, super.controlPlaneListenerName)
}
- @Test
+ @ClusterTest
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest)
@@ -54,30 +62,25 @@ class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest {
assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
}
- @Test
+ @ClusterTest
def testApiVersionsRequestValidationV0(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
- val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest)
- validateApiVersionsResponse(apiVersionsResponse)
+ val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
+ validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
}
- @Test
+ @ClusterTest
def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, super.controlPlaneListenerName)
validateApiVersionsResponse(apiVersionsResponse, super.controlPlaneListenerName)
}
- @Test
+ @ClusterTest
def testApiVersionsRequestValidationV3(): Unit = {
// Invalid request because Name and Version are empty by default
val apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), 3.asInstanceOf[Short])
- val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest)
+ val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
assertEquals(Errors.INVALID_REQUEST.code(), apiVersionsResponse.data.errorCode())
}
-
- private def sendApiVersionsRequest(request: ApiVersionsRequest,
- listenerName: ListenerName = super.listenerName): ApiVersionsResponse = {
- connectAndReceive[ApiVersionsResponse](request, listenerName = listenerName)
- }
}
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index a011a97cc98..61bd02a0634 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -17,8 +17,10 @@
package kafka.server
-import java.net.InetAddress
+import integration.kafka.server.IntegrationTestUtils
+import kafka.test.ClusterInstance
+import java.net.InetAddress
import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, UserScramCredentialUpsertion}
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
@@ -26,23 +28,25 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+
import java.util
import java.util.concurrent.{ExecutionException, TimeUnit}
-
import kafka.utils.TestUtils
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
import scala.jdk.CollectionConverters._
-class ClientQuotasRequestTest extends BaseRequestTest {
+@ClusterTestDefaults(clusterType = Type.ZK)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class ClientQuotasRequestTest(cluster: ClusterInstance) {
private val ConsumerByteRateProp = QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG
private val ProducerByteRateProp = QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG
private val RequestPercentageProp = QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG
private val IpConnectionRateProp = QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG
- override val brokerCount = 1
-
- @Test
+ @ClusterTest
def testAlterClientQuotasRequest(): Unit = {
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user"), (ClientQuotaEntity.CLIENT_ID -> "client-id")).asJava)
@@ -112,7 +116,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
))
}
- @Test
+ @ClusterTest
def testAlterClientQuotasRequestValidateOnly(): Unit = {
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
@@ -170,11 +174,11 @@ class ClientQuotasRequestTest extends BaseRequestTest {
))
}
- @Test
+ @ClusterTest
def testClientQuotasForScramUsers(): Unit = {
val userName = "user"
- val results = createAdminClient().alterUserScramCredentials(util.Arrays.asList(
+ val results = cluster.createAdminClient().alterUserScramCredentials(util.Arrays.asList(
new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")))
results.all.get
@@ -193,7 +197,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
))
}
- @Test
+ @ClusterTest
def testAlterIpQuotasRequest(): Unit = {
val knownHost = "1.2.3.4"
val unknownHost = "2.3.4.5"
@@ -218,7 +222,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
var currentServerQuota = 0
TestUtils.waitUntilTrue(
() => {
- currentServerQuota = servers.head.socketServer.connectionQuotas.connectionRateForIp(entityIp)
+ currentServerQuota = cluster.brokerSocketServers().asScala.head.connectionQuotas.connectionRateForIp(entityIp)
Math.abs(expectedMatches(entity) - currentServerQuota) < 0.01
}, s"Connection quota of $entity is not ${expectedMatches(entity)} but $currentServerQuota")
}
@@ -251,40 +255,25 @@ class ClientQuotasRequestTest extends BaseRequestTest {
verifyIpQuotas(allIpEntityFilter, Map.empty)
}
- @Test
- def testAlterClientQuotasBadUser(): Unit = {
- val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)
- assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true))
- }
+ @ClusterTest
+ def testAlterClientQuotasInvalidRequests(): Unit = {
+ var entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)
+ assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true))
- @Test
- def testAlterClientQuotasBadClientId(): Unit = {
- val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "")).asJava)
- assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true))
- }
+ entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "")).asJava)
+ assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true))
- @Test
- def testAlterClientQuotasBadEntityType(): Unit = {
- val entity = new ClientQuotaEntity(Map(("" -> "name")).asJava)
- assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true))
- }
+ entity = new ClientQuotaEntity(Map(("" -> "name")).asJava)
+ assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true))
- @Test
- def testAlterClientQuotasEmptyEntity(): Unit = {
- val entity = new ClientQuotaEntity(Map.empty.asJava)
- assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(ProducerByteRateProp -> Some(10000.5)), validateOnly = true))
- }
+ entity = new ClientQuotaEntity(Map.empty.asJava)
+ assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((ProducerByteRateProp -> Some(10000.5))), validateOnly = true))
- @Test
- def testAlterClientQuotasBadConfigKey(): Unit = {
- val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava)
- assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map("bad" -> Some(1.0)), validateOnly = true))
- }
+ entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
+ assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(("bad" -> Some(1.0))), validateOnly = true))
- @Test
- def testAlterClientQuotasBadConfigValue(): Unit = {
- val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava)
- assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(ProducerByteRateProp -> Some(10000.5)), validateOnly = true))
+ entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
+ assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((ProducerByteRateProp -> Some(10000.5))), validateOnly = true))
}
private def expectInvalidRequestWithMessage(runnable: => Unit, expectedMessage: String): Unit = {
@@ -292,7 +281,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
assertTrue(exception.getMessage.contains(expectedMessage), s"Expected message $exception to contain $expectedMessage")
}
- @Test
+ @ClusterTest
def testAlterClientQuotasInvalidEntityCombination(): Unit = {
val userAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user", ClientQuotaEntity.IP -> "1.2.3.4").asJava)
val clientAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> "client", ClientQuotaEntity.IP -> "1.2.3.4").asJava)
@@ -303,7 +292,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
validateOnly = true), expectedExceptionMessage)
}
- @Test
+ @ClusterTest
def testAlterClientQuotasBadIp(): Unit = {
val invalidHostPatternEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "abc-123").asJava)
val unresolvableHostEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "ip").asJava)
@@ -314,7 +303,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
validateOnly = true), expectedExceptionMessage)
}
- @Test
+ @ClusterTest
def testDescribeClientQuotasInvalidFilterCombination(): Unit = {
val ipFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)
val userFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)
@@ -357,7 +346,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
(matchUserClientEntities ++ matchIpEntities).foreach(e => result(e._1).get(10, TimeUnit.SECONDS))
}
- @Test
+ @ClusterTest
def testDescribeClientQuotasMatchExact(): Unit = {
setupDescribeClientQuotasMatchTest()
@@ -402,7 +391,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
}
}
- @Test
+ @ClusterTest
def testDescribeClientQuotasMatchPartial(): Unit = {
setupDescribeClientQuotasMatchTest()
@@ -509,13 +498,13 @@ class ClientQuotasRequestTest extends BaseRequestTest {
testMatchEntities(ClientQuotaFilter.containsOnly(List.empty.asJava), 0, entity => false)
}
- @Test
+ @ClusterTest
def testClientQuotasUnsupportedEntityTypes(): Unit = {
val entity = new ClientQuotaEntity(Map(("other" -> "name")).asJava)
assertThrows(classOf[UnsupportedVersionException], () => verifyDescribeEntityQuotas(entity, Map.empty))
}
- @Test
+ @ClusterTest
def testClientQuotasSanitized(): Unit = {
// An entity with name that must be sanitized when writing to Zookeeper.
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user with spaces")).asJava)
@@ -529,7 +518,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
))
}
- @Test
+ @ClusterTest
def testClientQuotasWithDefaultName(): Unit = {
// An entity using the name associated with the default entity name. The entity's name should be sanitized so
// that it does not conflict with the default entity name.
@@ -580,7 +569,9 @@ class ClientQuotasRequestTest extends BaseRequestTest {
private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter): DescribeClientQuotasResponse = {
val request = new DescribeClientQuotasRequest.Builder(filter).build()
- connectAndReceive[DescribeClientQuotasResponse](request, destination = controllerSocketServer)
+ IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](request,
+ destination = cluster.anyControllerSocketServer(),
+ listenerName = cluster.clientListener())
}
private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String, Option[Double]], validateOnly: Boolean) =
@@ -606,7 +597,9 @@ class ClientQuotasRequestTest extends BaseRequestTest {
private def sendAlterClientQuotasRequest(entries: Iterable[ClientQuotaAlteration], validateOnly: Boolean): AlterClientQuotasResponse = {
val request = new AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly).build()
- connectAndReceive[AlterClientQuotasResponse](request, destination = controllerSocketServer)
+ IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](request,
+ destination = cluster.anyControllerSocketServer(),
+ listenerName = cluster.clientListener())
}
}
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 0444264edd1..bbc71cad267 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -16,6 +16,8 @@
*/
package kafka.server
+import integration.kafka.server.IntegrationTestUtils
+
import java.net.Socket
import java.util.Collections
import kafka.api.{KafkaSasl, SaslSetup}
@@ -23,49 +25,53 @@ import kafka.utils.JaasTestUtils
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
+import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.{ClusterConfig, ClusterInstance}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.extension.ExtendWith
-class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with SaslSetup {
- override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
- private val kafkaClientSaslMechanism = "PLAIN"
- private val kafkaServerSaslMechanisms = List("PLAIN")
- protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
- protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
- override def brokerCount = 1
+import scala.jdk.CollectionConverters._
+
+
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
+
+ val kafkaClientSaslMechanism = "PLAIN"
+ val kafkaServerSaslMechanisms = List("PLAIN")
+
+ private var sasl: SaslSetup = _
@BeforeEach
- override def setUp(): Unit = {
- startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
- super.setUp()
+ def setupSasl(config: ClusterConfig): Unit = {
+ sasl = new SaslSetup() {}
+ sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+ config.saslServerProperties().putAll(sasl.kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+ config.saslClientProperties().putAll(sasl.kafkaClientSaslProperties(kafkaClientSaslMechanism))
+ super.brokerPropertyOverrides(config.serverProperties())
}
- @AfterEach
- override def tearDown(): Unit = {
- super.tearDown()
- closeSasl()
- }
-
- @Test
+ @ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
- val socket = connect()
+ val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
- val apiVersionsResponse = sendAndReceive[ApiVersionsResponse](
+ val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
- validateApiVersionsResponse(apiVersionsResponse)
+ validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
}
}
- @Test
+ @ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
- val socket = connect()
+ val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
sendSaslHandshakeRequestValidateResponse(socket)
- val response = sendAndReceive[ApiVersionsResponse](
+ val response = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
assertEquals(Errors.ILLEGAL_SASL_STATE.code, response.data.errorCode)
} finally {
@@ -73,26 +79,31 @@ class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with Sas
}
}
- @Test
+ @ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
- val socket = connect()
+ val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0)
val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest)
assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode)
- val apiVersionsResponse2 = sendAndReceive[ApiVersionsResponse](
+ val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
- validateApiVersionsResponse(apiVersionsResponse2)
+ validateApiVersionsResponse(apiVersionsResponse2, cluster.clientListener())
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
}
}
+ @AfterEach
+ def closeSasl(): Unit = {
+ sasl.closeSasl()
+ }
+
private def sendSaslHandshakeRequestValidateResponse(socket: Socket): Unit = {
val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN"),
ApiKeys.SASL_HANDSHAKE.latestVersion)
- val response = sendAndReceive[SaslHandshakeResponse](request, socket)
+ val response = IntegrationTestUtils.sendAndReceive[SaslHandshakeResponse](request, socket)
assertEquals(Errors.NONE, response.error)
assertEquals(Collections.singletonList("PLAIN"), response.enabledMechanisms)
}