JUnit extensions for integration tests (#9986)

Adds a JUnit 5 extension for running the same test against different types of clusters.
See core/src/test/java/kafka/test/junit/README.md for details.
David Arthur 2021-02-09 11:49:33 -05:00 committed by GitHub
parent 1bfce16de5
commit e7e4252b0f
25 changed files with 1692 additions and 113 deletions


@@ -1013,7 +1013,10 @@ project(':core') {
}
test {
java {
srcDirs = ["src/generated/java", "src/test/java"]
srcDirs = []
}
scala {
srcDirs = ["src/test/java", "src/test/scala"]
}
}
}


@@ -58,4 +58,19 @@
<allow pkg="org.apache.kafka.clients" />
</subpackage>
<subpackage name="test">
<allow pkg="kafka.test.annotation"/>
<allow pkg="kafka.test.junit"/>
<allow pkg="kafka.network"/>
<allow pkg="kafka.api"/>
<allow pkg="kafka.server"/>
<allow pkg="org.apache.kafka.clients.admin"/>
<allow pkg="integration.kafka.server" class="IntegrationTestHelper"/>
<subpackage name="annotation">
<allow pkg="kafka.test"/>
</subpackage>
<subpackage name="junit">
<allow pkg="kafka.test"/>
</subpackage>
</subpackage>
</import-control>


@@ -23,6 +23,7 @@
<!-- core -->
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="ClusterTestExtensions.java"/>
<!-- Clients -->
<suppress id="dontUseSystemExit"


@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test;
import kafka.test.annotation.Type;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
/**
* Represents a requested configuration of a Kafka cluster for integration testing
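*
* For example (illustrative):
* <pre>
*     ClusterConfig config = ClusterConfig.defaultClusterBuilder()
*         .name("my-cluster")
*         .brokers(3)
*         .build();
* </pre>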
*/
public class ClusterConfig {
private final Type type;
private final int brokers;
private final int controllers;
private final String name;
private final boolean autoStart;
private final SecurityProtocol securityProtocol;
private final String listenerName;
private final File trustStoreFile;
private final Properties serverProperties = new Properties();
private final Properties producerProperties = new Properties();
private final Properties consumerProperties = new Properties();
private final Properties adminClientProperties = new Properties();
private final Properties saslServerProperties = new Properties();
private final Properties saslClientProperties = new Properties();
ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile) {
this.type = type;
this.brokers = brokers;
this.controllers = controllers;
this.name = name;
this.autoStart = autoStart;
this.securityProtocol = securityProtocol;
this.listenerName = listenerName;
this.trustStoreFile = trustStoreFile;
}
public Type clusterType() {
return type;
}
public int numBrokers() {
return brokers;
}
public int numControllers() {
return controllers;
}
public Optional<String> name() {
return Optional.ofNullable(name);
}
public boolean isAutoStart() {
return autoStart;
}
public Properties serverProperties() {
return serverProperties;
}
public Properties producerProperties() {
return producerProperties;
}
public Properties consumerProperties() {
return consumerProperties;
}
public Properties adminClientProperties() {
return adminClientProperties;
}
public Properties saslServerProperties() {
return saslServerProperties;
}
public Properties saslClientProperties() {
return saslClientProperties;
}
public SecurityProtocol securityProtocol() {
return securityProtocol;
}
public Optional<String> listenerName() {
return Optional.ofNullable(listenerName);
}
public Optional<File> trustStoreFile() {
return Optional.ofNullable(trustStoreFile);
}
public Map<String, String> nameTags() {
Map<String, String> tags = new LinkedHashMap<>(3);
name().ifPresent(name -> tags.put("Name", name));
tags.put("security", securityProtocol.name());
listenerName().ifPresent(listener -> tags.put("listener", listener));
return tags;
}
public ClusterConfig copyOf() {
ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile);
copy.serverProperties.putAll(serverProperties);
copy.producerProperties.putAll(producerProperties);
copy.consumerProperties.putAll(consumerProperties);
copy.adminClientProperties.putAll(adminClientProperties);
copy.saslServerProperties.putAll(saslServerProperties);
copy.saslClientProperties.putAll(saslClientProperties);
return copy;
}
public static Builder defaultClusterBuilder() {
return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT);
}
public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
return new Builder(type, brokers, controllers, autoStart, securityProtocol);
}
public static class Builder {
private Type type;
private int brokers;
private int controllers;
private String name;
private boolean autoStart;
private SecurityProtocol securityProtocol;
private String listenerName;
private File trustStoreFile;
Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
this.type = type;
this.brokers = brokers;
this.controllers = controllers;
this.autoStart = autoStart;
this.securityProtocol = securityProtocol;
}
public Builder type(Type type) {
this.type = type;
return this;
}
public Builder brokers(int brokers) {
this.brokers = brokers;
return this;
}
public Builder controllers(int controllers) {
this.controllers = controllers;
return this;
}
public Builder name(String name) {
this.name = name;
return this;
}
public Builder autoStart(boolean autoStart) {
this.autoStart = autoStart;
return this;
}
public Builder securityProtocol(SecurityProtocol securityProtocol) {
this.securityProtocol = securityProtocol;
return this;
}
public Builder listenerName(String listenerName) {
this.listenerName = listenerName;
return this;
}
public Builder trustStoreFile(File trustStoreFile) {
this.trustStoreFile = trustStoreFile;
return this;
}
public ClusterConfig build() {
return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile);
}
}
}


@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test;
import java.util.function.Consumer;
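/**
* A consumer of {@link ClusterConfig} instances. An instance of this interface is passed to the static generator
* method referenced by {@link kafka.test.annotation.ClusterTemplate} so that the method can register any number of
* cluster configurations for a test.
*/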
@FunctionalInterface
public interface ClusterGenerator extends Consumer<ClusterConfig> {
}


@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test;
import kafka.network.SocketServer;
import kafka.test.annotation.ClusterTest;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import java.util.Collection;
import java.util.Properties;
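/**
* A shim to the cluster being run for the current test. It gives tests access to socket servers, bootstrap servers,
* admin clients, and manual start/stop control. For example (illustrative):
* <pre>
* &#64;ClusterTest
* void testSomething(ClusterInstance cluster) {
*     try (Admin admin = cluster.createAdminClient()) {
*         // interact with the cluster at cluster.bootstrapServers()
*     }
* }
* </pre>
*/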
public interface ClusterInstance {
enum ClusterType {
ZK,
// RAFT
}
/**
* Cluster type. For now, only ZK is supported.
*/
ClusterType clusterType();
/**
* The cluster configuration used to create this cluster. Changing data in this instance through this accessor will
* have no effect on the cluster since it is already provisioned.
*/
ClusterConfig config();
/**
* The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
* unspecified by those sources, this will return the listener for the default security protocol, PLAINTEXT.
*/
ListenerName clientListener();
/**
* The broker connect string which can be used by clients for bootstrapping
*/
String bootstrapServers();
/**
* A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
* acting as the controller (since ZK controllers serve both broker and controller roles).
*/
Collection<SocketServer> brokerSocketServers();
/**
* A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also
* currently the active controller. For Raft-based clusters, this will return all controller servers.
*/
Collection<SocketServer> controllerSocketServers();
/**
* Return any one of the broker servers. Throw an error if none are found
*/
SocketServer anyBrokerSocketServer();
/**
* Return any one of the controller servers. Throw an error if none are found
*/
SocketServer anyControllerSocketServer();
/**
* The underlying object which is responsible for setting up and tearing down the cluster.
*/
Object getUnderlying();
default <T> T getUnderlying(Class<T> asClass) {
return asClass.cast(getUnderlying());
}
Admin createAdminClient(Properties configOverrides);
default Admin createAdminClient() {
return createAdminClient(new Properties());
}
void start();
void stop();
}


@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test;
import kafka.test.annotation.AutoStart;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTemplate;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
@ClusterTestDefaults(clusterType = Type.ZK) // Set defaults for a few params in @ClusterTest(s)
@ExtendWith(ClusterTestExtensions.class)
public class ClusterTestExtensionsTest {
private final ClusterInstance clusterInstance;
private final ClusterConfig config;
ClusterTestExtensionsTest(ClusterInstance clusterInstance, ClusterConfig config) { // Constructor injection
this.clusterInstance = clusterInstance;
this.config = config;
}
// Static methods can generate cluster configurations
static void generate1(ClusterGenerator clusterGenerator) {
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated Test").build());
}
// BeforeEach methods run after class construction, but before cluster initialization and test invocation
@BeforeEach
public void beforeEach(ClusterConfig config) {
Assertions.assertSame(this.config, config, "Injected objects should be the same");
config.serverProperties().put("before", "each");
}
// AfterEach runs after test invocation and cluster teardown
@AfterEach
public void afterEach(ClusterConfig config) {
Assertions.assertSame(this.config, config, "Injected objects should be the same");
}
// With no params, configuration comes from the annotation defaults as well as @ClusterTestDefaults (if present)
@ClusterTest
public void testClusterTest(ClusterConfig config, ClusterInstance clusterInstance) {
Assertions.assertSame(this.config, config, "Injected objects should be the same");
Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected objects should be the same");
Assertions.assertEquals(clusterInstance.clusterType(), ClusterInstance.ClusterType.ZK); // From the class level default
Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("before"), "each");
}
// generate1 is a config generator method referenced by @ClusterTemplate; it can produce any number of cluster configs
@ClusterTemplate("generate1")
public void testClusterTemplate() {
Assertions.assertEquals(clusterInstance.clusterType(), ClusterInstance.ClusterType.ZK,
"generate1 provided a ZK cluster, so we should see that here");
Assertions.assertEquals(clusterInstance.config().name().orElse(""), "Generated Test",
"generate1 named this cluster config, so we should see that here");
Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("before"), "each");
}
// Multiple @ClusterTest can be used with @ClusterTests
@ClusterTests({
@ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "bar"),
@ClusterConfigProperty(key = "spam", value = "eggs")
}),
@ClusterTest(name = "cluster-tests-2", clusterType = Type.ZK, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz")
})
})
public void testClusterTests() {
if (clusterInstance.config().name().filter(name -> name.equals("cluster-tests-1")).isPresent()) {
Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("foo"), "bar");
Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("spam"), "eggs");
} else if (clusterInstance.config().name().filter(name -> name.equals("cluster-tests-2")).isPresent()) {
Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("foo"), "baz");
Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("spam"), "eggz");
} else {
Assertions.fail("Unknown cluster config " + clusterInstance.config().name());
}
}
@ClusterTest(autoStart = AutoStart.NO)
public void testNoAutoStart() {
Assertions.assertThrows(RuntimeException.class, clusterInstance::anyBrokerSocketServer);
clusterInstance.start();
Assertions.assertNotNull(clusterInstance.anyBrokerSocketServer());
}
}


@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.annotation;
public enum AutoStart {
YES,
NO,
DEFAULT
}


@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Documented
@Target({ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ClusterConfigProperty {
String key();
String value();
}


@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.annotation;
import kafka.test.ClusterConfig;
import kafka.test.ClusterGenerator;
import org.junit.jupiter.api.TestTemplate;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* Used to indicate that a test should call the method given by {@link #value()} to generate a number of
* cluster configurations. The method specified by the value should accept a single argument of the type
* {@link ClusterGenerator}. Any return value from the method is ignored. A test invocation
* will be generated for each {@link ClusterConfig} provided to the ClusterGenerator instance.
*
* The method given here must be static since it is invoked before any tests are actually run. Each test generated
* by this annotation will run as if it were defined as a separate test method with its own
* {@link org.junit.jupiter.api.Test}. That is to say, each generated test invocation will have a separate lifecycle.
*
* This annotation may be used in conjunction with {@link ClusterTest} and {@link ClusterTests} which also yield
* ClusterConfig instances.
*
* For Scala tests, the method should be defined in a companion object with the same name as the test class.
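*
* For example (illustrative; the method and config names are arbitrary):
* <pre>
* &#64;ClusterTemplate("generateConfigs")
* void testSomething() { ... }
*
* static void generateConfigs(ClusterGenerator clusterGenerator) {
*     clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("config-a").build());
*     clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("config-b").build());
* }
* </pre>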
*/
@Documented
@Target({METHOD})
@Retention(RUNTIME)
@TestTemplate
public @interface ClusterTemplate {
/**
* Specify the static method used for generating cluster configs
*/
String value();
}


@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.annotation;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.TestTemplate;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Documented
@Target({METHOD})
@Retention(RUNTIME)
@TestTemplate
public @interface ClusterTest {
Type clusterType() default Type.DEFAULT;
int brokers() default 0;
int controllers() default 0;
AutoStart autoStart() default AutoStart.DEFAULT;
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
ClusterConfigProperty[] serverProperties() default {};
}


@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.annotation;
import kafka.test.junit.ClusterTestExtensions;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* Used to set class level defaults for any test template methods annotated with {@link ClusterTest} or
* {@link ClusterTests}. The default values here are also used as the source for defaults in
* {@link ClusterTestExtensions}.
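*
* For example (illustrative):
* <pre>
* &#64;ClusterTestDefaults(clusterType = Type.ZK, brokers = 3)
* public class SomeIntegrationTest {
*     // every &#64;ClusterTest template in this class now defaults to a ZK cluster with 3 brokers
* }
* </pre>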
*/
@Documented
@Target({TYPE})
@Retention(RUNTIME)
public @interface ClusterTestDefaults {
Type clusterType() default Type.ZK;
int brokers() default 1;
int controllers() default 1;
boolean autoStart() default true;
}


@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.annotation;
import org.junit.jupiter.api.TestTemplate;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Documented
@Target({METHOD})
@Retention(RUNTIME)
@TestTemplate
public @interface ClusterTests {
ClusterTest[] value();
}


@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.annotation;
/**
* The type of cluster config being requested. Used by {@link kafka.test.ClusterConfig} and the test annotations.
*/
public enum Type {
// RAFT,
ZK,
BOTH,
DEFAULT
}


@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.junit;
import kafka.test.ClusterInstance;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolver;
import java.lang.reflect.Executable;
import static org.junit.platform.commons.util.AnnotationUtils.isAnnotated;
/**
* This resolver provides an instance of {@link ClusterInstance} to a test invocation. The instance represents the
* underlying cluster being run for the current test. It can be injected into test methods or into the class
* constructor.
*
* N.B., if injected into the class constructor, the instance will not be fully initialized until the actual test method
* is invoked. This is because the cluster is not started until after class construction and after "before"
* lifecycle methods have been run. Constructor injection is meant for convenience so helper methods can be defined on
* the test which can rely on a class member rather than an argument for ClusterInstance.
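*
* For example (illustrative):
* <pre>
* &#64;ExtendWith(ClusterTestExtensions.class)
* public class SomeTest {
*     private final ClusterInstance cluster;
*
*     SomeTest(ClusterInstance cluster) {
*         this.cluster = cluster; // not fully initialized until a test method is invoked
*     }
* }
* </pre>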
*/
public class ClusterInstanceParameterResolver implements ParameterResolver {
private final ClusterInstance clusterInstance;
ClusterInstanceParameterResolver(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance;
}
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
if (!parameterContext.getParameter().getType().equals(ClusterInstance.class)) {
return false;
}
if (!extensionContext.getTestMethod().isPresent()) {
// Allow this to be injected into the class
extensionContext.getRequiredTestClass();
return true;
} else {
// If we're injecting into a method, make sure it's a test method and not a lifecycle method
Executable parameterizedMethod = parameterContext.getParameter().getDeclaringExecutable();
return isAnnotated(parameterizedMethod, TestTemplate.class);
}
}
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
return clusterInstance;
}
}


@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.junit;
import kafka.test.ClusterConfig;
import kafka.test.ClusterGenerator;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTemplate;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
import org.junit.platform.commons.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.stream.Stream;
/**
* This class is a custom JUnit extension that will generate some number of test invocations depending on the processing
* of a few custom annotations. These annotations are placed on so-called test template methods. Template methods look
* like normal JUnit test methods, but instead of being invoked directly, they are used as templates for generating
* multiple test invocations.
*
* Test classes that use this extension should use one of the following annotations on each template method:
*
* <ul>
* <li>{@link ClusterTest}, define a single cluster configuration</li>
* <li>{@link ClusterTests}, provide multiple instances of @ClusterTest</li>
* <li>{@link ClusterTemplate}, define a static method that generates cluster configurations</li>
* </ul>
*
* Any combination of these annotations may be used on a given test template method. If no test invocations are
* generated after processing the annotations, an error is thrown.
*
* Depending on which annotations are used, and what values are given, different {@link ClusterConfig} instances will be
* generated. Each ClusterConfig is used to create an underlying Kafka cluster that is used for the actual test
* invocation.
*
* For example:
*
* <pre>
* &#64;ExtendWith(value = Array(classOf[ClusterTestExtensions]))
* class SomeIntegrationTest {
* &#64;ClusterTest(brokers = 1, controllers = 1, clusterType = Type.BOTH)
* def someTest(): Unit = {
* assertTrue(condition)
* }
* }
* </pre>
*
* will generate two invocations of "someTest" (since Type.BOTH was given). For each invocation, the test class
* SomeIntegrationTest will be instantiated, lifecycle methods (before/after) will be run, and "someTest" will be invoked.
*
*/
public class ClusterTestExtensions implements TestTemplateInvocationContextProvider {
@Override
public boolean supportsTestTemplate(ExtensionContext context) {
return true;
}
@Override
public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context) {
ClusterTestDefaults defaults = getClusterTestDefaults(context.getRequiredTestClass());
List<TestTemplateInvocationContext> generatedContexts = new ArrayList<>();
// Process the @ClusterTemplate annotation
ClusterTemplate clusterTemplateAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class);
if (clusterTemplateAnnot != null) {
processClusterTemplate(context, clusterTemplateAnnot, generatedContexts::add);
if (generatedContexts.isEmpty()) {
throw new IllegalStateException("ClusterConfig generator method should provide at least one config");
}
}
// Process single @ClusterTest annotation
ClusterTest clusterTestAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTest.class);
if (clusterTestAnnot != null) {
processClusterTest(clusterTestAnnot, defaults, generatedContexts::add);
}
// Process multiple @ClusterTest annotation within @ClusterTests
ClusterTests clusterTestsAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTests.class);
if (clusterTestsAnnot != null) {
for (ClusterTest annot : clusterTestsAnnot.value()) {
processClusterTest(annot, defaults, generatedContexts::add);
}
}
if (generatedContexts.isEmpty()) {
throw new IllegalStateException("Please annotate test methods with @ClusterTemplate, @ClusterTest, or " +
"@ClusterTests when using the ClusterTestExtensions provider");
}
return generatedContexts.stream();
}
private void processClusterTemplate(ExtensionContext context, ClusterTemplate annot,
Consumer<TestTemplateInvocationContext> testInvocations) {
// If specified, call the cluster config generator method (must be static)
List<ClusterConfig> generatedClusterConfigs = new ArrayList<>();
if (!annot.value().isEmpty()) {
generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add);
} else {
// Ensure we have at least one cluster config
generatedClusterConfigs.add(ClusterConfig.defaultClusterBuilder().build());
}
generatedClusterConfigs.forEach(config -> {
if (config.clusterType() == Type.ZK) {
testInvocations.accept(new ZkClusterInvocationContext(config.copyOf()));
} else {
throw new IllegalStateException("Unknown cluster type " + config.clusterType());
}
});
}
private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethod, ClusterGenerator generator) {
Object testInstance = context.getTestInstance().orElse(null);
Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethod, ClusterGenerator.class);
ReflectionUtils.invokeMethod(method, testInstance, generator);
}
private void processClusterTest(ClusterTest annot, ClusterTestDefaults defaults,
Consumer<TestTemplateInvocationContext> testInvocations) {
final Type type;
if (annot.clusterType() == Type.DEFAULT) {
type = defaults.clusterType();
} else {
type = annot.clusterType();
}
final int brokers;
if (annot.brokers() == 0) {
brokers = defaults.brokers();
} else {
brokers = annot.brokers();
}
final int controllers;
if (annot.controllers() == 0) {
controllers = defaults.controllers();
} else {
controllers = annot.controllers();
}
if (brokers <= 0 || controllers <= 0) {
throw new IllegalArgumentException("Number of brokers/controllers must be greater than zero.");
}
final boolean autoStart;
switch (annot.autoStart()) {
case YES:
autoStart = true;
break;
case NO:
autoStart = false;
break;
case DEFAULT:
autoStart = defaults.autoStart();
break;
default:
throw new IllegalStateException();
}
ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart, annot.securityProtocol());
if (!annot.name().isEmpty()) {
builder.name(annot.name());
}
if (!annot.listener().isEmpty()) {
builder.listenerName(annot.listener());
}
Properties properties = new Properties();
for (ClusterConfigProperty property : annot.serverProperties()) {
properties.put(property.key(), property.value());
}
switch (type) {
case ZK:
case BOTH:
ClusterConfig config = builder.build();
config.serverProperties().putAll(properties);
testInvocations.accept(new ZkClusterInvocationContext(config));
break;
}
}
private ClusterTestDefaults getClusterTestDefaults(Class<?> testClass) {
return Optional.ofNullable(testClass.getDeclaredAnnotation(ClusterTestDefaults.class))
.orElseGet(() -> EmptyClass.class.getDeclaredAnnotation(ClusterTestDefaults.class));
}
@ClusterTestDefaults
private final static class EmptyClass {
// Just used as a convenience to get default values from the annotation
}
}


@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.junit;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolver;
/**
* This resolver is used for supplying any type of object to the test invocation. It does not restrict where the given
* type can be injected; it simply checks whether the requested injection type matches the type given in the constructor. If
* it matches, the given object is returned.
*
* This is useful for injecting helper objects and objects which can be fully initialized before the test lifecycle
* begins.
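*
* For example, ZkClusterInvocationContext registers one of these to supply the ClusterConfig:
* <pre>
*     new GenericParameterResolver&lt;&gt;(clusterConfig, ClusterConfig.class)
* </pre>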
*/
public class GenericParameterResolver<T> implements ParameterResolver {
private final T instance;
private final Class<T> clazz;
GenericParameterResolver(T instance, Class<T> clazz) {
this.instance = instance;
this.clazz = clazz;
}
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
return parameterContext.getParameter().getType().equals(clazz);
}
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
return instance;
}
}


@@ -0,0 +1,139 @@
This document describes a custom JUnit extension which allows for running the same JUnit tests against multiple Kafka
cluster configurations.
# Annotations
A new `@ClusterTest` annotation is introduced which allows for a test to declaratively configure an underlying Kafka cluster.
```scala
@ClusterTest
def testSomething(): Unit = { ... }
```
This annotation has fields for cluster type and number of brokers, as well as commonly parameterized configurations.
Arbitrary server properties can also be provided in the annotation:
```java
@ClusterTest(clusterType = Type.ZK, securityProtocol = SecurityProtocol.PLAINTEXT, serverProperties = {
    @ClusterConfigProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
    @ClusterConfigProperty(key = "socket.send.buffer.bytes", value = "10240"),
})
void testSomething() { ... }
```
Multiple `@ClusterTest` annotations can be given to generate more than one test invocation for the annotated method.
```scala
@ClusterTests(Array(
  new ClusterTest(securityProtocol = SecurityProtocol.PLAINTEXT),
  new ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT)
))
def testSomething(): Unit = { ... }
```
A class-level `@ClusterTestDefaults` annotation is added to provide default values for the `@ClusterTest` annotations
defined within the class. The intention is to reduce repetitive annotation declarations and to make changing defaults
easier for a class with many test cases, as shown below.
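A sketch (the class and test bodies here are illustrative):
```java
@ClusterTestDefaults(clusterType = Type.ZK, brokers = 3)
@ExtendWith(ClusterTestExtensions.class)
public class SomeIntegrationTest {
    @ClusterTest  // inherits the ZK cluster type and 3 brokers from the class-level defaults
    void testWithDefaults() { ... }

    @ClusterTest(brokers = 1)  // overrides the broker count, keeps the other class-level defaults
    void testWithOneBroker() { ... }
}
```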
# Dynamic Configuration
In order to allow for more flexible cluster configuration, a `@ClusterTemplate` annotation is also introduced. This
annotation takes a single string value which references a static method on the test class. This method is used to
produce any number of test configurations using a fluent builder-style API.
```java
@ClusterTemplate("generateConfigs")
void testSomething() { ... }
static void generateConfigs(ClusterGenerator clusterGenerator) {
    ClusterConfig config1 = ClusterConfig.defaultClusterBuilder()
        .name("Generated Test 1")
        .build();
    config1.serverProperties().put("inter.broker.protocol.version", "2.7-IV1");
    clusterGenerator.accept(config1);

    ClusterConfig config2 = ClusterConfig.defaultClusterBuilder()
        .name("Generated Test 2")
        .build();
    config2.serverProperties().put("inter.broker.protocol.version", "2.7-IV2");
    clusterGenerator.accept(config2);

    clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
        .name("Generated Test 3")
        .build());
}
```
This "escape hatch" from the simple declarative style configuration makes it easy to dynamically configure clusters.
# JUnit Extension
Note that the `test*` methods are no longer _tests_; rather, they are test templates. We have added a JUnit extension
called `ClusterTestExtensions` which knows how to process these annotations in order to generate test invocations.
Test classes that wish to make use of these annotations need to explicitly register this extension:
```scala
import kafka.test.junit.ClusterTestExtensions
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ApiVersionsRequestTest {
...
}
```
# JUnit Lifecycle
The lifecycle of a test class that is extended with `ClusterTestExtensions` follows:
* JUnit discovers test template methods that are annotated with `@ClusterTest`, `@ClusterTests`, or `@ClusterTemplate`
* `ClusterTestExtensions` is called for each of these template methods in order to generate some number of test invocations
For each generated invocation:
* Static `@BeforeAll` methods are called
* Test class is instantiated
* Non-static `@BeforeEach` methods are called
* Kafka Cluster is started
* Test method is invoked
* Kafka Cluster is stopped
* Non-static `@AfterEach` methods are called
* Static `@AfterAll` methods are called
`@BeforeEach` methods give an opportunity to set up additional test dependencies before the cluster is started.
# Dependency Injection
A few classes are introduced to provide context for the underlying cluster and to provide reusable functionality that
was previously inherited from the test class hierarchy.
* ClusterConfig: a mutable cluster configuration, including cluster type, number of brokers, properties, etc.
* ClusterInstance: a shim to the underlying class that actually runs the cluster; provides access to things like SocketServers
* IntegrationTestHelper: connection-related functions taken from IntegrationTestHarness and BaseRequestTest
In order to have one of these objects injected, simply add it as a parameter to your test class constructor, `@BeforeEach` method, or test method.
| Injection | Class | BeforeEach | Test | Notes |
| --- | --- | --- | --- | --- |
| ClusterConfig | yes | yes | yes* | *Once inside the test, changing the config has no effect |
| ClusterInstance | yes* | no | yes | *Injectable at the class level for convenience; can only be accessed inside the test |
| IntegrationTestHelper | yes | yes | yes | - |
```scala
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SomeTestClass(helper: IntegrationTestHelper) {
@BeforeEach
def setup(config: ClusterConfig): Unit = {
config.serverProperties().put("foo", "bar")
}
@ClusterTest
def testSomething(cluster: ClusterInstance): Unit = {
val topics = cluster.createAdminClient().listTopics()
}
}
```
# Gotchas
* Test methods annotated with JUnit's `@Test` will still be run, but no cluster will be started and no dependency
injection will happen. This is generally not what you want.
* Even though ClusterConfig is accessible and mutable inside the test method, changing it will have no effect on the cluster.


@@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.junit;
import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import scala.Option;
import scala.collection.JavaConverters;
import scala.compat.java8.OptionConverters;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test invocation. Each instance of this
* class is provided with a configuration for the cluster.
*
* This context also provides parameter resolvers for:
*
* <ul>
* <li>ClusterConfig (the same instance passed to the constructor)</li>
* <li>ClusterInstance (includes methods to expose underlying SocketServer-s)</li>
* <li>IntegrationTestHelper (helper methods)</li>
* </ul>
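*
* For example, ClusterTestExtensions creates one of these per generated ZK invocation:
* <pre>
*     TestTemplateInvocationContext invocationContext = new ZkClusterInvocationContext(clusterConfig.copyOf());
* </pre>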
*/
public class ZkClusterInvocationContext implements TestTemplateInvocationContext {
private final ClusterConfig clusterConfig;
private final AtomicReference<IntegrationTestHarness> clusterReference;
public ZkClusterInvocationContext(ClusterConfig clusterConfig) {
this.clusterConfig = clusterConfig;
this.clusterReference = new AtomicReference<>();
}
@Override
public String getDisplayName(int invocationIndex) {
String clusterDesc = clusterConfig.nameTags().entrySet().stream()
.map(Object::toString)
.collect(Collectors.joining(", "));
return String.format("[Zk %d] %s", invocationIndex, clusterDesc);
}
@Override
public List<Extension> getAdditionalExtensions() {
if (clusterConfig.numControllers() != 1) {
throw new IllegalArgumentException("For ZK clusters, please specify exactly 1 controller.");
}
ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, clusterReference);
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
// We have to wait to actually create the underlying cluster until after our @BeforeEach methods
// have run. This allows tests to set up external dependencies like ZK, MiniKDC, etc.
// However, since we cannot create this instance until we are inside the test invocation, we have
// to use a container class (AtomicReference) to provide this cluster object to the test itself.
// IntegrationTestHarness is what tests normally extend to start a cluster; here we create it anonymously and
// configure it using values from ClusterConfig
IntegrationTestHarness cluster = new IntegrationTestHarness() {
@Override
public Properties serverConfig() {
return clusterConfig.serverProperties();
}
@Override
public Properties adminClientConfig() {
return clusterConfig.adminClientProperties();
}
@Override
public Properties consumerConfig() {
return clusterConfig.consumerProperties();
}
@Override
public Properties producerConfig() {
return clusterConfig.producerProperties();
}
@Override
public SecurityProtocol securityProtocol() {
return clusterConfig.securityProtocol();
}
@Override
public ListenerName listenerName() {
return clusterConfig.listenerName().map(ListenerName::normalised)
.orElseGet(() -> ListenerName.forSecurityProtocol(securityProtocol()));
}
@Override
public Option<Properties> serverSaslProperties() {
if (clusterConfig.saslServerProperties().isEmpty()) {
return Option.empty();
} else {
return Option.apply(clusterConfig.saslServerProperties());
}
}
@Override
public Option<Properties> clientSaslProperties() {
if (clusterConfig.saslClientProperties().isEmpty()) {
return Option.empty();
} else {
return Option.apply(clusterConfig.saslClientProperties());
}
}
@Override
public int brokerCount() {
// Controllers are also brokers in ZK mode, so just use the broker count
return clusterConfig.numBrokers();
}
@Override
public Option<File> trustStoreFile() {
return OptionConverters.toScala(clusterConfig.trustStoreFile());
}
};
clusterReference.set(cluster);
if (clusterConfig.isAutoStart()) {
clusterShim.start();
}
},
(AfterTestExecutionCallback) context -> clusterShim.stop(),
new ClusterInstanceParameterResolver(clusterShim),
new GenericParameterResolver<>(clusterConfig, ClusterConfig.class)
);
}
public static class ZkClusterInstance implements ClusterInstance {
final AtomicReference<IntegrationTestHarness> clusterReference;
final ClusterConfig config;
final AtomicBoolean started = new AtomicBoolean(false);
final AtomicBoolean stopped = new AtomicBoolean(false);
ZkClusterInstance(ClusterConfig config, AtomicReference<IntegrationTestHarness> clusterReference) {
this.config = config;
this.clusterReference = clusterReference;
}
@Override
public String bootstrapServers() {
return TestUtils.bootstrapServers(clusterReference.get().servers(), clusterReference.get().listenerName());
}
@Override
public Collection<SocketServer> brokerSocketServers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
.map(KafkaServer::socketServer)
.collect(Collectors.toList());
}
@Override
public ListenerName clientListener() {
return clusterReference.get().listenerName();
}
@Override
public Collection<SocketServer> controllerSocketServers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
.filter(broker -> broker.kafkaController().isActive())
.map(KafkaServer::socketServer)
.collect(Collectors.toList());
}
@Override
public SocketServer anyBrokerSocketServer() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
.map(KafkaServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}
@Override
public SocketServer anyControllerSocketServer() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
.filter(broker -> broker.kafkaController().isActive())
.map(KafkaServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
}
@Override
public ClusterType clusterType() {
return ClusterType.ZK;
}
@Override
public ClusterConfig config() {
return config;
}
@Override
public IntegrationTestHarness getUnderlying() {
return clusterReference.get();
}
@Override
public Admin createAdminClient(Properties configOverrides) {
return clusterReference.get().createAdminClient(configOverrides);
}
@Override
public void start() {
if (started.compareAndSet(false, true)) {
clusterReference.get().setUp();
}
}
@Override
public void stop() {
if (stopped.compareAndSet(false, true)) {
clusterReference.get().tearDown();
}
}
}
}


@@ -97,7 +97,7 @@ trait SaslSetup {
(serverKeytabFile.get, clientKeytabFile.get)
}
protected def jaasSections(kafkaServerSaslMechanisms: Seq[String],
def jaasSections(kafkaServerSaslMechanisms: Seq[String],
kafkaClientSaslMechanism: Option[String],
mode: SaslSetupMode = Both,
kafkaServerEntryName: String = JaasTestUtils.KafkaServerContextName): Seq[JaasSection] = {


@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package integration.kafka.server
import kafka.network.SocketServer
import kafka.utils.{NotNothing, TestUtils}
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, RequestTestUtils, ResponseHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.nio.ByteBuffer
import java.util.Properties
import scala.annotation.nowarn
import scala.reflect.ClassTag
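/**
* Helpers for exchanging raw Kafka protocol requests and responses with a broker's SocketServer in integration
* tests. For example (illustrative):
* {{{
* val response = IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](
*   new ApiVersionsRequest.Builder().build(), socketServer, listenerName)
* }}}
*/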
object IntegrationTestUtils {
private def sendRequest(socket: Socket, request: Array[Byte]): Unit = {
val outgoing = new DataOutputStream(socket.getOutputStream)
outgoing.writeInt(request.length)
outgoing.write(request)
outgoing.flush()
}
def sendWithHeader(request: AbstractRequest, header: RequestHeader, socket: Socket): Unit = {
val serializedBytes = Utils.toArray(RequestTestUtils.serializeRequestWithHeader(header, request))
sendRequest(socket, serializedBytes)
}
def nextRequestHeader[T <: AbstractResponse](apiKey: ApiKeys,
apiVersion: Short,
clientId: String = "client-id",
correlationIdOpt: Option[Int] = None): RequestHeader = {
val correlationId = correlationIdOpt.getOrElse {
this.correlationId += 1
this.correlationId
}
new RequestHeader(apiKey, apiVersion, clientId, correlationId)
}
def send(request: AbstractRequest,
socket: Socket,
clientId: String = "client-id",
correlationId: Option[Int] = None): Unit = {
val header = nextRequestHeader(request.apiKey, request.version, clientId, correlationId)
sendWithHeader(request, header, socket)
}
def receive[T <: AbstractResponse](socket: Socket, apiKey: ApiKeys, version: Short)
(implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = {
val incoming = new DataInputStream(socket.getInputStream)
val len = incoming.readInt()
val responseBytes = new Array[Byte](len)
incoming.readFully(responseBytes)
val responseBuffer = ByteBuffer.wrap(responseBytes)
ResponseHeader.parse(responseBuffer, apiKey.responseHeaderVersion(version))
AbstractResponse.parseResponse(apiKey, responseBuffer, version) match {
case response: T => response
case response =>
throw new ClassCastException(s"Expected response with type ${classTag.runtimeClass}, but found ${response.getClass}")
}
}
def sendAndReceive[T <: AbstractResponse](request: AbstractRequest,
socket: Socket,
clientId: String = "client-id",
correlationId: Option[Int] = None)
(implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
send(request, socket, clientId, correlationId)
receive[T](socket, request.apiKey, request.version)
}
def connectAndReceive[T <: AbstractResponse](request: AbstractRequest,
destination: SocketServer,
listenerName: ListenerName)
(implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
val socket = connect(destination, listenerName)
try sendAndReceive[T](request, socket)
finally socket.close()
}
protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
private var correlationId = 0
def connect(socketServer: SocketServer,
listenerName: ListenerName): Socket = {
new Socket("localhost", socketServer.boundPort(listenerName))
}
def clientSecurityProps(certAlias: String): Properties = {
TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, None, certAlias, TestUtils.SslCertificateCn, None) // TODO use real trust store and client SASL properties
}
}


@@ -16,21 +16,28 @@
*/
package kafka.server
import java.util.Properties
import integration.kafka.server.IntegrationTestUtils
import kafka.test.ClusterInstance
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
import org.junit.jupiter.api.Assertions._
import java.util.Properties
import scala.jdk.CollectionConverters._
abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, cluster.brokerSocketServers().asScala.head, listenerName)
}
def controlPlaneListenerName = new ListenerName("CONTROLLER")
// Configure a control plane listener so that the tests exercise separate listeners.
override def brokerPropertyOverrides(properties: Properties): Unit = {
def brokerPropertyOverrides(properties: Properties): Unit = {
val securityProtocol = cluster.config().securityProtocol()
properties.setProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListenerName.value())
properties.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${controlPlaneListenerName.value()}:$securityProtocol,$securityProtocol:$securityProtocol")
properties.setProperty("listeners", s"$securityProtocol://localhost:0,${controlPlaneListenerName.value()}://localhost:0")
@ -38,15 +45,15 @@ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
}
def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
val overrideHeader = nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
val socket = connect(anySocketServer)
val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
sendWithHeader(request, overrideHeader, socket)
receive[ApiVersionsResponse](socket, ApiKeys.API_VERSIONS, 0.toShort)
IntegrationTestUtils.sendWithHeader(request, overrideHeader, socket)
IntegrationTestUtils.receive[ApiVersionsResponse](socket, ApiKeys.API_VERSIONS, 0.toShort)
} finally socket.close()
}
def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = interBrokerListenerName): Unit = {
def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName): Unit = {
val expectedApis = ApiKeys.brokerApis()
if (listenerName == controlPlaneListenerName) {
expectedApis.add(ApiKeys.ENVELOPE)
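
For a PLAINTEXT cluster, the brokerPropertyOverrides above amounts to broker settings along these lines (an illustrative rendering, not part of the commit; port 0 requests an ephemeral port):

control.plane.listener.name=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
listeners=PLAINTEXT://localhost:0,CONTROLLER://localhost:0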

View File

@ -17,32 +17,40 @@
package kafka.server
import kafka.test.{ClusterConfig, ClusterInstance}
import org.apache.kafka.common.message.ApiVersionsRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
import org.apache.kafka.common.requests.ApiVersionsRequest
import kafka.test.annotation.ClusterTest
import kafka.test.junit.ClusterTestExtensions
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.extension.ExtendWith
class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest {
override def brokerCount: Int = 1
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
@Test
def testApiVersionsRequest(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request)
validateApiVersionsResponse(apiVersionsResponse)
@BeforeEach
def setup(config: ClusterConfig): Unit = {
super.brokerPropertyOverrides(config.serverProperties())
}
@Test
@ClusterTest
def testApiVersionsRequest(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
}
@ClusterTest
def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, super.controlPlaneListenerName)
validateApiVersionsResponse(apiVersionsResponse, super.controlPlaneListenerName)
}
@Test
@ClusterTest
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest)
@ -54,30 +62,25 @@ class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest {
assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
}
@Test
@ClusterTest
def testApiVersionsRequestValidationV0(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest)
validateApiVersionsResponse(apiVersionsResponse)
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
}
@Test
@ClusterTest
def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, super.controlPlaneListenerName)
validateApiVersionsResponse(apiVersionsResponse, super.controlPlaneListenerName)
}
@Test
@ClusterTest
def testApiVersionsRequestValidationV3(): Unit = {
// Invalid request because clientSoftwareName and clientSoftwareVersion are empty by default
val apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), 3.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest)
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
assertEquals(Errors.INVALID_REQUEST.code(), apiVersionsResponse.data.errorCode())
}
private def sendApiVersionsRequest(request: ApiVersionsRequest,
listenerName: ListenerName = super.listenerName): ApiVersionsResponse = {
connectAndReceive[ApiVersionsResponse](request, listenerName = listenerName)
}
}
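
Putting the pieces together, the new pattern distilled into a minimal sketch (ExampleClusterTest and its members are hypothetical, not part of the commit): the extension injects a ClusterInstance through the constructor, hands the mutable ClusterConfig to @BeforeEach before the cluster is started, and invokes each @ClusterTest once per requested cluster configuration.

import kafka.test.{ClusterConfig, ClusterInstance}
import kafka.test.annotation.ClusterTest
import kafka.test.junit.ClusterTestExtensions
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.extension.ExtendWith

@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ExampleClusterTest(cluster: ClusterInstance) {

  @BeforeEach
  def setup(config: ClusterConfig): Unit = {
    // Runs before the cluster is started, so mutations here shape the brokers.
    config.serverProperties().put("num.io.threads", "4")
  }

  @ClusterTest // replaces JUnit's @Test
  def testClusterIsWired(): Unit = {
    assertNotNull(cluster.clientListener())
  }
}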

View File

@ -17,8 +17,10 @@
package kafka.server
import java.net.InetAddress
import integration.kafka.server.IntegrationTestUtils
import kafka.test.ClusterInstance
import java.net.InetAddress
import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, UserScramCredentialUpsertion}
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
@ -26,23 +28,25 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import java.util
import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.utils.TestUtils
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import scala.jdk.CollectionConverters._
class ClientQuotasRequestTest extends BaseRequestTest {
@ClusterTestDefaults(clusterType = Type.ZK)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ClientQuotasRequestTest(cluster: ClusterInstance) {
private val ConsumerByteRateProp = QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG
private val ProducerByteRateProp = QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG
private val RequestPercentageProp = QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG
private val IpConnectionRateProp = QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG
override val brokerCount = 1
@Test
@ClusterTest
def testAlterClientQuotasRequest(): Unit = {
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user"), (ClientQuotaEntity.CLIENT_ID -> "client-id")).asJava)
@ -112,7 +116,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
))
}
@Test
@ClusterTest
def testAlterClientQuotasRequestValidateOnly(): Unit = {
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
@ -170,11 +174,11 @@ class ClientQuotasRequestTest extends BaseRequestTest {
))
}
@Test
@ClusterTest
def testClientQuotasForScramUsers(): Unit = {
val userName = "user"
val results = createAdminClient().alterUserScramCredentials(util.Arrays.asList(
val results = cluster.createAdminClient().alterUserScramCredentials(util.Arrays.asList(
new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")))
results.all.get
@ -193,7 +197,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
))
}
@Test
@ClusterTest
def testAlterIpQuotasRequest(): Unit = {
val knownHost = "1.2.3.4"
val unknownHost = "2.3.4.5"
@ -218,7 +222,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
var currentServerQuota = 0
TestUtils.waitUntilTrue(
() => {
currentServerQuota = servers.head.socketServer.connectionQuotas.connectionRateForIp(entityIp)
currentServerQuota = cluster.brokerSocketServers().asScala.head.connectionQuotas.connectionRateForIp(entityIp)
Math.abs(expectedMatches(entity) - currentServerQuota) < 0.01
}, s"Connection quota of $entity is not ${expectedMatches(entity)} but $currentServerQuota")
}
@ -251,40 +255,25 @@ class ClientQuotasRequestTest extends BaseRequestTest {
verifyIpQuotas(allIpEntityFilter, Map.empty)
}
@Test
def testAlterClientQuotasBadUser(): Unit = {
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true))
}
@ClusterTest
def testAlterClientQuotasInvalidRequests(): Unit = {
var entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true))
@Test
def testAlterClientQuotasBadClientId(): Unit = {
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "")).asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true))
}
entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "")).asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true))
@Test
def testAlterClientQuotasBadEntityType(): Unit = {
val entity = new ClientQuotaEntity(Map(("" -> "name")).asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true))
}
entity = new ClientQuotaEntity(Map(("" -> "name")).asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true))
@Test
def testAlterClientQuotasEmptyEntity(): Unit = {
val entity = new ClientQuotaEntity(Map.empty.asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(ProducerByteRateProp -> Some(10000.5)), validateOnly = true))
}
entity = new ClientQuotaEntity(Map.empty.asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((ProducerByteRateProp -> Some(10000.5))), validateOnly = true))
@Test
def testAlterClientQuotasBadConfigKey(): Unit = {
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map("bad" -> Some(1.0)), validateOnly = true))
}
entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(("bad" -> Some(1.0))), validateOnly = true))
@Test
def testAlterClientQuotasBadConfigValue(): Unit = {
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(ProducerByteRateProp -> Some(10000.5)), validateOnly = true))
entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((ProducerByteRateProp -> Some(10000.5))), validateOnly = true))
}
private def expectInvalidRequestWithMessage(runnable: => Unit, expectedMessage: String): Unit = {
@ -292,7 +281,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
assertTrue(exception.getMessage.contains(expectedMessage), s"Expected exception $exception to contain message $expectedMessage")
}
@Test
@ClusterTest
def testAlterClientQuotasInvalidEntityCombination(): Unit = {
val userAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user", ClientQuotaEntity.IP -> "1.2.3.4").asJava)
val clientAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> "client", ClientQuotaEntity.IP -> "1.2.3.4").asJava)
@ -303,7 +292,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
validateOnly = true), expectedExceptionMessage)
}
@Test
@ClusterTest
def testAlterClientQuotasBadIp(): Unit = {
val invalidHostPatternEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "abc-123").asJava)
val unresolvableHostEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "ip").asJava)
@ -314,7 +303,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
validateOnly = true), expectedExceptionMessage)
}
@Test
@ClusterTest
def testDescribeClientQuotasInvalidFilterCombination(): Unit = {
val ipFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)
val userFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)
@ -357,7 +346,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
(matchUserClientEntities ++ matchIpEntities).foreach(e => result(e._1).get(10, TimeUnit.SECONDS))
}
@Test
@ClusterTest
def testDescribeClientQuotasMatchExact(): Unit = {
setupDescribeClientQuotasMatchTest()
@ -402,7 +391,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
}
}
@Test
@ClusterTest
def testDescribeClientQuotasMatchPartial(): Unit = {
setupDescribeClientQuotasMatchTest()
@ -509,13 +498,13 @@ class ClientQuotasRequestTest extends BaseRequestTest {
testMatchEntities(ClientQuotaFilter.containsOnly(List.empty.asJava), 0, entity => false)
}
@Test
@ClusterTest
def testClientQuotasUnsupportedEntityTypes(): Unit = {
val entity = new ClientQuotaEntity(Map(("other" -> "name")).asJava)
assertThrows(classOf[UnsupportedVersionException], () => verifyDescribeEntityQuotas(entity, Map.empty))
}
@Test
@ClusterTest
def testClientQuotasSanitized(): Unit = {
// An entity with a name that must be sanitized when written to ZooKeeper.
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user with spaces")).asJava)
@ -529,7 +518,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
))
}
@Test
@ClusterTest
def testClientQuotasWithDefaultName(): Unit = {
// An entity using the literal name reserved for the default entity. The entity's name should be sanitized so
// that it does not conflict with the default entity name.
@ -580,7 +569,9 @@ class ClientQuotasRequestTest extends BaseRequestTest {
private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter): DescribeClientQuotasResponse = {
val request = new DescribeClientQuotasRequest.Builder(filter).build()
connectAndReceive[DescribeClientQuotasResponse](request, destination = controllerSocketServer)
IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](request,
destination = cluster.anyControllerSocketServer(),
listenerName = cluster.clientListener())
}
private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String, Option[Double]], validateOnly: Boolean) =
@ -606,7 +597,9 @@ class ClientQuotasRequestTest extends BaseRequestTest {
private def sendAlterClientQuotasRequest(entries: Iterable[ClientQuotaAlteration], validateOnly: Boolean): AlterClientQuotasResponse = {
val request = new AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly).build()
connectAndReceive[AlterClientQuotasResponse](request, destination = controllerSocketServer)
IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](request,
destination = cluster.anyControllerSocketServer(),
listenerName = cluster.clientListener())
}
}
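
The class-level @ClusterTestDefaults(clusterType = Type.ZK) above supplies defaults for every @ClusterTest in the class, with per-method attributes expected to take precedence; a hedged sketch (DefaultsExampleTest is hypothetical):

import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.extension.ExtendWith

@ClusterTestDefaults(clusterType = Type.ZK) // default for every @ClusterTest below
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class DefaultsExampleTest(cluster: ClusterInstance) {

  @ClusterTest // inherits the ZK default from the class-level annotation
  def testWithDefaults(): Unit = {
    // ...
  }

  // Method-level attributes override the class-level defaults.
  @ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
  def testWithOverrides(): Unit = {
    // ...
  }
}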

View File

@ -16,6 +16,8 @@
*/
package kafka.server
import integration.kafka.server.IntegrationTestUtils
import java.net.Socket
import java.util.Collections
import kafka.api.{KafkaSasl, SaslSetup}
@ -23,49 +25,53 @@ import kafka.utils.JaasTestUtils
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
import kafka.test.annotation.{ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.{ClusterConfig, ClusterInstance}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.api.extension.ExtendWith
class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
private val kafkaClientSaslMechanism = "PLAIN"
private val kafkaServerSaslMechanisms = List("PLAIN")
protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
override def brokerCount = 1
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
val kafkaClientSaslMechanism = "PLAIN"
val kafkaServerSaslMechanisms = List("PLAIN")
private var sasl: SaslSetup = _
@BeforeEach
override def setUp(): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
super.setUp()
def setupSasl(config: ClusterConfig): Unit = {
sasl = new SaslSetup() {}
sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
config.saslServerProperties().putAll(sasl.kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
config.saslClientProperties().putAll(sasl.kafkaClientSaslProperties(kafkaClientSaslMechanism))
super.brokerPropertyOverrides(config.serverProperties())
}
@AfterEach
override def tearDown(): Unit = {
super.tearDown()
closeSasl()
}
@Test
@ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
val socket = connect()
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
val apiVersionsResponse = sendAndReceive[ApiVersionsResponse](
val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
validateApiVersionsResponse(apiVersionsResponse)
validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
}
}
@Test
@ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
val socket = connect()
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
sendSaslHandshakeRequestValidateResponse(socket)
val response = sendAndReceive[ApiVersionsResponse](
val response = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
assertEquals(Errors.ILLEGAL_SASL_STATE.code, response.data.errorCode)
} finally {
@ -73,26 +79,31 @@ class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with Sas
}
}
@Test
@ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val socket = connect()
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0)
val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest)
assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode)
val apiVersionsResponse2 = sendAndReceive[ApiVersionsResponse](
val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
validateApiVersionsResponse(apiVersionsResponse2)
validateApiVersionsResponse(apiVersionsResponse2, cluster.clientListener())
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
}
}
@AfterEach
def closeSasl(): Unit = {
sasl.closeSasl()
}
private def sendSaslHandshakeRequestValidateResponse(socket: Socket): Unit = {
val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN"),
ApiKeys.SASL_HANDSHAKE.latestVersion)
val response = sendAndReceive[SaslHandshakeResponse](request, socket)
val response = IntegrationTestUtils.sendAndReceive[SaslHandshakeResponse](request, socket)
assertEquals(Errors.NONE, response.error)
assertEquals(Collections.singletonList("PLAIN"), response.enabledMechanisms)
}
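
Condensed from the diff above, a skeleton for writing a new SASL test in this style (MySaslTest is hypothetical; it restates the wiring shown above rather than adding anything new):

import kafka.api.{KafkaSasl, SaslSetup}
import kafka.test.{ClusterConfig, ClusterInstance}
import kafka.test.annotation.{ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.api.extension.ExtendWith

@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class MySaslTest(cluster: ClusterInstance) {
  private var sasl: SaslSetup = _

  @BeforeEach
  def setupSasl(config: ClusterConfig): Unit = {
    sasl = new SaslSetup() {}
    // Write the JAAS config and point the JVM at it before the brokers start.
    sasl.startSasl(sasl.jaasSections(List("PLAIN"), Some("PLAIN"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
    // Seed broker- and client-side SASL properties into the cluster config.
    config.saslServerProperties().putAll(sasl.kafkaServerSaslProperties(List("PLAIN"), "PLAIN"))
    config.saslClientProperties().putAll(sasl.kafkaClientSaslProperties("PLAIN"))
  }

  @ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
  def testOverSaslPlaintext(): Unit = {
    // ... exercise the SASL_PLAINTEXT listener ...
  }

  @AfterEach
  def closeSasl(): Unit = sasl.closeSasl()
}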