KAFKA-19173: Add `Feature` for "streams" group (#19509)

Add a new StreamsGroupFeature, disabled by default, and add "streams" to
the default value of `group.coordinator.rebalance.protocols`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot
<david.jacot@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>,
Justine Olshan <jolshan@confluent.io>, Andrew Schofield
<aschofield@confluent.io>, Jun Rao <jun@confluent.io>
Author: Matthias J. Sax
Date: 2025-04-29 22:51:10 -07:00 (committed by GitHub)
Parent: 81881dee83
Commit: b0a26bc2f4
19 changed files with 217 additions and 70 deletions
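
Note: the new feature ships disabled. `streams.version` is finalized at level 0, so even though "streams" now appears in the default `group.coordinator.rebalance.protocols`, streams-group heartbeats are rejected with UNSUPPORTED_VERSION until the feature is raised to level 1 (e.g. `kafka-features.sh upgrade --feature streams.version=1`). Below is a minimal sketch of the same step through the standard Admin API; the bootstrap address is a placeholder:

import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

public class EnableStreamsFeature {
    public static void main(String[] args) throws Exception {
        // Placeholder bootstrap address; point this at a real broker.
        Map<String, Object> conf = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(conf)) {
            // Finalize streams.version at level 1 (SV_1) to enable "streams" groups.
            admin.updateFeatures(
                Map.of("streams.version",
                    new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE)),
                new UpdateFeaturesOptions()
            ).all().get();
        }
    }
}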

View File

@@ -62,7 +62,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
 import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, DelegationTokenManager, ProcessRole}
 import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.common.{GroupVersion, RequestLocal, TransactionVersion}
+import org.apache.kafka.server.common.{GroupVersion, RequestLocal, StreamsVersion, TransactionVersion}
 import org.apache.kafka.server.config.DelegationTokenManagerConfigs
 import org.apache.kafka.server.share.context.ShareFetchContext
 import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey}
@@ -2649,11 +2649,15 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
   }
 
+  private def streamsVersion(): StreamsVersion = {
+    StreamsVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(StreamsVersion.FEATURE_NAME, 0.toShort))
+  }
+
   private def isStreamsGroupProtocolEnabled: Boolean = {
-    config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
+    config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS) &&
+      streamsVersion().streamsGroupSupported
   }
 
   def handleStreamsGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
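
Taken together, the two conditions above mean a streams-group request is only served when the protocol is listed in the broker config AND the finalized feature level supports it. A self-contained sketch of that gate (names here are illustrative, not the broker's actual types):

import java.util.Map;
import java.util.Set;

public class StreamsGroupGate {
    // Mirrors isStreamsGroupProtocolEnabled: config lists "streams" AND
    // the finalized streams.version level is at least 1 (SV_1).
    static boolean isStreamsGroupProtocolEnabled(Set<String> rebalanceProtocols,
                                                 Map<String, Short> finalizedFeatures) {
        short level = finalizedFeatures.getOrDefault("streams.version", (short) 0);
        return rebalanceProtocols.contains("streams") && level >= 1;
    }

    public static void main(String[] args) {
        // Default config now lists "streams", but with the feature finalized at 0
        // heartbeats are still rejected with UNSUPPORTED_VERSION.
        System.out.println(isStreamsGroupProtocolEnabled(
            Set.of("classic", "consumer", "streams"), Map.of("streams.version", (short) 0))); // false
        System.out.println(isStreamsGroupProtocolEnabled(
            Set.of("classic", "consumer", "streams"), Map.of("streams.version", (short) 1))); // true
    }
}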

View File

@@ -386,10 +386,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     if (!protocols.contains(GroupType.CLASSIC)) {
       throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.")
     }
-    if (protocols.contains(GroupType.STREAMS)) {
-      warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance protocol are enabled. " +
-        "This is part of the early access of KIP-1071 and MUST NOT be used in production.")
-    }
     protocols
   }

View File

@@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, TransactionVersion}
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, StreamsVersion, TransactionVersion}
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Tag
@@ -64,11 +64,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
     apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
   ): Unit = {
     if (apiVersion >= 3) {
-      assertEquals(5, apiVersionsResponse.data().finalizedFeatures().size())
+      assertEquals(6, apiVersionsResponse.data().finalizedFeatures().size())
       assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
       assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
-      assertEquals(6, apiVersionsResponse.data().supportedFeatures().size())
+      assertEquals(7, apiVersionsResponse.data().supportedFeatures().size())
       assertEquals(MetadataVersion.MINIMUM_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
       if (apiVersion < 4) {
         assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@@ -88,6 +88,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
       assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
       assertEquals(ShareVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
+
+      assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).minVersion())
+      assertEquals(StreamsVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).maxVersion())
     }
     val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) {
       ApiVersionsResponse.collectApis(

View File

@@ -88,7 +88,7 @@ import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.{ClientMetricsManager, SimpleApiVersionManager}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
-import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
+import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, StreamsVersion, TransactionVersion}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.metrics.ClientMetricsTestUtils
 import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey}
@@ -10007,7 +10007,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequest(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10018,9 +10022,7 @@ class KafkaApisTest extends Logging {
       requestChannelRequest.context,
       streamsGroupHeartbeatRequest
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
@@ -10033,7 +10035,12 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestWithAuthorizedTopology(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
+
     val groupId = "group"
     val fooTopicName = "foo"
     val barTopicName = "bar"
@@ -10084,8 +10091,7 @@ class KafkaApisTest extends Logging {
       streamsGroupHeartbeatRequest
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -10099,7 +10105,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10110,9 +10120,7 @@ class KafkaApisTest extends Logging {
       requestChannelRequest.context,
       streamsGroupHeartbeatRequest
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@@ -10122,7 +10130,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10132,8 +10144,7 @@ class KafkaApisTest extends Logging {
     when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
       .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -10143,7 +10154,12 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
+
     val groupId = "group"
     val fooTopicName = "foo"
     val barTopicName = "bar"
@@ -10184,8 +10200,7 @@ class KafkaApisTest extends Logging {
     }
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -10194,7 +10209,7 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = {
+  def testStreamsGroupHeartbeatRequestProtocolDisabledViaConfig(): Unit = {
     metadataCache = mock(classOf[KRaftMetadataCache])
 
     val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10211,8 +10226,31 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testStreamsGroupHeartbeatRequestInvalidTopicNames(): Unit = {
+  def testStreamsGroupHeartbeatRequestProtocolDisabledViaFeature(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 0.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
+
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode)
+  }
+
+  @Test
+  def testStreamsGroupHeartbeatRequestInvalidTopicNames(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
       new StreamsGroupHeartbeatRequestData.Topology()
@@ -10229,9 +10267,7 @@ class KafkaApisTest extends Logging {
     val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
 
-    kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
@@ -10241,7 +10277,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestInternalTopicNames(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
       new StreamsGroupHeartbeatRequestData.Topology()
@@ -10257,9 +10297,7 @@ class KafkaApisTest extends Logging {
     val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
 
-    kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
@@ -10269,7 +10307,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestWithInternalTopicsToCreate(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group");
@@ -10281,9 +10323,7 @@ class KafkaApisTest extends Logging {
       streamsGroupHeartbeatRequest
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val missingTopics = Map("test" -> new CreatableTopic())
@@ -10298,7 +10338,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestWithInternalTopicsToCreateMissingCreateACL(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group");
@@ -10324,8 +10368,7 @@ class KafkaApisTest extends Logging {
       }.asJava
     })
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -10519,7 +10562,12 @@ class KafkaApisTest extends Logging {
 
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
+
     val fooTopicName = "foo"
     val barTopicName = "bar"
@@ -10534,9 +10582,7 @@ class KafkaApisTest extends Logging {
       any[RequestContext],
       any[util.List[String]]
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
@@ -10627,7 +10673,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupDescribeAuthorizationFailed(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
     streamsGroupDescribeRequestData.groupIds.add("group-id")
@@ -10644,8 +10694,7 @@ class KafkaApisTest extends Logging {
     )).thenReturn(future)
     future.complete(List().asJava)
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -10655,7 +10704,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupDescribeFutureFailed(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
     streamsGroupDescribeRequestData.groupIds.add("group-id")
@@ -10666,9 +10719,7 @@ class KafkaApisTest extends Logging {
       any[RequestContext],
       any[util.List[String]]
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
-    )
+    kafkaApis = createKafkaApis()
    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@@ -10683,7 +10734,11 @@ class KafkaApisTest extends Logging {
     val barTopicName = "bar"
     val errorMessage = "The described group uses topics that the client is not authorized to describe."
 
+    val features = mock(classOf[FinalizedFeatures])
+    when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort))
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
@@ -10715,8 +10770,7 @@ class KafkaApisTest extends Logging {
       any[util.List[String]]
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

View File

@@ -325,7 +325,7 @@ Found problem:
     properties.putAll(defaultStaticQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     assertEquals("Unsupported feature: non.existent.feature. Supported features are: " +
-      "eligible.leader.replicas.version, group.version, kraft.version, share.version, transaction.version",
+      "eligible.leader.replicas.version, group.version, kraft.version, share.version, streams.version, transaction.version",
       assertThrows(classOf[FormatterException], () =>
         runFormatCommand(new ByteArrayOutputStream(), properties,
           Seq("--feature", "non.existent.feature=20"))).getMessage)

View File

@@ -63,7 +63,8 @@ public class GroupCoordinatorConfig {
         "The " + Group.GroupType.STREAMS + " rebalance protocol is in early access and therefore must not be used in production.";
     public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
         Group.GroupType.CLASSIC.toString(),
-        Group.GroupType.CONSUMER.toString());
+        Group.GroupType.CONSUMER.toString(),
+        Group.GroupType.STREAMS.toString());
     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
         "wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " +

View File

@@ -396,7 +396,7 @@ public class FeatureControlManagerTest {
             build();
         manager.replay(new FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
         assertEquals(ControllerResult.of(List.of(), new ApiError(Errors.INVALID_UPDATE_VERSION,
-            "Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-28")),
+            "Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-29")),
             manager.updateFeatures(
                 Map.of(MetadataVersion.FEATURE_NAME, MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
                 Map.of(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),

View File

@@ -368,7 +368,7 @@ public class FormatterTest {
         formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1);
         assertEquals("Unsupported feature: nonexistent.feature. Supported features " +
             "are: eligible.leader.replicas.version, group.version, kraft.version, " +
-            "share.version, test.feature.version, transaction.version",
+            "share.version, streams.version, test.feature.version, transaction.version",
             assertThrows(FormatterException.class,
                 () -> formatter1.formatter.run()).
                     getMessage());

View File

@@ -48,6 +48,7 @@ public enum Feature {
     GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), GroupVersion.LATEST_PRODUCTION),
     ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.values(), EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
     SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(), ShareVersion.LATEST_PRODUCTION),
+    STREAMS_VERSION(StreamsVersion.FEATURE_NAME, StreamsVersion.values(), StreamsVersion.LATEST_PRODUCTION),
 
     /**
      * Features defined only for unit tests and are not used in production.

View File

@@ -127,7 +127,15 @@ public enum MetadataVersion {
     // *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE ***
     // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
     // *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY. ***
-    IBP_4_2_IV0(28, "4.2", "IV0", false);
+    IBP_4_2_IV0(28, "4.2", "IV0", false),
+
+    // Enables "streams" groups by default for new clusters (KIP-1071).
+    //
+    // *** THIS IS A PLACEHOLDER UNSTABLE VERSION WHICH IS USED TO DEFINE THE POINT AT WHICH ***
+    // *** STREAMS GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A STREAMS ***
+    // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
+    // *** DYNAMICALLY TO TRY OUT THE EARLY ACCESS CAPABILITY. ***
+    IBP_4_2_IV1(29, "4.2", "IV1", false);
 
     // NOTES when adding a new version:
     // Update the default version in @ClusterTest annotation to point to the latest version

View File

@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.server.common;

import java.util.Map;

public enum StreamsVersion implements FeatureVersion {

    // Version 0 keeps "streams" groups disabled (KIP-1071).
    SV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),

    // Version 1 enables "streams" groups (KIP-1071).
    // Using metadata version IBP_4_2_IV1 disables it by default in the AK 4.1 release, and enables it by default in the AK 4.2 release.
    // - in AK 4.1, this can be enabled as "early access [unstable]"
    // - in AK 4.2, it is planned to go GA (cf. `LATEST_PRODUCTION`)
    SV_1(1, MetadataVersion.IBP_4_2_IV1, Map.of());

    public static final String FEATURE_NAME = "streams.version";

    // Marks "streams" groups as unstable in the AK 4.1 release.
    // Needs to be updated to SV_1 in AK 4.2, to mark them as stable.
    public static final StreamsVersion LATEST_PRODUCTION = SV_0;

    private final short featureLevel;
    private final MetadataVersion bootstrapMetadataVersion;
    private final Map<String, Short> dependencies;

    StreamsVersion(
        int featureLevel,
        MetadataVersion bootstrapMetadataVersion,
        Map<String, Short> dependencies
    ) {
        this.featureLevel = (short) featureLevel;
        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
        this.dependencies = dependencies;
    }

    @Override
    public short featureLevel() {
        return featureLevel;
    }

    @Override
    public String featureName() {
        return FEATURE_NAME;
    }

    @Override
    public MetadataVersion bootstrapMetadataVersion() {
        return bootstrapMetadataVersion;
    }

    @Override
    public Map<String, Short> dependencies() {
        return dependencies;
    }

    public boolean streamsGroupSupported() {
        return featureLevel >= SV_1.featureLevel;
    }

    public static StreamsVersion fromFeatureLevel(short version) {
        return switch (version) {
            case 0 -> SV_0;
            case 1 -> SV_1;
            default -> throw new RuntimeException("Unknown streams feature level: " + (int) version);
        };
    }
}
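
As a small usage sketch of the new enum (a hypothetical demo class, not part of the patch): a finalized feature level maps back to the enum via fromFeatureLevel, and streamsGroupSupported is the predicate KafkaApis consults.

import org.apache.kafka.server.common.StreamsVersion;

public class StreamsVersionDemo {
    public static void main(String[] args) {
        StreamsVersion v0 = StreamsVersion.fromFeatureLevel((short) 0);
        StreamsVersion v1 = StreamsVersion.fromFeatureLevel((short) 1);
        System.out.println(v0.streamsGroupSupported()); // false: SV_0 keeps streams groups off
        System.out.println(v1.streamsGroupSupported()); // true:  SV_1 enables them
    }
}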

View File

@@ -30,6 +30,7 @@ import java.util.Map;
 import static org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION;
 import static org.apache.kafka.server.common.Feature.GROUP_VERSION;
 import static org.apache.kafka.server.common.Feature.SHARE_VERSION;
+import static org.apache.kafka.server.common.Feature.STREAMS_VERSION;
 import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -99,6 +100,7 @@ public class BrokerFeaturesTest {
         GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(),
         ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(),
         SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(),
+        STREAMS_VERSION.featureName(), STREAMS_VERSION.latestTesting(),
         "kraft.version", (short) 0,
         "test_feature_1", (short) 4,
         "test_feature_2", (short) 3,

View File

@@ -75,7 +75,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(600)
 @Tag("integration")
 public class InternalTopicIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {

View File

@@ -51,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(3);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
     public TestInfo testInfo;
 
     @BeforeAll

View File

@@ -57,7 +57,7 @@ import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 public class StandbyTaskCreationIntegrationTest {
     private static final int NUM_BROKERS = 1;
-    public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private String safeTestName;

View File

@@ -89,7 +89,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class StreamsUncaughtExceptionHandlerIntegrationTest {
     private static final long NOW = Instant.now().toEpochMilli();
-    public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
     @BeforeAll
     public static void startCluster() throws IOException {

View File

@@ -143,16 +143,6 @@ public class EmbeddedKafkaCluster {
         this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
     }
 
-    public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers) {
-        return withStreamsRebalanceProtocol(numBrokers, new Properties());
-    }
-
-    public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers, final Properties props) {
-        props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
-        props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
-        return new EmbeddedKafkaCluster(numBrokers, props);
-    }
-
     public void start() {
         try {
             cluster.format();

View File

@@ -52,7 +52,7 @@ public @interface ClusterTest {
     String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
     SecurityProtocol controllerSecurityProtocol() default SecurityProtocol.PLAINTEXT;
     String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
-    MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV0;
+    MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV1;
     ClusterConfigProperty[] serverProperties() default {};
     // users can add tags that they want to display in test
     String[] tags() default {};

View File

@@ -64,11 +64,13 @@ public class FeatureCommandTest {
         assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
             "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2)));
         assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.3-IV3\t" +
-            "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3)));
+            "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3)));
         assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
             "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
+        assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
         assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
-            "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
+            "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6)));
     }
 
     // Use the first MetadataVersion that supports KIP-919
@@ -88,11 +90,13 @@ public class FeatureCommandTest {
         assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
             "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2)));
         assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.3-IV3\t" +
-            "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3)));
+            "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3)));
         assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
             "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
+        assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+            "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
         assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
-            "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
+            "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6)));
     }
 
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV3)
@@ -118,7 +122,7 @@ public class FeatureCommandTest {
         );
         // Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
         assertEquals("Could not disable metadata.version. The update failed for all features since the following " +
-            "feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-28", commandOutput);
+            "feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-29", commandOutput);
 
         commandOutput = ToolsTestUtils.captureStandardOut(() ->
             assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -182,6 +186,7 @@ public class FeatureCommandTest {
             "kraft.version was downgraded to 0.\n" +
             "metadata.version was downgraded to 18.\n" +
             "share.version was downgraded to 0.\n" +
+            "streams.version was downgraded to 0.\n" +
             "transaction.version was downgraded to 0.", commandOutput);
     }