diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 70258e8dac5..b76ebff59cb 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -67,10 +67,10 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time} import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.coordinator.group.GroupCoordinator +import org.apache.kafka.coordinator.group.{Group, GroupCoordinator} import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.authorizer._ -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{GroupVersion, MetadataVersion} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0} import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData} @@ -3797,10 +3797,15 @@ class KafkaApis(val requestChannel: RequestChannel, ) } + private def isConsumerGroupProtocolEnabled(): Boolean = { + val version = metadataCache.features().finalizedFeatures().getOrDefault(GroupVersion.FEATURE_NAME, 0.toShort) + config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.CONSUMER) && version >= GroupVersion.GV_1.featureLevel + } + def handleConsumerGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { val consumerGroupHeartbeatRequest = request.body[ConsumerGroupHeartbeatRequest] - if (!config.isNewGroupCoordinatorEnabled) { + if (!isConsumerGroupProtocolEnabled()) { // The API is not supported by the "old" group coordinator (the default). If the // new one is not enabled, we fail directly here. 
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) @@ -3825,7 +3830,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest] - if (!config.isNewGroupCoordinatorEnabled) { + if (!isConsumerGroupProtocolEnabled()) { // The API is not supported by the "old" group coordinator (the default). If the // new one is not enabled, we fail directly here. requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java index c116a7d5050..b72bb1a5787 100644 --- a/core/src/test/java/kafka/test/ClusterConfig.java +++ b/core/src/test/java/kafka/test/ClusterConfig.java @@ -19,6 +19,7 @@ package kafka.test; import kafka.test.annotation.Type; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.MetadataVersion; import java.io.File; @@ -58,13 +59,15 @@ public class ClusterConfig { private final Map saslClientProperties; private final List tags; private final Map> perServerProperties; + private final Map features; @SuppressWarnings("checkstyle:ParameterNumber") private ClusterConfig(Set types, int brokers, int controllers, int disksPerBroker, boolean autoStart, SecurityProtocol securityProtocol, String listenerName, File trustStoreFile, MetadataVersion metadataVersion, Map serverProperties, Map producerProperties, Map consumerProperties, Map adminClientProperties, Map saslServerProperties, - Map saslClientProperties, Map> perServerProperties, List tags) { + Map saslClientProperties, Map> perServerProperties, List tags, + Map features) { // do fail fast. 
the following values are invalid for both zk and kraft modes. if (brokers < 0) throw new IllegalArgumentException("Number of brokers must be greater or equal to zero."); if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero."); @@ -87,6 +90,7 @@ public class ClusterConfig { this.saslClientProperties = Objects.requireNonNull(saslClientProperties); this.perServerProperties = Objects.requireNonNull(perServerProperties); this.tags = Objects.requireNonNull(tags); + this.features = Objects.requireNonNull(features); } public Set clusterTypes() { @@ -157,6 +161,10 @@ public class ClusterConfig { return tags; } + public Map features() { + return features; + } + public Set displayTags() { Set displayTags = new LinkedHashSet<>(tags); displayTags.add("MetadataVersion=" + metadataVersion); @@ -198,7 +206,8 @@ public class ClusterConfig { .setSaslServerProperties(clusterConfig.saslServerProperties) .setSaslClientProperties(clusterConfig.saslClientProperties) .setPerServerProperties(clusterConfig.perServerProperties) - .setTags(clusterConfig.tags); + .setTags(clusterConfig.tags) + .setFeatures(clusterConfig.features); } public static class Builder { @@ -219,6 +228,7 @@ public class ClusterConfig { private Map saslClientProperties = Collections.emptyMap(); private Map> perServerProperties = Collections.emptyMap(); private List tags = Collections.emptyList(); + private Map features = Collections.emptyMap(); private Builder() {} @@ -309,11 +319,16 @@ public class ClusterConfig { return this; } + public Builder setFeatures(Map features) { + this.features = Collections.unmodifiableMap(features); + return this; + } + public ClusterConfig build() { return new ClusterConfig(types, brokers, controllers, disksPerBroker, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties, adminClientProperties, saslServerProperties, saslClientProperties, - 
perServerProperties, tags); + perServerProperties, tags, features); } } } diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java index 8cb5ae3d215..03e19f39b62 100644 --- a/core/src/test/java/kafka/test/ClusterInstance.java +++ b/core/src/test/java/kafka/test/ClusterInstance.java @@ -40,7 +40,6 @@ import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG; -import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; public interface ClusterInstance { @@ -159,9 +158,7 @@ public interface ClusterInstance { Set supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); - // KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG - if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") || - serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) { + if (serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) { supportedGroupProtocols.add(CONSUMER); } diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index cdbb942e6d6..c8b53f8b8a2 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -185,13 +185,10 @@ public class ClusterTestExtensionsTest { @ClusterTest public void testDefaults(ClusterInstance clusterInstance) { - Assertions.assertEquals(MetadataVersion.IBP_3_8_IV0, clusterInstance.config().metadataVersion()); + 
Assertions.assertEquals(MetadataVersion.IBP_4_0_IVO, clusterInstance.config().metadataVersion()); } @ClusterTests({ - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - }), @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), }), @@ -199,10 +196,6 @@ public class ClusterTestExtensionsTest { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), }), - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), - }), @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), @@ -217,12 +210,19 @@ public class ClusterTestExtensionsTest { } @ClusterTests({ + @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { + @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), + }), @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), }), @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), }), + @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { + @ClusterConfigProperty(key = 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), + }), @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), diff --git a/core/src/test/java/kafka/test/annotation/ClusterFeature.java b/core/src/test/java/kafka/test/annotation/ClusterFeature.java new file mode 100644 index 00000000000..ab72f132881 --- /dev/null +++ b/core/src/test/java/kafka/test/annotation/ClusterFeature.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.test.annotation; + +import org.apache.kafka.server.common.Features; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Target({ElementType.ANNOTATION_TYPE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface ClusterFeature { + Features feature(); + short version(); +} diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java index 9364ce690ea..bd95249b4f4 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTest.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java @@ -40,8 +40,9 @@ public @interface ClusterTest { AutoStart autoStart() default AutoStart.DEFAULT; SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT; String listener() default ""; - MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0; + MetadataVersion metadataVersion() default MetadataVersion.IBP_4_0_IVO; ClusterConfigProperty[] serverProperties() default {}; // users can add tags that they want to display in test String[] tags() default {}; + ClusterFeature[] features() default {}; } diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java index 2290a0a99eb..3e6b5a7d660 100644 --- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java +++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java @@ -18,6 +18,7 @@ package kafka.test.junit; import kafka.test.ClusterConfig; +import kafka.test.annotation.ClusterFeature; import kafka.test.annotation.ClusterTest; import kafka.test.annotation.ClusterTestDefaults; import kafka.test.annotation.ClusterTests; @@ -25,6 +26,7 @@ import kafka.test.annotation.ClusterTemplate; import kafka.test.annotation.ClusterConfigProperty; import 
kafka.test.annotation.Type; import kafka.test.annotation.AutoStart; +import org.apache.kafka.server.common.Features; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.TestTemplateInvocationContext; import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider; @@ -172,6 +174,10 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi .filter(e -> e.id() != -1) .collect(Collectors.groupingBy(ClusterConfigProperty::id, Collectors.mapping(Function.identity(), Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b)))); + + Map features = Arrays.stream(annot.features()) + .collect(Collectors.toMap(ClusterFeature::feature, ClusterFeature::version)); + ClusterConfig config = ClusterConfig.builder() .setTypes(new HashSet<>(Arrays.asList(types))) .setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers()) @@ -184,6 +190,7 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi .setSecurityProtocol(annot.securityProtocol()) .setMetadataVersion(annot.metadataVersion()) .setTags(Arrays.asList(annot.tags())) + .setFeatures(features) .build(); return Arrays.stream(types).map(type -> type.invocationContexts(context.getRequiredTestMethod().getName(), config)) diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 9857d4c92cd..3a7bac5aad5 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -28,15 +28,20 @@ import kafka.testkit.KafkaClusterTestKit; import kafka.testkit.TestKitNodes; import kafka.zk.EmbeddedZookeeper; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.network.ListenerName; import 
org.apache.kafka.common.utils.Utils; import org.apache.kafka.metadata.BrokerState; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.extension.AfterTestExecutionCallback; import org.junit.jupiter.api.extension.BeforeTestExecutionCallback; import org.junit.jupiter.api.extension.Extension; import org.junit.jupiter.api.extension.TestTemplateInvocationContext; import scala.compat.java8.OptionConverters; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -237,8 +242,21 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte public void format() throws Exception { if (formated.compareAndSet(false, true)) { + List records = new ArrayList<>(); + records.add( + new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(clusterConfig.metadataVersion().featureLevel()), (short) 0)); + + clusterConfig.features().forEach((feature, version) -> { + records.add( + new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(feature.featureName()). 
+ setFeatureLevel(version), (short) 0)); + }); + TestKitNodes nodes = new TestKitNodes.Builder() - .setBootstrapMetadataVersion(clusterConfig.metadataVersion()) + .setBootstrapMetadata(BootstrapMetadata.fromRecords(records, "testkit")) .setCombined(isCombined) .setNumBrokerNodes(clusterConfig.numBrokers()) .setNumDisksPerBroker(clusterConfig.numDisksPerBroker()) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index e1f549c4e53..192dfa392d9 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -62,7 +62,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { } override def generateConfigs: Seq[KafkaConfig] = { - val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) configureListeners(cfgs) @@ -72,6 +71,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { } if (isNewGroupCoordinatorEnabled()) { cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true")) + cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer")) } if(isKRaftTest()) { diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index fbb59dd90f1..9f787a1b168 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -37,7 +37,7 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationF import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, 
MetaPropertiesVersion} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig -import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} +import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion} import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs} import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.apache.zookeeper.client.ZKClientConfig @@ -342,6 +342,15 @@ abstract class QuorumTestHarness extends Logging { setName(MetadataVersion.FEATURE_NAME). setFeatureLevel(metadataVersion.featureLevel()), 0.toShort)) + if (isNewGroupCoordinatorEnabled()) { + metadataRecords.add(new ApiMessageAndVersion( + new FeatureLevelRecord() + .setName(Features.GROUP_VERSION.featureName) + .setFeatureLevel(Features.GROUP_VERSION.latestTesting), + 0.toShort + )) + } + optionalMetadataRecords.foreach { metadataArguments => for (record <- metadataArguments) metadataRecords.add(record) } diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 5fd32d8f4ba..c0b6d916ec2 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -72,7 +72,8 @@ object ZkMigrationIntegrationTest { MetadataVersion.IBP_3_7_IV1, MetadataVersion.IBP_3_7_IV2, MetadataVersion.IBP_3_7_IV4, - MetadataVersion.IBP_3_8_IV0 + MetadataVersion.IBP_3_8_IV0, + MetadataVersion.IBP_4_0_IVO ).map { mv => val serverProperties = new util.HashMap[String, String]() serverProperties.put("inter.broker.listener.name", "EXTERNAL") diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 30318c1ccd9..b79cf12aa4b 100644 --- 
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.record.RecordVersion import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{GroupVersion, MetadataVersion} import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Tag @@ -69,9 +69,12 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel()) - assertEquals(1, apiVersionsResponse.data().supportedFeatures().size()) + assertEquals(2, apiVersionsResponse.data().supportedFeatures().size()) assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersion()) + + assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).minVersion()) + assertEquals(GroupVersion.GV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).maxVersion()) } val expectedApis = if (!cluster.isKRaftTest) { ApiVersionsResponse.collectApis( diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index 116cef51460..a7415b5d50a 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -47,7 +47,7 @@ object ApiVersionsRequestTest { List(ClusterConfig.defaultBuilder() .setTypes(java.util.Collections.singleton(Type.ZK)) .setServerProperties(serverProperties) - .setMetadataVersion(MetadataVersion.IBP_3_8_IV0) + .setMetadataVersion(MetadataVersion.IBP_4_0_IVO) .build()).asJava } @@ -83,7 +83,7 @@ object ApiVersionsRequestTest { class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { @ClusterTemplate("testApiVersionsRequestTemplate") - @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( + @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_4_0_IVO, serverProperties = Array( new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true") )) diff --git a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala index 78432b71105..9c43df2b2d2 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala @@ -18,7 +18,7 @@ package kafka.server import org.apache.kafka.common.feature.{Features, SupportedVersionRange} -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{Features => ServerFeatures, MetadataVersion} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.Test @@ -95,6 +95,7 @@ class BrokerFeaturesTest { val expectedFeatures = Map[String, Short]( MetadataVersion.FEATURE_NAME -> 
MetadataVersion.latestTesting().featureLevel(), + ServerFeatures.GROUP_VERSION.featureName() -> ServerFeatures.GROUP_VERSION.latestTesting(), "test_feature_1" -> 4, "test_feature_2" -> 3, "test_feature_3" -> 7) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala index b2e4e0f85f1..b1f8b8405e7 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala @@ -18,7 +18,7 @@ package kafka.server import kafka.server.GroupCoordinatorBaseRequestTest import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type} import kafka.test.junit.ClusterTestExtensions import kafka.utils.TestUtils import org.apache.kafka.common.ConsumerGroupState @@ -26,6 +26,7 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assign import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse} +import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{Tag, Timeout} @@ -60,11 +61,46 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC assertEquals(expectedResponse, consumerGroupDescribeResponse.data) } - @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new 
ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ) + ) + def testConsumerGroupDescribeIsInaccessibleWhenDisabledByGroupVersion(): Unit = { + val consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder( + new ConsumerGroupDescribeRequestData().setGroupIds(List("grp-1", "grp-2").asJava) + ).build(ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) + + val consumerGroupDescribeResponse = connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest) + val expectedResponse = new ConsumerGroupDescribeResponseData() + expectedResponse.groups().add( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId("grp-1") + .setErrorCode(Errors.UNSUPPORTED_VERSION.code) + ) + expectedResponse.groups.add( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId("grp-2") + .setErrorCode(Errors.UNSUPPORTED_VERSION.code) + ) + + assertEquals(expectedResponse, consumerGroupDescribeResponse.data) + } + + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testConsumerGroupDescribeWithNewGroupCoordinator(): Unit = { // Creates the __consumer_offsets topics because it won't be 
created automatically // in this test because it does not use FindCoordinator API. diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 16e0f3d0ef0..1cf0cedaf23 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -17,13 +17,14 @@ package kafka.server import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, Type} import kafka.test.junit.ClusterTestExtensions import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance import kafka.utils.TestUtils import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse} +import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull} import org.junit.jupiter.api.{Tag, Timeout} import org.junit.jupiter.api.extension.ExtendWith @@ -36,7 +37,7 @@ import scala.jdk.CollectionConverters._ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { @ClusterTest() - def testConsumerGroupHeartbeatIsAccessibleWhenEnabled(): Unit = { + def testConsumerGroupHeartbeatIsInaccessibleWhenDisabledByStaticConfig(): Unit = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( new ConsumerGroupHeartbeatRequestData() ).build() @@ -46,11 +47,35 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data) } - @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array( - new 
ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ) + ) + def testConsumerGroupHeartbeatIsInaccessibleWhenDisabledByGroupVersion(): Unit = { + val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + ).build() + + val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) + val expectedResponse = new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code) + assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data) + } + + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit = { val raftCluster = cluster.asInstanceOf[RaftClusterInstance] val admin = cluster.createAdminClient() @@ -134,11 +159,17 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch) } - @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = { val raftCluster = cluster.asInstanceOf[RaftClusterInstance] val admin = cluster.createAdminClient() @@ -248,13 +279,19 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId) } - @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), - new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "5000"), - new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms", value = "5000") - )) + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "5000"), + new 
ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms", value = "5000") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled(): Unit = { val raftCluster = cluster.asInstanceOf[RaftClusterInstance] val admin = cluster.createAdminClient() diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala index 969e3e9a0db..a93cb68be57 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala @@ -17,13 +17,14 @@ package kafka.server import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.common.message.ListGroupsResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.coordinator.group.Group import org.apache.kafka.coordinator.group.classic.ClassicGroupState import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState +import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Timeout @@ -34,11 +35,16 @@ import org.junit.jupiter.api.extension.ExtendWith @ClusterTestDefaults(types = Array(Type.KRAFT)) @Tag("integration") class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = 
"offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testUpgradeFromEmptyClassicToConsumerGroup(): Unit = { // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. @@ -103,11 +109,16 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord ) } - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testDowngradeFromEmptyConsumerToClassicGroup(): Unit = { // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. 
@@ -165,11 +176,16 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord ) } - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testUpgradeFromSimpleGroupToConsumerGroup(): Unit = { // Creates the __consumer_offsets topics because it won't be created automatically // in this test because it does not use FindCoordinator API. 
diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala index 10b94c882c3..49bbee71a4f 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala @@ -17,11 +17,12 @@ package kafka.server import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.coordinator.group.classic.ClassicGroupState +import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.{assertEquals, fail} import org.junit.jupiter.api.{Tag, Timeout} import org.junit.jupiter.api.extension.ExtendWith @@ -31,31 +32,44 @@ import org.junit.jupiter.api.extension.ExtendWith @ClusterTestDefaults(types = Array(Type.KRAFT)) @Tag("integration") class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = 
"group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { testDeleteGroups(true) } - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { testDeleteGroups(false) } - @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new 
ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ) + ) def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { testDeleteGroups(false) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 52b55ffc33f..bd0ded29deb 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -79,7 +79,7 @@ import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} -import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion} +import org.apache.kafka.server.common.{FinalizedFeatures, GroupVersion, MetadataVersion} import org.apache.kafka.server.config._ import org.apache.kafka.server.metrics.ClientMetricsTestUtils import org.apache.kafka.server.util.{FutureUtils, MockTime} @@ -7020,6 +7020,16 @@ class KafkaApisTest extends Logging { @Test def testConsumerGroupHeartbeatRequest(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn { + new FinalizedFeatures( + MetadataVersion.latestTesting(), + Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()), + 0, + true + ) + } + val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group") val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build()) @@ -7029,9 +7039,10 @@ class KafkaApisTest extends Logging { requestChannelRequest.context, consumerGroupHeartbeatRequest )).thenReturn(future) - kafkaApis = createKafkaApis(overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" - )) + kafkaApis = 
createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), + raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) val consumerGroupHeartbeatResponse = new ConsumerGroupHeartbeatResponseData() @@ -7044,6 +7055,16 @@ class KafkaApisTest extends Logging { @Test def testConsumerGroupHeartbeatRequestFutureFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn { + new FinalizedFeatures( + MetadataVersion.latestTesting(), + Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()), + 0, + true + ) + } + val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group") val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build()) @@ -7053,9 +7074,10 @@ class KafkaApisTest extends Logging { requestChannelRequest.context, consumerGroupHeartbeatRequest )).thenReturn(future) - kafkaApis = createKafkaApis(overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" - )) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), + raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) @@ -7065,6 +7087,16 @@ class KafkaApisTest extends Logging { @Test def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn { + new FinalizedFeatures( + MetadataVersion.latestTesting(), + Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()), + 0, + true + ) + } + val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group") val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build()) @@ -7074,7 +7106,8 @@ class KafkaApisTest extends Logging { .thenReturn(Seq(AuthorizationResult.DENIED).asJava) kafkaApis = createKafkaApis( authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true") + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), + raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) @@ -7084,6 +7117,16 @@ class KafkaApisTest extends Logging { @Test def testConsumerGroupDescribe(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn { + new FinalizedFeatures( + MetadataVersion.latestTesting(), + Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()), + 0, + true + ) + } + val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() consumerGroupDescribeRequestData.groupIds.addAll(groupIds) @@ -7095,7 +7138,8 @@ class KafkaApisTest extends Logging { any[util.List[String]] )).thenReturn(future) kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true") + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), + raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) @@ -7134,6 +7178,16 @@ class KafkaApisTest extends Logging { @Test def testConsumerGroupDescribeAuthorizationFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn { + new FinalizedFeatures( + MetadataVersion.latestTesting(), + 
Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()), + 0, + true + ) + } + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() consumerGroupDescribeRequestData.groupIds.add("group-id") val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) @@ -7150,7 +7204,8 @@ class KafkaApisTest extends Logging { future.complete(List().asJava) kafkaApis = createKafkaApis( authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true") + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), + raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) @@ -7160,6 +7215,16 @@ class KafkaApisTest extends Logging { @Test def testConsumerGroupDescribeFutureFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn { + new FinalizedFeatures( + MetadataVersion.latestTesting(), + Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()), + 0, + true + ) + } + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() consumerGroupDescribeRequestData.groupIds.add("group-id") val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) @@ -7169,9 +7234,10 @@ class KafkaApisTest extends Logging { any[RequestContext], any[util.List[String]] )).thenReturn(future) - kafkaApis = createKafkaApis(overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" - )) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), + raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) 
future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala index b99239497e8..3fcffa893af 100644 --- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala @@ -17,13 +17,14 @@ package kafka.server import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.common.message.ListGroupsResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState import org.apache.kafka.coordinator.group.classic.ClassicGroupState import org.apache.kafka.coordinator.group.Group +import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.{assertEquals, fail} import org.junit.jupiter.api.{Tag, Timeout} import org.junit.jupiter.api.extension.ExtendWith @@ -33,13 +34,18 @@ import org.junit.jupiter.api.extension.ExtendWith @ClusterTestDefaults(types = Array(Type.KRAFT)) @Tag("integration") class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( 
+ serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { testListGroups(true) } diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala index 7791c6c9420..696e6534bf0 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala @@ -17,9 +17,10 @@ package kafka.server import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.fail import org.junit.jupiter.api.{Tag, Timeout} import org.junit.jupiter.api.extension.ExtendWith @@ -30,22 +31,29 @@ import org.junit.jupiter.api.extension.ExtendWith @Tag("integration") class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = 
"group.consumer.max.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { testOffsetCommit(true) } - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ) + ) def testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { testOffsetCommit(false) } diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala index 8fabac4bda7..22c1e28548b 100644 --- 
a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala @@ -17,9 +17,10 @@ package kafka.server import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.fail import org.junit.jupiter.api.{Tag, Timeout} import org.junit.jupiter.api.extension.ExtendWith @@ -29,13 +30,18 @@ import org.junit.jupiter.api.extension.ExtendWith @ClusterTestDefaults(types = Array(Type.KRAFT)) @Tag("integration") class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new 
ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { testOffsetDelete(true) } diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala index 5f51d6cdb15..2e640bfea6f 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala @@ -18,18 +18,16 @@ package kafka.server import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type} import kafka.test.junit.ClusterTestExtensions - import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.message.OffsetFetchResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} - +import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.{assertEquals, fail} import org.junit.jupiter.api.{Tag, Timeout} import org.junit.jupiter.api.extension.ExtendWith - import scala.jdk.CollectionConverters._ @Timeout(120) @@ -38,24 +36,31 @@ import scala.jdk.CollectionConverters._ @Tag("integration") class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new 
ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true) } - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ) + ) def testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false) } @@ -71,13 +76,18 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB 
testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = true) } - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testSingleGroupAllOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { testSingleGroupAllOffsetFetch(useNewProtocol = true, requireStable = true) } @@ -104,13 +114,18 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = true) } - @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), - new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), - new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), - new ClusterConfigProperty(key = 
"offsets.topic.replication.factor", value = "1") - )) + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) def testMultiGroupsOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { testMultipleGroupsOffsetFetch(useNewProtocol = true, requireStable = true) } diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala index 8b4bb93a66c..03623bab41f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -299,7 +299,7 @@ class ReplicationQuotasTest extends QuorumTestHarness { features.add(new BrokerRegistrationRequestData.Feature() .setName(MetadataVersion.FEATURE_NAME) .setMinSupportedVersion(MetadataVersion.IBP_3_0_IV1.featureLevel()) - .setMaxSupportedVersion(MetadataVersion.IBP_3_8_IV0.featureLevel())) + .setMaxSupportedVersion(MetadataVersion.IBP_4_0_IV0.featureLevel())) controllerServer.controller.registerBroker( ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData() diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index fb9361fc3ab..54a436f231f 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -376,10 +376,18 @@ Found problem: 
true ) - val featureLevel = Features.TEST_VERSION.defaultValue(metadataVersion) - if (featureLevel > 0) { - assertEquals(List(generateRecord(TestFeatureVersion.FEATURE_NAME, featureLevel)), records) + val expectedRecords = new ArrayBuffer[ApiMessageAndVersion]() + + def maybeAddRecordFor(features: Features): Unit = { + val featureLevel = features.defaultValue(metadataVersion) + if (featureLevel > 0) { + expectedRecords += generateRecord(features.featureName, featureLevel) + } } + + Features.FEATURES.foreach(maybeAddRecordFor) + + assertEquals(expectedRecords, records) } @Test def testVersionDefaultNoArgs(): Unit = { diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 1b18c9648de..4498778592c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -196,7 +196,7 @@ public class QuorumControllerTest { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)). setBrokerId(0). setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))). setClusterId(logEnv.clusterId())).get(); @@ -240,7 +240,7 @@ public class QuorumControllerTest { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)). setBrokerId(0). setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))). 
setClusterId(logEnv.clusterId())).get(); @@ -298,7 +298,7 @@ public class QuorumControllerTest { new BrokerRegistrationRequestData(). setBrokerId(brokerId). setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)). setIncarnationId(Uuid.randomUuid()). setListeners(listeners)); brokerEpochs.put(brokerId, reply.get().epoch()); @@ -717,7 +717,7 @@ public class QuorumControllerTest { setBrokerId(0). setClusterId(active.clusterId()). setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)). setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))). setListeners(listeners)); assertEquals(5L, reply.get().epoch()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java index ecbefc69daf..bdda58d01b6 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java @@ -74,7 +74,7 @@ public class QuorumFeaturesTest { for (Features feature : Features.PRODUCTION_FEATURES) { expectedFeatures.put(feature.featureName(), VersionRange.of( 0, - feature.defaultValue(MetadataVersion.LATEST_PRODUCTION) + feature.defaultValue(MetadataVersion.latestTesting()) )); } assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(true)); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Features.java b/server-common/src/main/java/org/apache/kafka/server/common/Features.java index a4f11b10ace..eda4b855605 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/Features.java +++ 
b/server-common/src/main/java/org/apache/kafka/server/common/Features.java @@ -39,7 +39,8 @@ public enum Features { * * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. */ - TEST_VERSION("test.feature.version", TestFeatureVersion.values()); + TEST_VERSION("test.feature.version", TestFeatureVersion.values()), + GROUP_VERSION("group.version", GroupVersion.values()); public static final Features[] FEATURES; public static final List<Features> PRODUCTION_FEATURES; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java new file mode 100644 index 00000000000..002d7ef33f4 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum GroupVersion implements FeatureVersion { + + // Version 1 enables the consumer rebalance protocol (KIP-848). 
+ GV_1(1, MetadataVersion.IBP_4_0_IV0, Collections.emptyMap()); + + public static final String FEATURE_NAME = "group.version"; + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map<String, Short> dependencies; + + GroupVersion( + int featureLevel, + MetadataVersion bootstrapMetadataVersion, + Map<String, Short> dependencies + ) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map<String, Short> dependencies() { + return dependencies; + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index e6abca3e726..26b67321e4a 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -202,7 +202,10 @@ public enum MetadataVersion { IBP_3_7_IV4(19, "3.7", "IV4", false), // Add ELR related supports (KIP-966). - IBP_3_8_IV0(20, "3.8", "IV0", true); + IBP_3_8_IV0(20, "3.8", "IV0", true), + + // Introduce version 1 of the GroupVersion feature (KIP-848). 
+ IBP_4_0_IV0(21, "4.0", "IV0", false); // NOTES when adding a new version: // Update the default version in @ClusterTest annotation to point to the latest version diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java index bd9f2594b00..cfcdcf3afe6 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java @@ -47,6 +47,7 @@ class MetadataVersionTest { } @Test + @SuppressWarnings("checkstyle:JavaNCSS") public void testFromVersionString() { assertEquals(IBP_0_8_0, MetadataVersion.fromVersionString("0.8.0")); assertEquals(IBP_0_8_0, MetadataVersion.fromVersionString("0.8.0.0")); @@ -184,6 +185,8 @@ class MetadataVersionTest { assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7-IV4")); assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0")); + + assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0")); } @Test @@ -243,6 +246,8 @@ class MetadataVersionTest { assertEquals("3.7", IBP_3_7_IV2.shortVersion()); assertEquals("3.7", IBP_3_7_IV3.shortVersion()); assertEquals("3.7", IBP_3_7_IV4.shortVersion()); + assertEquals("3.8", IBP_3_8_IV0.shortVersion()); + assertEquals("4.0", IBP_4_0_IV0.shortVersion()); } @Test @@ -292,6 +297,7 @@ class MetadataVersionTest { assertEquals("3.7-IV3", IBP_3_7_IV3.version()); assertEquals("3.7-IV4", IBP_3_7_IV4.version()); assertEquals("3.8-IV0", IBP_3_8_IV0.version()); + assertEquals("4.0-IV0", IBP_4_0_IV0.version()); } @Test diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index a89782797c1..b6d4cea0edb 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -19,7 +19,10 @@ package org.apache.kafka.tools; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + import kafka.test.ClusterInstance; import kafka.test.annotation.ClusterTest; import kafka.test.annotation.Type; @@ -58,9 +61,15 @@ public class FeatureCommandTest { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe")) ); + + List<String> features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList()); + + assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" + + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0))); + // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput)); + "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(1))); } @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_7_IV4) @@ -68,9 +77,15 @@ public class FeatureCommandTest { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-controller", cluster.bootstrapControllers(), "describe")) ); + + List<String> features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList()); + + assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" + + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0))); + // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) 
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(commandOutput)); + "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(features.get(1))); } @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1) @@ -129,7 +144,7 @@ public class FeatureCommandTest { ); // Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version) assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " + - "metadata.version. Local controller 3000 only supports versions 1-20", commandOutput); + "metadata.version. Local controller 3000 only supports versions 1-21", commandOutput); commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java index 3f29c159aed..4fbdeec0d94 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java @@ -85,7 +85,10 @@ public class ConsumerGroupCommandTest extends kafka.integration.KafkaServerTestH 0, false ).foreach(props -> { - props.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, isNewGroupCoordinatorEnabled() + ""); + if (isNewGroupCoordinatorEnabled()) { + props.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); + props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer"); + } cfgs.add(KafkaConfig.fromProps(props)); return null; }); diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java index dc132b6075c..5ea53a1c382 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java @@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.GroupVersion; import java.time.Duration; import java.util.ArrayList; @@ -41,6 +43,7 @@ import java.util.stream.Stream; import static kafka.test.annotation.Type.CO_KRAFT; import static kafka.test.annotation.Type.KRAFT; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; @@ -60,9 +63,11 @@ class ConsumerGroupCommandTestUtils { serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); + serverProperties.put(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer"); return Collections.singletonList(ClusterConfig.defaultBuilder() .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet())) + .setFeatures(Collections.singletonMap(Features.GROUP_VERSION, GroupVersion.GV_1.featureLevel())) 
.setServerProperties(serverProperties) .setTags(Collections.singletonList("consumerGroupCoordinator")) .build());