KAFKA-16860; [1/2] Introduce group.version feature flag (#16120)

This patch introduces the `group.version` feature flag with one version:
1) Version 1 enables the new consumer group rebalance protocol (KIP-848).

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2024-05-31 21:48:55 +02:00 committed by GitHub
parent f8ad9ee892
commit ba61ff0cd9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 574 additions and 177 deletions

View File

@ -67,10 +67,10 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.group.{Group, GroupCoordinator}
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{GroupVersion, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData}
@ -3797,10 +3797,15 @@ class KafkaApis(val requestChannel: RequestChannel,
)
}
private def isConsumerGroupProtocolEnabled(): Boolean = {
val version = metadataCache.features().finalizedFeatures().getOrDefault(GroupVersion.FEATURE_NAME, 0.toShort)
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.CONSUMER) && version >= GroupVersion.GV_1.featureLevel
}
def handleConsumerGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
val consumerGroupHeartbeatRequest = request.body[ConsumerGroupHeartbeatRequest]
if (!config.isNewGroupCoordinatorEnabled) {
if (!isConsumerGroupProtocolEnabled()) {
// The API is not supported by the "old" group coordinator (the default). If the
// new one is not enabled, we fail directly here.
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
@ -3825,7 +3830,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest]
if (!config.isNewGroupCoordinatorEnabled) {
if (!isConsumerGroupProtocolEnabled()) {
// The API is not supported by the "old" group coordinator (the default). If the
// new one is not enabled, we fail directly here.
requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))

View File

@ -19,6 +19,7 @@ package kafka.test;
import kafka.test.annotation.Type;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import java.io.File;
@ -58,13 +59,15 @@ public class ClusterConfig {
private final Map<String, String> saslClientProperties;
private final List<String> tags;
private final Map<Integer, Map<String, String>> perServerProperties;
private final Map<Features, Short> features;
@SuppressWarnings("checkstyle:ParameterNumber")
private ClusterConfig(Set<Type> types, int brokers, int controllers, int disksPerBroker, boolean autoStart,
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perServerProperties, List<String> tags) {
Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perServerProperties, List<String> tags,
Map<Features, Short> features) {
// do fail fast. the following values are invalid for both zk and kraft modes.
if (brokers < 0) throw new IllegalArgumentException("Number of brokers must be greater or equal to zero.");
if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero.");
@ -87,6 +90,7 @@ public class ClusterConfig {
this.saslClientProperties = Objects.requireNonNull(saslClientProperties);
this.perServerProperties = Objects.requireNonNull(perServerProperties);
this.tags = Objects.requireNonNull(tags);
this.features = Objects.requireNonNull(features);
}
public Set<Type> clusterTypes() {
@ -157,6 +161,10 @@ public class ClusterConfig {
return tags;
}
public Map<Features, Short> features() {
return features;
}
public Set<String> displayTags() {
Set<String> displayTags = new LinkedHashSet<>(tags);
displayTags.add("MetadataVersion=" + metadataVersion);
@ -198,7 +206,8 @@ public class ClusterConfig {
.setSaslServerProperties(clusterConfig.saslServerProperties)
.setSaslClientProperties(clusterConfig.saslClientProperties)
.setPerServerProperties(clusterConfig.perServerProperties)
.setTags(clusterConfig.tags);
.setTags(clusterConfig.tags)
.setFeatures(clusterConfig.features);
}
public static class Builder {
@ -219,6 +228,7 @@ public class ClusterConfig {
private Map<String, String> saslClientProperties = Collections.emptyMap();
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
private List<String> tags = Collections.emptyList();
private Map<Features, Short> features = Collections.emptyMap();
private Builder() {}
@ -309,11 +319,16 @@ public class ClusterConfig {
return this;
}
public Builder setFeatures(Map<Features, Short> features) {
this.features = Collections.unmodifiableMap(features);
return this;
}
public ClusterConfig build() {
return new ClusterConfig(types, brokers, controllers, disksPerBroker, autoStart, securityProtocol, listenerName,
trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
adminClientProperties, saslServerProperties, saslClientProperties,
perServerProperties, tags);
perServerProperties, tags, features);
}
}
}

View File

@ -40,7 +40,6 @@ import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
public interface ClusterInstance {
@ -159,9 +158,7 @@ public interface ClusterInstance {
Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
supportedGroupProtocols.add(CLASSIC);
// KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") ||
serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) {
if (serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) {
supportedGroupProtocols.add(CONSUMER);
}

View File

@ -185,13 +185,10 @@ public class ClusterTestExtensionsTest {
@ClusterTest
public void testDefaults(ClusterInstance clusterInstance) {
Assertions.assertEquals(MetadataVersion.IBP_3_8_IV0, clusterInstance.config().metadataVersion());
Assertions.assertEquals(MetadataVersion.IBP_4_0_IVO, clusterInstance.config().metadataVersion());
}
@ClusterTests({
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
}),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
@ -199,10 +196,6 @@ public class ClusterTestExtensionsTest {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
@ -217,12 +210,19 @@ public class ClusterTestExtensionsTest {
}
@ClusterTests({
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.annotation;
import org.apache.kafka.server.common.Features;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Documented
@Target({ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ClusterFeature {
Features feature();
short version();
}

View File

@ -40,8 +40,9 @@ public @interface ClusterTest {
AutoStart autoStart() default AutoStart.DEFAULT;
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0;
MetadataVersion metadataVersion() default MetadataVersion.IBP_4_0_IVO;
ClusterConfigProperty[] serverProperties() default {};
// users can add tags that they want to display in test
String[] tags() default {};
ClusterFeature[] features() default {};
}

View File

@ -18,6 +18,7 @@
package kafka.test.junit;
import kafka.test.ClusterConfig;
import kafka.test.annotation.ClusterFeature;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.ClusterTests;
@ -25,6 +26,7 @@ import kafka.test.annotation.ClusterTemplate;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.Type;
import kafka.test.annotation.AutoStart;
import org.apache.kafka.server.common.Features;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
@ -172,6 +174,10 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
.filter(e -> e.id() != -1)
.collect(Collectors.groupingBy(ClusterConfigProperty::id, Collectors.mapping(Function.identity(),
Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b))));
Map<Features, Short> features = Arrays.stream(annot.features())
.collect(Collectors.toMap(ClusterFeature::feature, ClusterFeature::version));
ClusterConfig config = ClusterConfig.builder()
.setTypes(new HashSet<>(Arrays.asList(types)))
.setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
@ -184,6 +190,7 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
.setSecurityProtocol(annot.securityProtocol())
.setMetadataVersion(annot.metadataVersion())
.setTags(Arrays.asList(annot.tags()))
.setFeatures(features)
.build();
return Arrays.stream(types).map(type -> type.invocationContexts(context.getRequiredTestMethod().getName(), config))

View File

@ -28,15 +28,20 @@ import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import scala.compat.java8.OptionConverters;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -237,8 +242,21 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
public void format() throws Exception {
if (formated.compareAndSet(false, true)) {
List<ApiMessageAndVersion> records = new ArrayList<>();
records.add(
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(clusterConfig.metadataVersion().featureLevel()), (short) 0));
clusterConfig.features().forEach((feature, version) -> {
records.add(
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(feature.featureName()).
setFeatureLevel(version), (short) 0));
});
TestKitNodes nodes = new TestKitNodes.Builder()
.setBootstrapMetadataVersion(clusterConfig.metadataVersion())
.setBootstrapMetadata(BootstrapMetadata.fromRecords(records, "testkit"))
.setCombined(isCombined)
.setNumBrokerNodes(clusterConfig.numBrokers())
.setNumDisksPerBroker(clusterConfig.numDisksPerBroker())

View File

@ -62,7 +62,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
}
override def generateConfigs: Seq[KafkaConfig] = {
val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
configureListeners(cfgs)
@ -72,6 +71,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
}
if (isNewGroupCoordinatorEnabled()) {
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"))
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer"))
}
if(isKRaftTest()) {

View File

@ -37,7 +37,7 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationF
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.zookeeper.client.ZKClientConfig
@ -342,6 +342,15 @@ abstract class QuorumTestHarness extends Logging {
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()), 0.toShort))
if (isNewGroupCoordinatorEnabled()) {
metadataRecords.add(new ApiMessageAndVersion(
new FeatureLevelRecord()
.setName(Features.GROUP_VERSION.featureName)
.setFeatureLevel(Features.GROUP_VERSION.latestTesting),
0.toShort
))
}
optionalMetadataRecords.foreach { metadataArguments =>
for (record <- metadataArguments) metadataRecords.add(record)
}

View File

@ -72,7 +72,8 @@ object ZkMigrationIntegrationTest {
MetadataVersion.IBP_3_7_IV1,
MetadataVersion.IBP_3_7_IV2,
MetadataVersion.IBP_3_7_IV4,
MetadataVersion.IBP_3_8_IV0
MetadataVersion.IBP_3_8_IV0,
MetadataVersion.IBP_4_0_IVO
).map { mv =>
val serverProperties = new util.HashMap[String, String]()
serverProperties.put("inter.broker.listener.name", "EXTERNAL")

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.RecordVersion
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{GroupVersion, MetadataVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
@ -69,9 +69,12 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
assertEquals(1, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(2, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersion())
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).minVersion())
assertEquals(GroupVersion.GV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).maxVersion())
}
val expectedApis = if (!cluster.isKRaftTest) {
ApiVersionsResponse.collectApis(

View File

@ -47,7 +47,7 @@ object ApiVersionsRequestTest {
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(serverProperties)
.setMetadataVersion(MetadataVersion.IBP_3_8_IV0)
.setMetadataVersion(MetadataVersion.IBP_4_0_IVO)
.build()).asJava
}
@ -83,7 +83,7 @@ object ApiVersionsRequestTest {
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
@ClusterTemplate("testApiVersionsRequestTemplate")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_4_0_IVO, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true")
))

View File

@ -18,7 +18,7 @@
package kafka.server
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{Features => ServerFeatures, MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
@ -95,6 +95,7 @@ class BrokerFeaturesTest {
val expectedFeatures = Map[String, Short](
MetadataVersion.FEATURE_NAME -> MetadataVersion.latestTesting().featureLevel(),
ServerFeatures.GROUP_VERSION.featureName() -> ServerFeatures.GROUP_VERSION.latestTesting(),
"test_feature_1" -> 4,
"test_feature_2" -> 3,
"test_feature_3" -> 7)

View File

@ -18,7 +18,7 @@ package kafka.server
import kafka.server.GroupCoordinatorBaseRequestTest
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.common.ConsumerGroupState
@ -26,6 +26,7 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assign
import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse}
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{Tag, Timeout}
@ -60,11 +61,46 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
assertEquals(expectedResponse, consumerGroupDescribeResponse.data)
}
@ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
)
)
def testConsumerGroupDescribeIsInaccessibleWhenDisabledByGroupVersion(): Unit = {
val consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder(
new ConsumerGroupDescribeRequestData().setGroupIds(List("grp-1", "grp-2").asJava)
).build(ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled))
val consumerGroupDescribeResponse = connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest)
val expectedResponse = new ConsumerGroupDescribeResponseData()
expectedResponse.groups().add(
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-1")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
expectedResponse.groups.add(
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-2")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
assertEquals(expectedResponse, consumerGroupDescribeResponse.data)
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testConsumerGroupDescribeWithNewGroupCoordinator(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.

View File

@ -17,13 +17,14 @@
package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
import kafka.utils.TestUtils
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull}
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
@ -36,7 +37,7 @@ import scala.jdk.CollectionConverters._
class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest()
def testConsumerGroupHeartbeatIsAccessibleWhenEnabled(): Unit = {
def testConsumerGroupHeartbeatIsInaccessibleWhenDisabledByStaticConfig(): Unit = {
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
).build()
@ -46,11 +47,35 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
}
@ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
)
)
def testConsumerGroupHeartbeatIsInaccessibleWhenDisabledByGroupVersion(): Unit = {
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
).build()
val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
val expectedResponse = new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
@ -134,11 +159,17 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
}
@ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
@ -248,13 +279,19 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
}
@ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "5000"),
new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms", value = "5000")
))
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "5000"),
new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms", value = "5000")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()

View File

@ -17,13 +17,14 @@
package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.message.ListGroupsResponseData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.coordinator.group.Group
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Timeout
@ -34,11 +35,16 @@ import org.junit.jupiter.api.extension.ExtendWith
@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testUpgradeFromEmptyClassicToConsumerGroup(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
@ -103,11 +109,16 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testDowngradeFromEmptyConsumerToClassicGroup(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
@ -165,11 +176,16 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testUpgradeFromSimpleGroupToConsumerGroup(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.

View File

@ -17,11 +17,12 @@
package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
@ -31,31 +32,44 @@ import org.junit.jupiter.api.extension.ExtendWith
@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testDeleteGroups(true)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testDeleteGroups(false)
}
@ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
)
)
def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = {
testDeleteGroups(false)
}

View File

@ -79,7 +79,7 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.{FinalizedFeatures, GroupVersion, MetadataVersion}
import org.apache.kafka.server.config._
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.util.{FutureUtils, MockTime}
@ -7020,6 +7020,16 @@ class KafkaApisTest extends Logging {
@Test
def testConsumerGroupHeartbeatRequest(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()),
0,
true
)
}
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build())
@ -7029,9 +7039,10 @@ class KafkaApisTest extends Logging {
requestChannelRequest.context,
consumerGroupHeartbeatRequest
)).thenReturn(future)
kafkaApis = createKafkaApis(overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
))
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
val consumerGroupHeartbeatResponse = new ConsumerGroupHeartbeatResponseData()
@ -7044,6 +7055,16 @@ class KafkaApisTest extends Logging {
@Test
def testConsumerGroupHeartbeatRequestFutureFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()),
0,
true
)
}
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build())
@ -7053,9 +7074,10 @@ class KafkaApisTest extends Logging {
requestChannelRequest.context,
consumerGroupHeartbeatRequest
)).thenReturn(future)
kafkaApis = createKafkaApis(overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
))
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@ -7065,6 +7087,16 @@ class KafkaApisTest extends Logging {
@Test
def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()),
0,
true
)
}
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build())
@ -7074,7 +7106,8 @@ class KafkaApisTest extends Logging {
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true")
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
@ -7084,6 +7117,16 @@ class KafkaApisTest extends Logging {
@Test
def testConsumerGroupDescribe(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()),
0,
true
)
}
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
@ -7095,7 +7138,8 @@ class KafkaApisTest extends Logging {
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true")
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
@ -7134,6 +7178,16 @@ class KafkaApisTest extends Logging {
@Test
def testConsumerGroupDescribeAuthorizationFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()),
0,
true
)
}
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
consumerGroupDescribeRequestData.groupIds.add("group-id")
val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())
@ -7150,7 +7204,8 @@ class KafkaApisTest extends Logging {
future.complete(List().asJava)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true")
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
@ -7160,6 +7215,16 @@ class KafkaApisTest extends Logging {
@Test
def testConsumerGroupDescribeFutureFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Collections.singletonMap(GroupVersion.FEATURE_NAME, GroupVersion.GV_1.featureLevel()),
0,
true
)
}
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
consumerGroupDescribeRequestData.groupIds.add("group-id")
val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())
@ -7169,9 +7234,10 @@ class KafkaApisTest extends Logging {
any[RequestContext],
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
))
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)

View File

@ -17,13 +17,14 @@
package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.message.ListGroupsResponseData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.apache.kafka.coordinator.group.Group
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
@ -33,13 +34,18 @@ import org.junit.jupiter.api.extension.ExtendWith
@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testListGroups(true)
}

View File

@ -17,9 +17,10 @@
package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
@ -30,22 +31,29 @@ import org.junit.jupiter.api.extension.ExtendWith
@Tag("integration")
class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testOffsetCommit(true)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
)
)
def testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testOffsetCommit(false)
}

View File

@ -17,9 +17,10 @@
package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
@ -29,13 +30,18 @@ import org.junit.jupiter.api.extension.ExtendWith
@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testOffsetDelete(true)
}

View File

@ -18,18 +18,16 @@ package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.OffsetFetchResponseData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@Timeout(120)
@ -38,24 +36,31 @@ import scala.jdk.CollectionConverters._
@Tag("integration")
class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
)
)
def testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
}
@ -71,13 +76,18 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = true)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testSingleGroupAllOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testSingleGroupAllOffsetFetch(useNewProtocol = true, requireStable = true)
}
@ -104,13 +114,18 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = true)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
)
)
def testMultiGroupsOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testMultipleGroupsOffsetFetch(useNewProtocol = true, requireStable = true)
}

View File

@ -299,7 +299,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
features.add(new BrokerRegistrationRequestData.Feature()
.setName(MetadataVersion.FEATURE_NAME)
.setMinSupportedVersion(MetadataVersion.IBP_3_0_IV1.featureLevel())
.setMaxSupportedVersion(MetadataVersion.IBP_3_8_IV0.featureLevel()))
.setMaxSupportedVersion(MetadataVersion.IBP_4_0_IVO.featureLevel()))
controllerServer.controller.registerBroker(
ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData()

View File

@ -376,10 +376,18 @@ Found problem:
true
)
val featureLevel = Features.TEST_VERSION.defaultValue(metadataVersion)
if (featureLevel > 0) {
assertEquals(List(generateRecord(TestFeatureVersion.FEATURE_NAME, featureLevel)), records)
val expectedRecords = new ArrayBuffer[ApiMessageAndVersion]()
def maybeAddRecordFor(features: Features): Unit = {
val featureLevel = features.defaultValue(metadataVersion)
if (featureLevel > 0) {
expectedRecords += generateRecord(features.featureName, featureLevel)
}
}
Features.FEATURES.foreach(maybeAddRecordFor)
assertEquals(expectedRecords, records)
}
@Test
def testVersionDefaultNoArgs(): Unit = {

View File

@ -196,7 +196,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IVO)).
setBrokerId(0).
setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))).
setClusterId(logEnv.clusterId())).get();
@ -240,7 +240,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IVO)).
setBrokerId(0).
setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))).
setClusterId(logEnv.clusterId())).get();
@ -298,7 +298,7 @@ public class QuorumControllerTest {
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IVO)).
setIncarnationId(Uuid.randomUuid()).
setListeners(listeners));
brokerEpochs.put(brokerId, reply.get().epoch());
@ -717,7 +717,7 @@ public class QuorumControllerTest {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IVO)).
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
setListeners(listeners));
assertEquals(5L, reply.get().epoch());

View File

@ -74,7 +74,7 @@ public class QuorumFeaturesTest {
for (Features feature : Features.PRODUCTION_FEATURES) {
expectedFeatures.put(feature.featureName(), VersionRange.of(
0,
feature.defaultValue(MetadataVersion.LATEST_PRODUCTION)
feature.defaultValue(MetadataVersion.latestTesting())
));
}
assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(true));

View File

@ -39,7 +39,8 @@ public enum Features {
*
* See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature.
*/
TEST_VERSION("test.feature.version", TestFeatureVersion.values());
TEST_VERSION("test.feature.version", TestFeatureVersion.values()),
GROUP_VERSION("group.version", GroupVersion.values());
public static final Features[] FEATURES;
public static final List<Features> PRODUCTION_FEATURES;

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import java.util.Collections;
import java.util.Map;
public enum GroupVersion implements FeatureVersion {
// Version 1 enables the consumer rebalance protocol (KIP-848).
GV_1(1, MetadataVersion.IBP_4_0_IVO, Collections.emptyMap());
public static final String FEATURE_NAME = "group.version";
private final short featureLevel;
private final MetadataVersion bootstrapMetadataVersion;
private final Map<String, Short> dependencies;
GroupVersion(
int featureLevel,
MetadataVersion bootstrapMetadataVersion,
Map<String, Short> dependencies
) {
this.featureLevel = (short) featureLevel;
this.bootstrapMetadataVersion = bootstrapMetadataVersion;
this.dependencies = dependencies;
}
@Override
public short featureLevel() {
return featureLevel;
}
@Override
public String featureName() {
return FEATURE_NAME;
}
@Override
public MetadataVersion bootstrapMetadataVersion() {
return bootstrapMetadataVersion;
}
@Override
public Map<String, Short> dependencies() {
return dependencies;
}
}

View File

@ -202,7 +202,10 @@ public enum MetadataVersion {
IBP_3_7_IV4(19, "3.7", "IV4", false),
// Add ELR related supports (KIP-966).
IBP_3_8_IV0(20, "3.8", "IV0", true);
IBP_3_8_IV0(20, "3.8", "IV0", true),
// Introduce version 1 of the GroupVersion feature (KIP-848).
IBP_4_0_IVO(21, "4.0", "IV0", false);
// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the latest version

View File

@ -47,6 +47,7 @@ class MetadataVersionTest {
}
@Test
@SuppressWarnings("checkstyle:JavaNCSS")
public void testFromVersionString() {
assertEquals(IBP_0_8_0, MetadataVersion.fromVersionString("0.8.0"));
assertEquals(IBP_0_8_0, MetadataVersion.fromVersionString("0.8.0.0"));
@ -184,6 +185,8 @@ class MetadataVersionTest {
assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7-IV4"));
assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0"));
assertEquals(IBP_4_0_IVO, MetadataVersion.fromVersionString("4.0-IV0"));
}
@Test
@ -243,6 +246,8 @@ class MetadataVersionTest {
assertEquals("3.7", IBP_3_7_IV2.shortVersion());
assertEquals("3.7", IBP_3_7_IV3.shortVersion());
assertEquals("3.7", IBP_3_7_IV4.shortVersion());
assertEquals("3.8", IBP_3_8_IV0.shortVersion());
assertEquals("4.0", IBP_4_0_IVO.shortVersion());
}
@Test
@ -292,6 +297,7 @@ class MetadataVersionTest {
assertEquals("3.7-IV3", IBP_3_7_IV3.version());
assertEquals("3.7-IV4", IBP_3_7_IV4.version());
assertEquals("3.8-IV0", IBP_3_8_IV0.version());
assertEquals("4.0-IV0", IBP_4_0_IVO.version());
}
@Test

View File

@ -19,7 +19,10 @@ package org.apache.kafka.tools;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
@ -58,9 +61,15 @@ public class FeatureCommandTest {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
);
List<String> features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList());
assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0)));
// Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
"SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(1)));
}
@ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_7_IV4)
@ -68,9 +77,15 @@ public class FeatureCommandTest {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-controller", cluster.bootstrapControllers(), "describe"))
);
List<String> features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList());
assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0)));
// Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(commandOutput));
"SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(features.get(1)));
}
@ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
@ -129,7 +144,7 @@ public class FeatureCommandTest {
);
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
"metadata.version. Local controller 3000 only supports versions 1-20", commandOutput);
"metadata.version. Local controller 3000 only supports versions 1-21", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),

View File

@ -85,7 +85,10 @@ public class ConsumerGroupCommandTest extends kafka.integration.KafkaServerTestH
0,
false
).foreach(props -> {
props.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, isNewGroupCoordinatorEnabled() + "");
if (isNewGroupCoordinatorEnabled()) {
props.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer");
}
cfgs.add(KafkaConfig.fromProps(props));
return null;
});

View File

@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.GroupVersion;
import java.time.Duration;
import java.util.ArrayList;
@ -41,6 +43,7 @@ import java.util.stream.Stream;
import static kafka.test.annotation.Type.CO_KRAFT;
import static kafka.test.annotation.Type.KRAFT;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
@ -60,9 +63,11 @@ class ConsumerGroupCommandTestUtils {
serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
serverProperties.put(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer");
return Collections.singletonList(ClusterConfig.defaultBuilder()
.setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet()))
.setFeatures(Collections.singletonMap(Features.GROUP_VERSION, GroupVersion.GV_1.featureLevel()))
.setServerProperties(serverProperties)
.setTags(Collections.singletonList("consumerGroupCoordinator"))
.build());