KAFKA-16894: Define feature to enable share groups (#19293)

This PR proposes a switch to enable share groups for 4.1 (preview) and
4.2 (GA).

* `share.version=1` to indicate that share groups are enabled. This is
used as the switch for turning share groups on and off.

In 4.1, the default will be `share.version=0`. Then a user wanting to
evaluate the preview of KIP-932 would use `bin/kafka-features.sh
--bootstrap.server xxxx upgrade --feature share.version=1`.

In 4.2, the default will be `share.version=1`.

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
Andrew Schofield 2025-04-11 12:14:38 +01:00 committed by GitHub
parent 3e0276ebb7
commit 21a080f08c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 121 additions and 13 deletions

View File

@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, TransactionVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
@ -64,11 +64,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
): Unit = {
if (apiVersion >= 3) {
assertEquals(4, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(5, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
assertEquals(5, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(6, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(MetadataVersion.MINIMUM_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
if (apiVersion < 4) {
assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@ -85,6 +85,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).minVersion())
assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).maxVersion())
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
assertEquals(ShareVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
}
val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) {
ApiVersionsResponse.collectApis(

View File

@ -325,7 +325,7 @@ Found problem:
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
assertEquals("Unsupported feature: non.existent.feature. Supported features are: " +
"eligible.leader.replicas.version, group.version, kraft.version, transaction.version",
"eligible.leader.replicas.version, group.version, kraft.version, share.version, transaction.version",
assertThrows(classOf[FormatterException], () =>
runFormatCommand(new ByteArrayOutputStream(), properties,
Seq("--feature", "non.existent.feature=20"))).getMessage)

View File

@ -387,7 +387,7 @@ public class FeatureControlManagerTest {
build();
manager.replay(new FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
assertEquals(ControllerResult.of(List.of(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-26")),
"Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-28")),
manager.updateFeatures(
Map.of(MetadataVersion.FEATURE_NAME, MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
Map.of(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),

View File

@ -368,7 +368,7 @@ public class FormatterTest {
formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1);
assertEquals("Unsupported feature: nonexistent.feature. Supported features " +
"are: eligible.leader.replicas.version, group.version, kraft.version, " +
"test.feature.version, transaction.version",
"share.version, test.feature.version, transaction.version",
assertThrows(FormatterException.class,
() -> formatter1.formatter.run()).
getMessage());

View File

@ -47,6 +47,7 @@ public enum Feature {
TRANSACTION_VERSION(TransactionVersion.FEATURE_NAME, TransactionVersion.values(), TransactionVersion.LATEST_PRODUCTION),
GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), GroupVersion.LATEST_PRODUCTION),
ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.values(), EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(), ShareVersion.LATEST_PRODUCTION),
/**
* Features defined only for unit tests and are not used in production.

View File

@ -112,7 +112,22 @@ public enum MetadataVersion {
//
// Enables ELR by default for new clusters (KIP-966).
IBP_4_1_IV0(26, "4.1", "IV0", false);
IBP_4_1_IV0(26, "4.1", "IV0", false),
// Enables share groups. Note, share groups are for preview only in 4.1. (KIP-932).
IBP_4_1_IV1(27, "4.1", "IV1", false),
// Insert any additional IBP_4_1_IVx versions above this comment, and bump the feature level of
// IBP_4_2_IV0 accordingly. When 4.2 development begins, IBP_4_2_IV0 will cease to be
// a placeholder.
// Enables share groups by default for new clusters (KIP-932).
//
// *** THIS IS A PLACEHOLDER UNSTABLE VERSION WHICH IS USED TO DEFINE THE POINT AT WHICH ***
// *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE ***
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
// *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY. ***
IBP_4_2_IV0(28, "4.2", "IV0", false);
// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the latest version

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import java.util.Map;
public enum ShareVersion implements FeatureVersion {
// Version 0 does not enable share groups.
SV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),
// Version 1 enables share groups (KIP-932).
// This is a preview in 4.1, and production-ready in 4.2.
SV_1(1, MetadataVersion.IBP_4_2_IV0, Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_4_1_IV1.featureLevel()));
public static final String FEATURE_NAME = "share.version";
public static final ShareVersion LATEST_PRODUCTION = SV_0;
private final short featureLevel;
private final MetadataVersion bootstrapMetadataVersion;
private final Map<String, Short> dependencies;
ShareVersion(
int featureLevel,
MetadataVersion bootstrapMetadataVersion,
Map<String, Short> dependencies
) {
this.featureLevel = (short) featureLevel;
this.bootstrapMetadataVersion = bootstrapMetadataVersion;
this.dependencies = dependencies;
}
@Override
public short featureLevel() {
return featureLevel;
}
@Override
public String featureName() {
return FEATURE_NAME;
}
@Override
public MetadataVersion bootstrapMetadataVersion() {
return bootstrapMetadataVersion;
}
@Override
public Map<String, Short> dependencies() {
return dependencies;
}
public boolean supportsShareGroups() {
return featureLevel >= SV_1.featureLevel;
}
public static ShareVersion fromFeatureLevel(short version) {
switch (version) {
case 0:
return SV_0;
case 1:
return SV_1;
default:
throw new RuntimeException("Unknown share feature level: " + (int) version);
}
}
}

View File

@ -29,6 +29,7 @@ import java.util.Map;
import static org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION;
import static org.apache.kafka.server.common.Feature.GROUP_VERSION;
import static org.apache.kafka.server.common.Feature.SHARE_VERSION;
import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -97,6 +98,7 @@ public class BrokerFeaturesTest {
TRANSACTION_VERSION.featureName(), TRANSACTION_VERSION.latestTesting(),
GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(),
ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(),
SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(),
"kraft.version", (short) 0,
"test_feature_1", (short) 4,
"test_feature_2", (short) 3,

View File

@ -52,7 +52,7 @@ public @interface ClusterTest {
String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
SecurityProtocol controllerSecurityProtocol() default SecurityProtocol.PLAINTEXT;
String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
MetadataVersion metadataVersion() default MetadataVersion.IBP_4_1_IV0;
MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV0;
ClusterConfigProperty[] serverProperties() default {};
// users can add tags that they want to display in test
String[] tags() default {};

View File

@ -64,9 +64,11 @@ public class FeatureCommandTest {
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2)));
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.3-IV3\t" +
"SupportedMaxVersion: 4.1-IV0\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3)));
"SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3)));
assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
}
// Use the first MetadataVersion that supports KIP-919
@ -86,9 +88,11 @@ public class FeatureCommandTest {
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2)));
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.3-IV3\t" +
"SupportedMaxVersion: 4.1-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3)));
"SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3)));
assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
}
@ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV3)
@ -114,7 +118,7 @@ public class FeatureCommandTest {
);
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
assertEquals("Could not disable metadata.version. The update failed for all features since the following " +
"feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-26", commandOutput);
"feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-28", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@ -177,6 +181,7 @@ public class FeatureCommandTest {
"group.version was downgraded to 0.\n" +
"kraft.version was downgraded to 0.\n" +
"metadata.version was downgraded to 18.\n" +
"share.version was downgraded to 0.\n" +
"transaction.version was downgraded to 0.", commandOutput);
}