KAFKA-17619: Remove zk type and instance from ClusterTest (#17284)

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
This commit is contained in:
PoAn Yang 2024-09-27 23:38:15 +08:00 committed by GitHub
parent 05d05e1b5e
commit 10c789416c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 109 additions and 1269 deletions

View File

@ -17,15 +17,12 @@
package kafka.admin;
import kafka.admin.AclCommand.AclCommandOptions;
import kafka.security.authorizer.AclAuthorizer;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.test.junit.ZkClusterInvocationContext;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
@ -98,14 +95,17 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(serverProperties = {
@ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = AclCommandTest.ACL_AUTHORIZER)
})
@ClusterTestDefaults(
types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = AclCommandTest.STANDARD_AUTHORIZER)}
)
@ExtendWith(ClusterTestExtensions.class)
public class AclCommandTest {
public static final String ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer";
private static final String STANDARD_AUTHORIZER = "org.apache.kafka.metadata.authorizer.StandardAuthorizer";
public static final String STANDARD_AUTHORIZER = "org.apache.kafka.metadata.authorizer.StandardAuthorizer";
private static final String LOCALHOST = "localhost:9092";
private static final String AUTHORIZER = "--authorizer";
private static final String AUTHORIZER_PROPERTIES = AUTHORIZER + "-properties";
@ -221,77 +221,38 @@ public class AclCommandTest {
}).collect(Collectors.toMap(Entry::getKey, Entry::getValue)));
}};
@ClusterTest(types = {Type.ZK})
public void testAclCliWithAuthorizer(ClusterInstance cluster) throws InterruptedException {
testAclCli(cluster, zkArgs(cluster));
}
@ClusterTests({
@ClusterTest(types = {Type.ZK}),
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
})
@ClusterTest
public void testAclCliWithAdminAPI(ClusterInstance cluster) throws InterruptedException {
testAclCli(cluster, adminArgs(cluster.bootstrapServers(), Optional.empty()));
}
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
@ClusterTest
public void testAclCliWithAdminAPIAndBootstrapController(ClusterInstance cluster) throws InterruptedException {
testAclCli(cluster, adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty()));
}
@ClusterTests({
@ClusterTest(types = {Type.ZK}),
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
})
@ClusterTest
public void testAclCliWithMisusingBootstrapServerToController(ClusterInstance cluster) {
assertThrows(RuntimeException.class, () -> testAclCli(cluster, adminArgsWithBootstrapController(cluster.bootstrapServers(), Optional.empty())));
}
@ClusterTests({
@ClusterTest(types = {Type.ZK}),
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
})
@ClusterTest
public void testAclCliWithMisusingBootstrapControllerToServer(ClusterInstance cluster) {
assertThrows(RuntimeException.class, () -> testAclCli(cluster, adminArgs(cluster.bootstrapControllers(), Optional.empty())));
}
@ClusterTest(types = {Type.ZK})
public void testProducerConsumerCliWithAuthorizer(ClusterInstance cluster) throws InterruptedException {
testProducerConsumerCli(cluster, zkArgs(cluster));
}
@ClusterTests({
@ClusterTest(types = {Type.ZK}),
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
})
@ClusterTest
public void testProducerConsumerCliWithAdminAPI(ClusterInstance cluster) throws InterruptedException {
testProducerConsumerCli(cluster, adminArgs(cluster.bootstrapServers(), Optional.empty()));
}
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
@ClusterTest
public void testProducerConsumerCliWithAdminAPIAndBootstrapController(ClusterInstance cluster) throws InterruptedException {
testProducerConsumerCli(cluster, adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty()));
}
@ClusterTests({
@ClusterTest(types = {Type.ZK}),
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
})
@ClusterTest
public void testAclCliWithClientId(ClusterInstance cluster) throws IOException, InterruptedException {
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
appender.setClassLogger(AppInfoParser.class, Level.WARN);
@ -303,9 +264,7 @@ public class AclCommandTest {
}
}
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
@ClusterTest
public void testAclCliWithClientIdAndBootstrapController(ClusterInstance cluster) throws IOException, InterruptedException {
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
appender.setClassLogger(AppInfoParser.class, Level.WARN);
@ -317,57 +276,22 @@ public class AclCommandTest {
}
}
@ClusterTest(types = {Type.ZK})
public void testAclsOnPrefixedResourcesWithAuthorizer(ClusterInstance cluster) throws InterruptedException {
testAclsOnPrefixedResources(cluster, zkArgs(cluster));
}
@ClusterTests({
@ClusterTest(types = {Type.ZK}),
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
})
@ClusterTest
public void testAclsOnPrefixedResourcesWithAdminAPI(ClusterInstance cluster) throws InterruptedException {
testAclsOnPrefixedResources(cluster, adminArgs(cluster.bootstrapServers(), Optional.empty()));
}
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
@ClusterTest
public void testAclsOnPrefixedResourcesWithAdminAPIAndBootstrapController(ClusterInstance cluster) throws InterruptedException {
testAclsOnPrefixedResources(cluster, adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty()));
}
@ClusterTest(types = {Type.ZK})
public void testInvalidAuthorizerProperty(ClusterInstance cluster) {
AclCommand.AuthorizerService aclCommandService = new AclCommand.AuthorizerService(
AclAuthorizer.class.getName(),
new AclCommandOptions(new String[]{AUTHORIZER_PROPERTIES, "zookeeper.connect " + zkConnect(cluster)})
);
assertThrows(IllegalArgumentException.class, aclCommandService::listAcls);
}
@ClusterTest(types = {Type.ZK})
public void testPatternTypesWithAuthorizer(ClusterInstance cluster) {
testPatternTypes(zkArgs(cluster));
}
@ClusterTests({
@ClusterTest(types = {Type.ZK}),
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
})
@ClusterTest
public void testPatternTypesWithAdminAPI(ClusterInstance cluster) {
testPatternTypes(adminArgs(cluster.bootstrapServers(), Optional.empty()));
}
@ClusterTests({
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
})
})
@ClusterTest
public void testPatternTypesWithAdminAPIAndBootstrapController(ClusterInstance cluster) {
testPatternTypes(adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty()));
}
@ -728,14 +652,6 @@ public class AclCommandTest {
return JavaConverters.setAsJavaSet(scalaSet);
}
private String zkConnect(ClusterInstance cluster) {
return ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
}
private List<String> zkArgs(ClusterInstance cluster) {
return Arrays.asList("--authorizer-properties", "zookeeper.connect=" + zkConnect(cluster));
}
private void assertInitializeInvalidOptionsExitCodeAndMsg(List<String> args, String expectedMsg) {
Exit.setExitProcedure((exitCode, message) -> {
assertEquals(1, exitCode);

View File

@ -16,27 +16,15 @@
*/
package kafka.admin;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.test.junit.ZkClusterInvocationContext;
import kafka.zk.AdminZkClient;
import kafka.zk.BrokerInfo;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.security.PasswordEncoder;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ZooKeeperInternals;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.ExtendWith;
@ -50,7 +38,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
@ -62,13 +49,8 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -98,7 +80,7 @@ public class ConfigCommandIntegrationTest {
this.cluster = cluster;
}
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT})
@ClusterTest
public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-name", cluster.isKRaftTest() ? "0" : "1",
@ -108,46 +90,7 @@ public class ConfigCommandIntegrationTest {
errOut -> assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut));
}
@ClusterTest(types = {Type.ZK})
public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "users",
"--entity-name", "admin",
"--alter", "--add-config", "consumer_byte_rate=20000")),
errOut -> assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut));
}
@ClusterTest(types = {Type.ZK})
public void testExitWithNonZeroStatusOnZkCommandAlterGroup() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "groups",
"--entity-name", "group",
"--alter", "--add-config", "consumer.session.timeout.ms=50000")),
errOut -> assertTrue(errOut.contains("Invalid entity type groups, the entity type must be one of users, brokers with a --zookeeper argument"), errOut));
// Test for the --group alias
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--group", "group",
"--alter", "--add-config", "consumer.session.timeout.ms=50000")),
errOut -> assertTrue(errOut.contains("Invalid entity type groups, the entity type must be one of users, brokers with a --zookeeper argument"), errOut));
}
@ClusterTest(types = {Type.ZK})
public void testExitWithNonZeroStatusOnZkCommandAlterClientMetrics() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "client-metrics",
"--entity-name", "cm",
"--alter", "--add-config", "metrics=org.apache")),
errOut -> assertTrue(errOut.contains("Invalid entity type client-metrics, the entity type must be one of users, brokers with a --zookeeper argument"), errOut));
// Test for the --client-metrics alias
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--client-metrics", "cm",
"--alter", "--add-config", "consumer.session.timeout.ms=50000")),
errOut -> assertTrue(errOut.contains("Invalid entity type client-metrics, the entity type must be one of users, brokers with a --zookeeper argument"), errOut));
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
@ClusterTest
public void testNullStatusOnKraftCommandAlterUserQuota() {
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "users",
@ -158,7 +101,7 @@ public class ConfigCommandIntegrationTest {
assertTrue(StringUtils.isBlank(message), message);
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
@ClusterTest
public void testNullStatusOnKraftCommandAlterGroup() {
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "groups",
@ -175,7 +118,7 @@ public class ConfigCommandIntegrationTest {
assertTrue(StringUtils.isBlank(message), message);
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
@ClusterTest
public void testNullStatusOnKraftCommandAlterClientMetrics() {
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "client-metrics",
@ -192,96 +135,7 @@ public class ConfigCommandIntegrationTest {
assertTrue(StringUtils.isBlank(message), message);
}
@ClusterTest(types = Type.ZK)
public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
cluster.shutdownBroker(0);
String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
String brokerId = "1";
AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter");
// Add config
alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
singletonMap("message.max.bytes", "110000"));
alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
singletonMap("message.max.bytes", "120000"));
// Change config
alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
singletonMap("message.max.bytes", "130000"));
alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
singletonMap("message.max.bytes", "140000"));
// Delete config
deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
singleton("message.max.bytes"));
deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
singleton("message.max.bytes"));
// Listener configs: should work only with listener name
alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"));
assertThrows(ConfigException.class,
() -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId),
singletonMap("ssl.keystore.location", "/tmp/test.jks")));
// Per-broker config configured at default cluster-level should fail
assertThrows(ConfigException.class,
() -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(),
singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")));
deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
singleton("listener.name.internal.ssl.keystore.location"));
// Password config update without encoder secret should fail
assertThrows(IllegalArgumentException.class,
() -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId),
singletonMap("listener.name.external.ssl.keystore.password", "secret")));
// Password config update with encoder secret should succeed and encoded password must be stored in ZK
Map<String, String> configs = new HashMap<>();
configs.put("listener.name.external.ssl.keystore.password", "secret");
configs.put("log.cleaner.threads", "2");
Map<String, String> encoderConfigs = new HashMap<>(configs);
encoderConfigs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs);
Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId);
assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper");
assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded
String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs);
assertEquals("secret", passwordEncoder.decode(encodedPassword).value());
assertEquals(configs.size(), brokerConfigs.size());
// Password config update with overrides for encoder parameters
Map<String, String> encoderConfigs2 = generateEncodeConfig();
alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs2);
Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId);
String encodedPassword2 = brokerConfigs2.getProperty("listener.name.external.ssl.keystore.password");
assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs)
.decode(encodedPassword2).value());
assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs2)
.decode(encodedPassword2).value());
// Password config update at default cluster-level should fail
assertThrows(ConfigException.class,
() -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), encoderConfigs));
// Dynamic config updates using ZK should fail if broker is running.
registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
assertThrows(IllegalArgumentException.class,
() -> alterConfigWithZk(zkClient, adminZkClient,
Optional.of(brokerId), singletonMap("message.max.bytes", "210000")));
assertThrows(IllegalArgumentException.class,
() -> alterConfigWithZk(zkClient, adminZkClient,
Optional.empty(), singletonMap("message.max.bytes", "220000")));
// Dynamic config updates using ZK should for a different broker that is not running should succeed
alterAndVerifyConfig(zkClient, adminZkClient, Optional.of("2"), singletonMap("message.max.bytes", "230000"));
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
@ClusterTest
public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception {
alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
@ -322,7 +176,7 @@ public class ConfigCommandIntegrationTest {
}
}
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
@ClusterTest
public void testGroupConfigUpdateUsingKraft() throws Exception {
alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--entity-type", "groups", "--alter");
verifyGroupConfigUpdate();
@ -381,26 +235,7 @@ public class ConfigCommandIntegrationTest {
}
}
@ClusterTest(types = {Type.ZK})
public void testAlterReadOnlyConfigInZookeeperThenShouldFail() {
cluster.shutdownBroker(0);
String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
alterOpts = generateDefaultAlterOpts(zkConnect);
assertThrows(ConfigException.class,
() -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
singletonMap("auto.create.topics.enable", "false")));
assertThrows(ConfigException.class,
() -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
singletonMap("auto.leader.rebalance.enable", "false")));
assertThrows(ConfigException.class,
() -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
singletonMap("broker.id", "1")));
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
@ClusterTest
public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
@ -417,23 +252,7 @@ public class ConfigCommandIntegrationTest {
}
}
@ClusterTest(types = {Type.ZK})
public void testUpdateClusterWideConfigInZookeeperThenShouldSuccessful() {
cluster.shutdownBroker(0);
String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
alterOpts = generateDefaultAlterOpts(zkConnect);
Map<String, String> configs = new HashMap<>();
configs.put("log.flush.interval.messages", "100");
configs.put("log.retention.bytes", "20");
configs.put("log.retention.ms", "2");
alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId), configs);
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
@ClusterTest
public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() throws Exception {
alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
@ -447,34 +266,7 @@ public class ConfigCommandIntegrationTest {
}
}
@ClusterTest(types = {Type.ZK})
public void testUpdatePerBrokerConfigWithListenerNameInZookeeperThenShouldSuccessful() {
cluster.shutdownBroker(0);
String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
alterOpts = generateDefaultAlterOpts(zkConnect);
String listenerName = "listener.name.internal.";
String sslTruststoreType = listenerName + "ssl.truststore.type";
String sslTruststoreLocation = listenerName + "ssl.truststore.location";
String sslTruststorePassword = listenerName + "ssl.truststore.password";
Map<String, String> configs = new HashMap<>();
configs.put(sslTruststoreType, "PKCS12");
configs.put(sslTruststoreLocation, "/temp/test.jks");
configs.put("password.encoder.secret", "encoder-secret");
configs.put(sslTruststorePassword, "password");
alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId), configs);
Properties properties = zkClient.getEntityConfigs("brokers", defaultBrokerId);
assertTrue(properties.containsKey(sslTruststorePassword));
assertEquals(configs.get(sslTruststoreType), properties.getProperty(sslTruststoreType));
assertEquals(configs.get(sslTruststoreLocation), properties.getProperty(sslTruststoreLocation));
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
@ClusterTest
public void testUpdatePerBrokerConfigWithListenerNameInKRaftThenShouldSuccessful() throws Exception {
alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
String listenerName = "listener.name.internal.";
@ -492,26 +284,7 @@ public class ConfigCommandIntegrationTest {
}
}
@ClusterTest(types = {Type.ZK})
public void testUpdatePerBrokerConfigInZookeeperThenShouldFail() {
cluster.shutdownBroker(0);
String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
alterOpts = generateDefaultAlterOpts(zkConnect);
assertThrows(ConfigException.class, () ->
alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
singletonMap("ssl.truststore.type", "PKCS12")));
assertThrows(ConfigException.class, () ->
alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
singletonMap("ssl.truststore.location", "/temp/test.jks")));
assertThrows(ConfigException.class, () ->
alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
singletonMap("ssl.truststore.password", "password")));
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
@ClusterTest
public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
@ -543,29 +316,7 @@ public class ConfigCommandIntegrationTest {
}
private Stream<String> quorumArgs() {
return cluster.isKRaftTest()
? Stream.of("--bootstrap-server", cluster.bootstrapServers())
: Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect());
}
private void verifyConfig(KafkaZkClient zkClient, Optional<String> brokerId, Map<String, String> config) {
Properties entityConfigs = zkClient.getEntityConfigs("brokers",
brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
assertEquals(config, entityConfigs);
}
private void alterAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient adminZkClient,
Optional<String> brokerId, Map<String, String> configs) {
alterConfigWithZk(zkClient, adminZkClient, brokerId, configs);
verifyConfig(zkClient, brokerId, configs);
}
private void alterConfigWithZk(KafkaZkClient zkClient, AdminZkClient adminZkClient,
Optional<String> brokerId, Map<String, String> config) {
String configStr = transferConfigMapToString(config);
ConfigCommand.ConfigCommandOptions addOpts =
new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), asList("--add-config", configStr)));
ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient);
return Stream.of("--bootstrap-server", cluster.bootstrapServers());
}
private List<String> entityOp(Optional<String> brokerId) {
@ -573,36 +324,6 @@ public class ConfigCommandIntegrationTest {
.orElse(singletonList("--entity-default"));
}
private void deleteAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient adminZkClient,
Optional<String> brokerId, Set<String> configNames) {
ConfigCommand.ConfigCommandOptions deleteOpts =
new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId),
asList("--delete-config", String.join(",", configNames))));
ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient);
verifyConfig(zkClient, brokerId, Collections.emptyMap());
}
private Map<String, String> generateEncodeConfig() {
Map<String, String> map = new HashMap<>();
map.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
map.put(PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding");
map.put(PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024");
map.put(PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1");
map.put(PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64");
map.put("listener.name.external.ssl.keystore.password", "secret2");
return map;
}
private void registerBrokerInZk(KafkaZkClient zkClient, int id) {
zkClient.createTopLevelPaths();
SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
EndPoint endpoint = new EndPoint("localhost", 9092,
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id, endpoint,
scala.None$.empty()), MetadataVersion.latestTesting(), 9192);
zkClient.registerBroker(brokerInfo);
}
private List<String> generateDefaultAlterOpts(String bootstrapServers) {
return asList("--bootstrap-server", bootstrapServers,
"--entity-type", "brokers", "--alter");

View File

@ -69,7 +69,7 @@ public class ClusterConfig {
Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perServerProperties, List<String> tags,
Map<Features, Short> features) {
// do fail fast. the following values are invalid for both zk and kraft modes.
// do fail fast. the following values are invalid for kraft modes.
if (brokers < 0) throw new IllegalArgumentException("Number of brokers must be greater or equal to zero.");
if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero.");
if (disksPerBroker <= 0) throw new IllegalArgumentException("Number of disks must be greater than zero.");
@ -176,7 +176,7 @@ public class ClusterConfig {
public static Builder defaultBuilder() {
return new Builder()
.setTypes(Stream.of(Type.ZK, Type.KRAFT, Type.CO_KRAFT).collect(Collectors.toSet()))
.setTypes(Stream.of(Type.KRAFT, Type.CO_KRAFT).collect(Collectors.toSet()))
.setBrokers(1)
.setControllers(1)
.setDisksPerBroker(1)

View File

@ -74,7 +74,6 @@ public interface ClusterInstance {
/**
* Return the set of all controller IDs configured for this test. For kraft, this
* will return only the nodes which have the "controller" role enabled in `process.roles`.
* For zookeeper, this will return all broker IDs since they are all eligible controllers.
*/
Set<Integer> controllerIds();
@ -92,19 +91,12 @@ public interface ClusterInstance {
ListenerName clientListener();
/**
* The listener for the kraft cluster controller configured by controller.listener.names. In ZK-based clusters, return Optional.empty
* The listener for the kraft cluster controller configured by controller.listener.names.
*/
default Optional<ListenerName> controllerListenerName() {
return Optional.empty();
}
/**
* The listener for the zk controller configured by control.plane.listener.name. In Raft-based clusters, return Optional.empty
*/
default Optional<ListenerName> controlPlaneListenerName() {
return Optional.empty();
}
/**
* The broker connect string which can be used by clients for bootstrapping
*/
@ -116,8 +108,7 @@ public interface ClusterInstance {
String bootstrapControllers();
/**
* A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
* acting as the controller (since ZK controllers serve both broker and controller roles).
* A collection of all brokers in the cluster.
*/
default Collection<SocketServer> brokerSocketServers() {
return brokers().values().stream()
@ -126,8 +117,7 @@ public interface ClusterInstance {
}
/**
* A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also
* currently the active controller. For Raft-based clusters, this will return all controller servers.
* A collection of all controllers in the cluster.
*/
Collection<SocketServer> controllerSocketServers();

View File

@ -53,7 +53,7 @@ import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
@ClusterTestDefaults(types = {Type.ZK}, serverProperties = {
@ClusterTestDefaults(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
}) // Set defaults for a few params in @ClusterTest(s)
@ -71,7 +71,7 @@ public class ClusterTestExtensionsTest {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put("foo", "bar");
return Collections.singletonList(ClusterConfig.defaultBuilder()
.setTypes(Collections.singleton(Type.ZK))
.setTypes(Collections.singleton(Type.KRAFT))
.setServerProperties(serverProperties)
.setTags(Collections.singletonList("Generated Test"))
.build());
@ -81,29 +81,21 @@ public class ClusterTestExtensionsTest {
@ClusterTest
public void testClusterTest(ClusterInstance clusterInstance) {
Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected objects should be the same");
Assertions.assertEquals(Type.ZK, clusterInstance.type()); // From the class level default
Assertions.assertEquals(Type.KRAFT, clusterInstance.type()); // From the class level default
Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key"));
}
// generate1 is a template method which generates any number of cluster configs
@ClusterTemplate("generate1")
public void testClusterTemplate() {
Assertions.assertEquals(Type.ZK, clusterInstance.type(),
"generate1 provided a Zk cluster, so we should see that here");
Assertions.assertEquals(Type.KRAFT, clusterInstance.type(),
"generate1 provided a KRAFT cluster, so we should see that here");
Assertions.assertEquals("bar", clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals(Collections.singletonList("Generated Test"), clusterInstance.config().tags());
}
// Multiple @ClusterTest can be used with @ClusterTests
@ClusterTests({
@ClusterTest(types = {Type.ZK}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "bar"),
@ClusterConfigProperty(key = "spam", value = "eggs"),
@ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker id is 86400
@ClusterConfigProperty(key = "spam", value = "eggs")
}, tags = {
"default.display.key1", "default.display.key2"
}),
@ClusterTest(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
@ -127,49 +119,34 @@ public class ClusterTestExtensionsTest {
})
})
public void testClusterTests() throws ExecutionException, InterruptedException {
if (!clusterInstance.isKRaftTest()) {
Assertions.assertEquals("bar", clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key"));
Assertions.assertEquals(Arrays.asList("default.display.key1", "default.display.key2"), clusterInstance.config().tags());
Assertions.assertEquals("baz", clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
Assertions.assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key"));
Assertions.assertEquals(Arrays.asList("default.display.key1", "default.display.key2"), clusterInstance.config().tags());
// assert broker server 0 contains property queued.max.requests 100 from ClusterTestDefaults
try (Admin admin = clusterInstance.createAdminClient()) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
// assert broker server 0 contains property queued.max.requests 200 from ClusterTest which overrides
// the value 100 in server property in ClusterTestDefaults
try (Admin admin = clusterInstance.createAdminClient()) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
Assertions.assertEquals(1, configs.size());
Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
}
// In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300
if (clusterInstance.type() == Type.KRAFT) {
try (Admin admin = Admin.create(Collections.singletonMap(
AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
Assertions.assertEquals(1, configs.size());
Assertions.assertEquals("100", configs.get(configResource).get("queued.max.requests").value());
}
} else {
Assertions.assertEquals("baz", clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
Assertions.assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key"));
Assertions.assertEquals(Arrays.asList("default.display.key1", "default.display.key2"), clusterInstance.config().tags());
// assert broker server 0 contains property queued.max.requests 200 from ClusterTest which overrides
// the value 100 in server property in ClusterTestDefaults
try (Admin admin = clusterInstance.createAdminClient()) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
Assertions.assertEquals(1, configs.size());
Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
}
// In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300
if (clusterInstance.type() == Type.KRAFT) {
try (Admin admin = Admin.create(Collections.singletonMap(
AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
Assertions.assertEquals(1, configs.size());
Assertions.assertEquals("300", configs.get(configResource).get("queued.max.requests").value());
}
Assertions.assertEquals("300", configs.get(configResource).get("queued.max.requests").value());
}
}
}
@ClusterTests({
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2),
})
public void testClusterTestWithDisksPerBroker() throws ExecutionException, InterruptedException {
Admin admin = clusterInstance.createAdminClient();
@ -201,10 +178,10 @@ public class ClusterTestExtensionsTest {
}
@ClusterTests({
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
})
})
@ -214,7 +191,7 @@ public class ClusterTestExtensionsTest {
@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
public void testCreateTopic(ClusterInstance clusterInstance) throws Exception {
String topicName = "test";
int numPartition = 3;
@ -230,7 +207,7 @@ public class ClusterTestExtensionsTest {
}
}
@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
public void testShutdownAndSyncMetadata(ClusterInstance clusterInstance) throws Exception {
String topicName = "test";
int numPartition = 3;
@ -240,7 +217,7 @@ public class ClusterTestExtensionsTest {
clusterInstance.waitForTopic(topicName, numPartition);
}
@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
public void testClusterAliveBrokers(ClusterInstance clusterInstance) throws Exception {
clusterInstance.waitForReadyBrokers();
@ -256,7 +233,7 @@ public class ClusterTestExtensionsTest {
}
@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
public void testVerifyTopicDeletion(ClusterInstance clusterInstance) throws Exception {
try (Admin admin = clusterInstance.createAdminClient()) {
String testTopic = "testTopic";

View File

@ -32,9 +32,6 @@ public @interface ClusterConfigProperty {
* all controller/broker servers. Note that the "controller" here refers to the KRaft quorum controller.
* The id can vary depending on the different {@link kafka.test.annotation.Type}.
* <ul>
* <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts from
* {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0} and increases by 1
* with each additional broker, and there is no controller server under this mode. </li>
* <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id starts from
* {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}, the controller id
* starts from {@link kafka.testkit.TestKitNodes#CONTROLLER_ID_OFFSET 3000}

View File

@ -35,7 +35,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Target({TYPE})
@Retention(RUNTIME)
public @interface ClusterTestDefaults {
Type[] types() default {Type.ZK, Type.KRAFT, Type.CO_KRAFT};
Type[] types() default {Type.KRAFT, Type.CO_KRAFT};
int brokers() default 1;
int controllers() default 1;
int disksPerBroker() default 1;

View File

@ -19,7 +19,6 @@ package kafka.test.annotation;
import kafka.test.ClusterConfig;
import kafka.test.junit.RaftClusterInvocationContext;
import kafka.test.junit.ZkClusterInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
@ -39,12 +38,6 @@ public enum Type {
public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
return new RaftClusterInvocationContext(baseDisplayName, config, true);
}
},
ZK {
@Override
public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
return new ZkClusterInvocationContext(baseDisplayName, config);
}
};
public abstract TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config);

View File

@ -14,7 +14,7 @@ This annotation has fields for a set of cluster types and number of brokers, as
Arbitrary server properties can also be provided in the annotation:
```java
@ClusterTest(types = {Type.Zk}, securityProtocol = "PLAINTEXT", properties = {
@ClusterTest(types = {Type.KRAFT}, securityProtocol = "PLAINTEXT", properties = {
@ClusterProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
@ClusterProperty(key = "socket.send.buffer.bytes", value = "10240"),
})

View File

@ -1,372 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.junit;
import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaServer;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.annotation.Type;
import kafka.utils.EmptyTestInfo;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.compat.java8.OptionConverters;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG;
/**
* Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test invocation. Each instance of this
* class is provided with a configuration for the cluster.
*
* This context also provides parameter resolvers for:
*
* <ul>
* <li>ClusterConfig (the same instance passed to the constructor)</li>
* <li>ClusterInstance (includes methods to expose underlying SocketServer-s)</li>
* </ul>
*/
public class ZkClusterInvocationContext implements TestTemplateInvocationContext {
private final String baseDisplayName;
private final ClusterConfig clusterConfig;
private final AtomicReference<ClusterConfigurableIntegrationHarness> clusterReference;
public ZkClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig) {
this.baseDisplayName = baseDisplayName;
this.clusterConfig = clusterConfig;
this.clusterReference = new AtomicReference<>();
}
@Override
public String getDisplayName(int invocationIndex) {
return String.format("%s [%d] Type=ZK, %s", baseDisplayName, invocationIndex, String.join(",", clusterConfig.displayTags()));
}
@Override
public List<Extension> getAdditionalExtensions() {
if (clusterConfig.numControllers() != 1) {
throw new IllegalArgumentException("For ZK clusters, please specify exactly 1 controller.");
}
ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, clusterReference);
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
// We have to wait to actually create the underlying cluster until after our @BeforeEach methods
// have run. This allows tests to set up external dependencies like ZK, MiniKDC, etc.
// However, since we cannot create this instance until we are inside the test invocation, we have
// to use a container class (AtomicReference) to provide this cluster object to the test itself
clusterReference.set(new ClusterConfigurableIntegrationHarness(clusterConfig));
if (clusterConfig.isAutoStart()) {
clusterShim.start();
}
},
(AfterTestExecutionCallback) context -> clusterShim.stop(),
new ClusterInstanceParameterResolver(clusterShim)
);
}
public static class ZkClusterInstance implements ClusterInstance {
final AtomicReference<ClusterConfigurableIntegrationHarness> clusterReference;
final ClusterConfig config;
final AtomicBoolean started = new AtomicBoolean(false);
final AtomicBoolean stopped = new AtomicBoolean(false);
ZkClusterInstance(ClusterConfig config, AtomicReference<ClusterConfigurableIntegrationHarness> clusterReference) {
this.config = config;
this.clusterReference = clusterReference;
}
@Override
public String bootstrapServers() {
return TestUtils.bootstrapServers(clusterReference.get().servers(), clusterReference.get().listenerName());
}
@Override
public String bootstrapControllers() {
throw new RuntimeException("Cannot use --bootstrap-controller with ZK-based clusters.");
}
@Override
public ListenerName clientListener() {
return clusterReference.get().listenerName();
}
@Override
public Optional<ListenerName> controlPlaneListenerName() {
return OptionConverters.toJava(clusterReference.get().servers().head().config().controlPlaneListenerName());
}
@Override
public Collection<SocketServer> controllerSocketServers() {
return brokers().values().stream()
.filter(s -> ((KafkaServer) s).kafkaController().isActive())
.map(KafkaBroker::socketServer)
.collect(Collectors.toList());
}
@Override
public String clusterId() {
return brokers().values().stream().findFirst().map(KafkaBroker::clusterId).orElseThrow(
() -> new RuntimeException("No broker instances found"));
}
@Override
public Type type() {
return Type.ZK;
}
@Override
public ClusterConfig config() {
return config;
}
@Override
public Set<Integer> controllerIds() {
return brokerIds();
}
@Override
public IntegrationTestHarness getUnderlying() {
return clusterReference.get();
}
@Override
public Admin createAdminClient(Properties configOverrides) {
return clusterReference.get().createAdminClient(clientListener(), configOverrides);
}
@Override
public void start() {
if (started.compareAndSet(false, true)) {
clusterReference.get().setUp(new EmptyTestInfo());
}
}
@Override
public void stop() {
if (stopped.compareAndSet(false, true)) {
clusterReference.get().tearDown();
}
}
@Override
public void shutdownBroker(int brokerId) {
findBrokerOrThrow(brokerId).shutdown();
}
@Override
public void startBroker(int brokerId) {
findBrokerOrThrow(brokerId).startup();
}
@Override
public void waitTopicDeletion(String topic) throws InterruptedException {
org.apache.kafka.test.TestUtils.waitForCondition(
() -> !clusterReference.get().zkClient().isTopicMarkedForDeletion(topic),
String.format("Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted", topic)
);
org.apache.kafka.test.TestUtils.waitForCondition(
() -> !clusterReference.get().zkClient().topicExists(topic),
String.format("Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted", topic, topic)
);
ClusterInstance.super.waitTopicDeletion(topic);
}
/**
* Restart brokers with given cluster config.
*
* @param clusterConfig clusterConfig is optional. If left Optional.empty(), brokers will restart without
* reconfiguring configurations. Otherwise, the restart will reconfigure configurations
* according to the provided cluster config.
*/
public void rollingBrokerRestart(Optional<ClusterConfig> clusterConfig) {
requireNonNull(clusterConfig);
if (!started.get()) {
throw new IllegalStateException("Tried to restart brokers but the cluster has not been started!");
}
for (int i = 0; i < clusterReference.get().brokerCount(); i++) {
clusterReference.get().killBroker(i);
}
clusterConfig.ifPresent(config -> clusterReference.get().setClusterConfig(config));
clusterReference.get().restartDeadBrokers(true);
clusterReference.get().adminClientConfig().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
}
@Override
public void waitForReadyBrokers() throws InterruptedException {
org.apache.kafka.test.TestUtils.waitForCondition(() -> {
int numRegisteredBrokers = clusterReference.get().zkClient().getAllBrokersInCluster().size();
return numRegisteredBrokers == config.numBrokers();
}, "Timed out while waiting for brokers to become ready");
}
private KafkaServer findBrokerOrThrow(int brokerId) {
return brokers().values().stream()
.filter(server -> server.config().brokerId() == brokerId)
.map(s -> (KafkaServer) s)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}
@Override
public Map<Integer, ControllerServer> controllers() {
return Collections.emptyMap();
}
@Override
public Map<Integer, KafkaBroker> brokers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers())
.stream().collect(Collectors.toMap(s -> s.config().brokerId(), s -> s));
}
}
// This is what tests normally extend from to start a cluster, here we extend it and
// configure the cluster using values from ClusterConfig.
private static class ClusterConfigurableIntegrationHarness extends IntegrationTestHarness {
private ClusterConfig clusterConfig;
private ClusterConfigurableIntegrationHarness(ClusterConfig clusterConfig) {
this.clusterConfig = Objects.requireNonNull(clusterConfig);
}
public void setClusterConfig(ClusterConfig clusterConfig) {
this.clusterConfig = Objects.requireNonNull(clusterConfig);
}
@Override
public void modifyConfigs(Seq<Properties> props) {
super.modifyConfigs(props);
for (int i = 0; i < props.length(); i++) {
props.apply(i).putAll(clusterConfig.perServerOverrideProperties().getOrDefault(i, Collections.emptyMap()));
}
}
@Override
public Properties serverConfig() {
Properties props = new Properties();
props.putAll(clusterConfig.serverProperties());
props.put(INTER_BROKER_PROTOCOL_VERSION_CONFIG, clusterConfig.metadataVersion().version());
return props;
}
@Override
public Properties adminClientConfig() {
Properties props = new Properties();
props.putAll(clusterConfig.adminClientProperties());
return props;
}
@Override
public Properties consumerConfig() {
Properties props = new Properties();
props.putAll(clusterConfig.consumerProperties());
return props;
}
@Override
public Properties producerConfig() {
Properties props = new Properties();
props.putAll(clusterConfig.producerProperties());
return props;
}
@Override
public SecurityProtocol securityProtocol() {
return clusterConfig.securityProtocol();
}
@Override
public ListenerName listenerName() {
return clusterConfig.listenerName().map(ListenerName::normalised)
.orElseGet(() -> ListenerName.forSecurityProtocol(securityProtocol()));
}
@Override
public Option<Properties> serverSaslProperties() {
if (clusterConfig.saslServerProperties().isEmpty()) {
return Option.empty();
} else {
Properties props = new Properties();
props.putAll(clusterConfig.saslServerProperties());
return Option.apply(props);
}
}
@Override
public Option<Properties> clientSaslProperties() {
if (clusterConfig.saslClientProperties().isEmpty()) {
return Option.empty();
} else {
Properties props = new Properties();
props.putAll(clusterConfig.saslClientProperties());
return Option.apply(props);
}
}
@Override
public int brokerCount() {
// Controllers are also brokers in zk mode, so just use broker count
return clusterConfig.numBrokers();
}
@Override
public int logDirCount() {
return clusterConfig.numDisksPerBroker();
}
@Override
public Option<File> trustStoreFile() {
return OptionConverters.toScala(clusterConfig.trustStoreFile());
}
}
}

View File

@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit
class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT, Type.ZK),
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),

View File

@ -19,41 +19,22 @@ package kafka.coordinator.transaction
import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils
import kafka.test.{ClusterConfig, ClusterInstance}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.message.InitProducerIdRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{Disabled, Timeout}
import java.util.stream.{Collectors, IntStream}
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters._
object ProducerIdsIntegrationTest {
def uniqueProducerIdsBumpIBP(): java.util.List[ClusterConfig] = {
val serverProperties = java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
val perBrokerProperties: java.util.Map[Integer, java.util.Map[String, String]] =
java.util.Collections.singletonMap(0,
java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0-IV0"))
List(ClusterConfig.defaultBuilder()
.setTypes(Set(Type.ZK).asJava)
.setBrokers(3)
.setAutoStart(false)
.setServerProperties(serverProperties)
.setPerServerProperties(perBrokerProperties)
.build()).asJava
}
}
@ClusterTestDefaults(serverProperties = Array(
new ClusterConfigProperty(key = "transaction.state.log.num.partitions", value = "1")
))
@ -61,41 +42,12 @@ object ProducerIdsIntegrationTest {
class ProducerIdsIntegrationTest {
@ClusterTests(Array(
new ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
new ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
new ClusterTest(types = Array(Type.KRAFT), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
))
def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
verifyUniqueIds(clusterInstance)
}
@ClusterTemplate("uniqueProducerIdsBumpIBP")
def testUniqueProducerIdsBumpIBP(clusterInstance: ClusterInstance): Unit = {
clusterInstance.start()
verifyUniqueIds(clusterInstance)
clusterInstance.stop()
}
@ClusterTest(types = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
new ClusterConfigProperty(key = "num.io.threads", value = "1")
))
@Timeout(20)
def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: ClusterInstance): Unit = {
clusterInstance.start()
verifyUniqueIds(clusterInstance)
clusterInstance.stop()
}
@Disabled // TODO: Enable once producer id block size is configurable (KAFKA-15029)
@ClusterTest(types = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
new ClusterConfigProperty(key = "num.io.threads", value = "2")
))
def testMultipleAllocateProducerIdsRequest(clusterInstance: ClusterInstance): Unit = {
clusterInstance.start()
verifyUniqueIds(clusterInstance)
clusterInstance.stop()
}
private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
// Request enough PIDs from each broker to ensure each broker generates two blocks
val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {

View File

@ -1,131 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.test.{ClusterConfig, ClusterInstance}
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs}
import org.junit.jupiter.api.Assertions.{assertThrows, fail}
import org.junit.jupiter.api.extension.ExtendWith
import java.util.Optional
import java.util.concurrent.{TimeUnit, TimeoutException}
import scala.jdk.CollectionConverters._
/**
* This test creates a full ZK cluster and a controller-only KRaft cluster and configures the ZK brokers to register
* themselves with the KRaft controller. This is mainly a happy-path test since the only way to reliably test the
* failure paths is to use timeouts. See {@link unit.kafka.server.BrokerRegistrationRequestTest} for integration test
* of just the broker registration path.
*/
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class KafkaServerKRaftRegistrationTest {
@ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
))
def testRegisterZkBrokerInKraft(zkCluster: ClusterInstance): Unit = {
val clusterId = zkCluster.clusterId()
// Bootstrap the ZK cluster ID into KRaft
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
.setConfigProp(ZkConfigs.ZK_CONNECT_CONFIG, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
kraftCluster.startup()
val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
// Enable migration configs and restart brokers
val serverProperties = new java.util.HashMap[String, String](zkCluster.config().serverProperties())
serverProperties.put(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
serverProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
serverProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
val clusterConfig = ClusterConfig.builder(zkCluster.config())
.setServerProperties(serverProperties)
.build()
zkCluster.asInstanceOf[ZkClusterInstance].rollingBrokerRestart(Optional.of(clusterConfig))
zkCluster.waitForReadyBrokers()
try {
// Wait until all three ZK brokers are registered with KRaft controller
readyFuture.get(30, TimeUnit.SECONDS)
} catch {
case _: TimeoutException => fail("Did not see 3 brokers within 30 seconds")
case t: Throwable => fail("Had some other error waiting for brokers", t)
}
} finally {
shutdownInSequence(zkCluster, kraftCluster)
}
}
@ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
def testRestartOldIbpZkBrokerInMigrationMode(zkCluster: ClusterInstance): Unit = {
// Bootstrap the ZK cluster ID into KRaft
val clusterId = zkCluster.clusterId()
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
.setConfigProp(ZkConfigs.ZK_CONNECT_CONFIG, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
kraftCluster.startup()
// Enable migration configs and restart brokers
val serverProperties = new java.util.HashMap[String, String](zkCluster.config().serverProperties())
serverProperties.put(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
serverProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
serverProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
val clusterConfig = ClusterConfig.builder(zkCluster.config())
.setServerProperties(serverProperties)
.build()
assertThrows(classOf[IllegalArgumentException], () => zkCluster.asInstanceOf[ZkClusterInstance].rollingBrokerRestart(Optional.of(clusterConfig)))
} finally {
shutdownInSequence(zkCluster, kraftCluster)
}
}
def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster: KafkaClusterTestKit): Unit = {
zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_))
kraftCluster.close()
zkCluster.stop()
}
}

View File

@ -17,72 +17,19 @@
package kafka.server
import kafka.test.{ClusterConfig, ClusterInstance}
import kafka.test.ClusterInstance
import org.apache.kafka.common.message.ApiVersionsRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.ApiVersionsRequest
import kafka.test.annotation.{ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
object ApiVersionsRequestTest {
def controlPlaneListenerProperties(): java.util.HashMap[String, String] = {
// Configure control plane listener to make sure we have separate listeners for testing.
val serverProperties = new java.util.HashMap[String, String]()
serverProperties.put("control.plane.listener.name", "CONTROL_PLANE")
serverProperties.put("listener.security.protocol.map", "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT")
serverProperties.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
serverProperties.put("advertised.listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
serverProperties
}
def testApiVersionsRequestTemplate(): java.util.List[ClusterConfig] = {
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
serverProperties.put("unstable.api.versions.enable", "false")
serverProperties.put("unstable.feature.versions.enable", "true")
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(serverProperties)
.setMetadataVersion(MetadataVersion.latestTesting())
.build()).asJava
}
def testApiVersionsRequestIncludesUnreleasedApisTemplate(): java.util.List[ClusterConfig] = {
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
serverProperties.put("unstable.api.versions.enable", "true")
serverProperties.put("unstable.feature.versions.enable", "true")
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(serverProperties)
.build()).asJava
}
def testApiVersionsRequestValidationV0Template(): java.util.List[ClusterConfig] = {
val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
serverProperties.put("unstable.api.versions.enable", "false")
serverProperties.put("unstable.feature.versions.enable", "false")
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setMetadataVersion(MetadataVersion.latestProduction())
.build()).asJava
}
def zkApiVersionsRequest(): java.util.List[ClusterConfig] = {
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(controlPlaneListenerProperties())
.build()).asJava
}
}
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
@ClusterTemplate("testApiVersionsRequestTemplate")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true")
@ -93,7 +40,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
validateApiVersionsResponse(apiVersionsResponse)
}
@ClusterTemplate("testApiVersionsRequestIncludesUnreleasedApisTemplate")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true"),
@ -104,13 +50,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
validateApiVersionsResponse(apiVersionsResponse, enableUnstableLastVersion = true)
}
@ClusterTemplate("zkApiVersionsRequest")
def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controlPlaneListenerName().get())
validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get(), true)
}
@ClusterTest(types = Array(Type.KRAFT))
def testApiVersionsRequestThroughControllerListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
@ -118,7 +57,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get(), enableUnstableLastVersion = true)
}
@ClusterTemplate("zkApiVersionsRequest")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT))
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build()
@ -132,7 +70,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
}
// Use the latest production MV for this test
@ClusterTemplate("testApiVersionsRequestValidationV0Template")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "false"),
@ -145,13 +82,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
cluster.config().serverProperties().get("unstable.api.versions.enable")))
}
@ClusterTemplate("zkApiVersionsRequest")
def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controlPlaneListenerName().get())
validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get(), true)
}
@ClusterTest(types = Array(Type.KRAFT))
def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
@ -159,7 +89,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get(), apiVersion = 0, enableUnstableLastVersion = true)
}
@ClusterTemplate("zkApiVersionsRequest")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT))
def testApiVersionsRequestValidationV3(): Unit = {
// Invalid request because Name and Version are empty by default

View File

@ -21,7 +21,7 @@ import java.net.InetAddress
import java.util
import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, Type}
import kafka.test.annotation.ClusterTest
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, UserScramCredentialUpsertion}
@ -31,6 +31,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
import org.apache.kafka.server.config.{QuotaConfigs, ZooKeeperInternals}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@ -165,7 +166,8 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
))
}
@ClusterTest(types = Array(Type.ZK)) // No SCRAM for Raft yet
@Disabled("TODO: KAFKA-17630 - Convert ClientQuotasRequestTest#testClientQuotasForScramUsers to kraft")
@ClusterTest
def testClientQuotasForScramUsers(): Unit = {
val userName = "user"

View File

@ -40,30 +40,6 @@ import scala.jdk.CollectionConverters._
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1)
class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(types = Array(Type.ZK), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
))
def testConsumerGroupDescribeWithZookeeperCluster(): Unit = {
val consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder(
new ConsumerGroupDescribeRequestData().setGroupIds(List("grp-1", "grp-2").asJava)
).build(ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled))
val consumerGroupDescribeResponse = connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest)
val expectedResponse = new ConsumerGroupDescribeResponseData()
expectedResponse.groups().add(
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-1")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
expectedResponse.groups.add(
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-2")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
assertEquals(expectedResponse, consumerGroupDescribeResponse.data)
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(

View File

@ -50,7 +50,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
}
@ClusterTest(
types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT),
types = Array(Type.KRAFT, Type.CO_KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),

View File

@ -39,7 +39,7 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinat
testDescribeGroups()
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),

View File

@ -16,13 +16,12 @@
*/
package kafka.server
import java.io.IOException
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, DescribeQuorumRequest, DescribeQuorumResponse}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
@ -33,21 +32,6 @@ import scala.reflect.ClassTag
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DescribeQuorumRequestTest(cluster: ClusterInstance) {
@ClusterTest(types = Array(Type.ZK))
def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {
val apiRequest = new ApiVersionsRequest.Builder().build()
val apiResponse = connectAndReceive[ApiVersionsResponse](apiRequest)
assertNull(apiResponse.apiVersion(ApiKeys.DESCRIBE_QUORUM.id))
val describeQuorumRequest = new DescribeQuorumRequest.Builder(
singletonRequest(KafkaRaftServer.MetadataPartition)
).build()
assertThrows(classOf[IOException], () => {
connectAndReceive[DescribeQuorumResponse](describeQuorumRequest)
})
}
@ClusterTest
def testDescribeQuorum(): Unit = {
for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {

View File

@ -43,7 +43,7 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
testHeartbeat()
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),

View File

@ -37,7 +37,7 @@ class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
testLeaveGroup()
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),

View File

@ -50,7 +50,7 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
testListGroups(false)
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),

View File

@ -48,7 +48,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
testOffsetCommit(false)
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),

View File

@ -45,7 +45,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator
testOffsetDelete(false)
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),

View File

@ -53,7 +53,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ -81,7 +81,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = false)
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ -109,7 +109,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = false)
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),

View File

@ -31,7 +31,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.network.SocketServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled}
import java.net.Socket
import java.util.Collections
@ -60,7 +60,7 @@ object SaslApiVersionsRequestTest {
List(ClusterConfig.defaultBuilder
.setSecurityProtocol(securityProtocol)
.setTypes(Set(Type.ZK).asJava)
.setTypes(Set(Type.KRAFT).asJava)
.setSaslServerProperties(saslServerProperties)
.setSaslClientProperties(saslClientProperties)
.setServerProperties(serverProperties)
@ -68,6 +68,7 @@ object SaslApiVersionsRequestTest {
}
}
@Disabled("TODO: KAFKA-17631 - Convert SaslApiVersionsRequestTest to kraft")
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
private var sasl: SaslSetup = _

View File

@ -45,7 +45,7 @@ class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
testSyncGroup()
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),

View File

@ -49,13 +49,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(value = ClusterTestExtensions.class)
public class FeatureCommandTest {
@ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testDescribeWithZK(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
);
assertEquals("", commandOutput);
}
@ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testDescribeWithKRaft(ClusterInstance cluster) {
@ -96,16 +89,6 @@ public class FeatureCommandTest {
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(3)));
}
@ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testUpgradeMetadataVersionWithZk(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
"upgrade", "--metadata", "3.3-IV2"))
);
assertEquals("Could not upgrade metadata.version to 6. Could not apply finalized feature " +
"update because the provided feature is not supported.", commandOutput);
}
@ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testUpgradeMetadataVersionWithKraft(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
@ -121,29 +104,6 @@ public class FeatureCommandTest {
assertEquals("metadata.version was upgraded to 6.", commandOutput);
}
@ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testDowngradeMetadataVersionWithZk(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
"disable", "--feature", "metadata.version"))
);
assertEquals("Could not disable metadata.version. Can not delete non-existing finalized feature.", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
"downgrade", "--metadata", "3.3-IV0"))
);
assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
"update because the provided feature is not supported.", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
"downgrade", "--unsafe", "--metadata", "3.3-IV0"))
);
assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
"update because the provided feature is not supported.", commandOutput);
}
@ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testDowngradeMetadataVersionWithKRaft(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->

View File

@ -62,7 +62,6 @@ import java.util.stream.Stream;
import static kafka.test.annotation.Type.CO_KRAFT;
import static kafka.test.annotation.Type.KRAFT;
import static kafka.test.annotation.Type.ZK;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -149,7 +148,7 @@ public class GetOffsetShellTest {
// align listener name here as KafkaClusterTestKit (KRAFT/CO_KRAFT) the default
// broker listener name is EXTERNAL while in ZK it is PLAINTEXT
ClusterConfig.defaultBuilder()
.setTypes(Stream.of(ZK, KRAFT, CO_KRAFT).collect(Collectors.toSet()))
.setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet()))
.setServerProperties(serverProperties)
.setListenerName("EXTERNAL")
.build());

View File

@ -23,7 +23,6 @@ import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@ -33,14 +32,11 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(value = ClusterTestExtensions.class)
@ -52,9 +48,9 @@ class MetadataQuorumCommandTest {
* 3. Fewer brokers than controllers
*/
@ClusterTests({
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 2),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 1),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, controllers = 2),
@ClusterTest(brokers = 2, controllers = 2),
@ClusterTest(brokers = 2, controllers = 1),
@ClusterTest(brokers = 1, controllers = 2),
})
public void testDescribeQuorumReplicationSuccessful(ClusterInstance cluster) throws InterruptedException {
cluster.waitForReadyBrokers();
@ -94,9 +90,9 @@ class MetadataQuorumCommandTest {
* 3. Fewer brokers than controllers
*/
@ClusterTests({
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 2),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 1),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, controllers = 2),
@ClusterTest(brokers = 2, controllers = 2),
@ClusterTest(brokers = 2, controllers = 1),
@ClusterTest(brokers = 1, controllers = 2),
})
public void testDescribeQuorumStatusSuccessful(ClusterInstance cluster) throws InterruptedException {
cluster.waitForReadyBrokers();
@ -135,7 +131,7 @@ class MetadataQuorumCommandTest {
}
}
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
@ClusterTest
public void testOnlyOneBrokerAndOneController(ClusterInstance cluster) {
String statusOutput = ToolsTestUtils.captureStandardOut(() ->
MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")
@ -157,19 +153,6 @@ class MetadataQuorumCommandTest {
"--command-config", tmpfile.getAbsolutePath(), "describe", "--status"));
}
@ClusterTest(types = {Type.ZK})
public void testDescribeQuorumInZkMode(ClusterInstance cluster) {
assertInstanceOf(UnsupportedVersionException.class, assertThrows(
ExecutionException.class,
() -> MetadataQuorumCommand.execute("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")
).getCause());
assertInstanceOf(UnsupportedVersionException.class, assertThrows(
ExecutionException.class,
() -> MetadataQuorumCommand.execute("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")
).getCause());
}
@ClusterTest(types = {Type.CO_KRAFT})
public void testHumanReadableOutput(ClusterInstance cluster) {
assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--human-readable"));

View File

@ -367,7 +367,7 @@ public class TopicCommandTest {
.setBrokers(6)
.setServerProperties(serverProp)
.setPerServerProperties(rackInfo)
.setTypes(Stream.of(Type.ZK, Type.KRAFT).collect(Collectors.toSet()))
.setTypes(Stream.of(Type.KRAFT).collect(Collectors.toSet()))
.build()
);
}

View File

@ -37,11 +37,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static kafka.test.annotation.Type.CO_KRAFT;
import static kafka.test.annotation.Type.ZK;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
@ -51,26 +48,24 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_
/**
* The old test framework {@link kafka.api.BaseConsumerTest#getTestQuorumAndGroupProtocolParametersAll} test for the following cases:
* <ul>
* <li>(ZK / KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases</li>
* <li>(KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 1 cases</li>
* <li>(KRAFT server) with (group.coordinator.new.enable=true) with (classic group protocol) = 1 case</li>
* <li>(KRAFT server) with (group.coordinator.new.enable=true) with (consumer group protocol) = 1 case</li>
* </ul>
* <p>
* The new test framework run seven cases for the following cases:
* <ul>
* <li>(ZK / KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 3 cases</li>
* <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases</li>
* <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=true) with (classic group protocol) = 2 cases</li>
* <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=true) with (consumer group protocol) = 2 cases</li>
* </ul>
* <p>
* We can reduce the number of cases as same as the old test framework by using the following methods:
* <ul>
* <li>{@link #forKRaftGroupCoordinator} for the case of (consumer group protocol)</li>
* <li>(CO_KRAFT servers) with (group.coordinator.new.enable=true) with (classic / consumer group protocols) = 2 cases</li>
* </ul>
* <ul>
* <li>{@link #forZkGroupCoordinator} for the case of (classic group protocol)</li>
* <li>(ZK / KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases</li>
* <li>(KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 1 cases</li>
* </ul>
*/
class ConsumerGroupCommandTestUtils {
@ -79,12 +74,6 @@ class ConsumerGroupCommandTestUtils {
}
static List<ClusterConfig> generator() {
return Stream
.concat(forKRaftGroupCoordinator().stream(), forZkGroupCoordinator().stream())
.collect(Collectors.toList());
}
static List<ClusterConfig> forKRaftGroupCoordinator() {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
@ -99,21 +88,6 @@ class ConsumerGroupCommandTestUtils {
.build());
}
static List<ClusterConfig> forZkGroupCoordinator() {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
serverProperties.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "1000");
serverProperties.put(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "500");
serverProperties.put(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "500");
return Collections.singletonList(ClusterConfig.defaultBuilder()
.setTypes(Collections.singleton(ZK))
.setServerProperties(serverProperties)
.setTags(Collections.singletonList("zkGroupCoordinator"))
.build());
}
static <T> AutoCloseable buildConsumers(int numberOfConsumers,
Set<TopicPartition> partitions,
Supplier<KafkaConsumer<T, T>> consumerSupplier) {

View File

@ -82,10 +82,6 @@ public class ListConsumerGroupTest {
return ConsumerGroupCommandTestUtils.generator();
}
private static List<ClusterConfig> consumerProtocolOnlyGenerator() {
return ConsumerGroupCommandTestUtils.forKRaftGroupCoordinator();
}
private List<GroupProtocol> supportedGroupProtocols() {
return new ArrayList<>(clusterInstance.supportedGroupProtocols());
}
@ -243,7 +239,7 @@ public class ListConsumerGroupTest {
}
}
@ClusterTemplate("consumerProtocolOnlyGenerator")
@ClusterTemplate("defaultGenerator")
public void testListConsumerGroupsWithTypesConsumerProtocol() throws Exception {
GroupProtocol groupProtocol = GroupProtocol.CONSUMER;
String topic = TOPIC_PREFIX + groupProtocol.name;
@ -415,7 +411,7 @@ public class ListConsumerGroupTest {
}
}
@ClusterTemplate("consumerProtocolOnlyGenerator")
@ClusterTemplate("defaultGenerator")
public void testListGroupCommandConsumerProtocol() throws Exception {
GroupProtocol groupProtocol = GroupProtocol.CONSUMER;
String topic = TOPIC_PREFIX + groupProtocol.name;

View File

@ -81,7 +81,6 @@ import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
import static org.apache.kafka.server.config.QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
import static org.apache.kafka.server.config.QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
@ -133,7 +132,6 @@ public class ReassignPartitionsCommandTest {
}
@ClusterTests({
@ClusterTest(types = {Type.ZK}, metadataVersion = IBP_2_7_IV1),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0)
})
public void testReassignmentWithAlterPartitionDisabled() throws Exception {
@ -146,11 +144,6 @@ public class ReassignPartitionsCommandTest {
}
@ClusterTests({
@ClusterTest(types = {Type.ZK}, serverProperties = {
@ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
@ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
@ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
}),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
@ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
@ -416,7 +409,7 @@ public class ReassignPartitionsCommandTest {
/**
* Test moving partitions between directories.
*/
@ClusterTest(types = {Type.ZK, Type.KRAFT})
@ClusterTest(types = {Type.KRAFT})
public void testLogDirReassignment() throws Exception {
createTopics();
TopicPartition topicPartition = new TopicPartition("foo", 0);
@ -464,7 +457,7 @@ public class ReassignPartitionsCommandTest {
}
}
@ClusterTest(types = {Type.ZK, Type.KRAFT})
@ClusterTest(types = {Type.KRAFT})
public void testAlterLogDirReassignmentThrottle() throws Exception {
createTopics();
TopicPartition topicPartition = new TopicPartition("foo", 0);