diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 50acdf24643..76e61671e5a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3831,7 +3831,7 @@ class KafkaApis(val requestChannel: RequestChannel, GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME, 0.toShort)) } - private def isConsumerGroupProtocolEnabled(): Boolean = { + def isConsumerGroupProtocolEnabled(): Boolean = { groupCoordinator.isNewGroupCoordinator && config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.CONSUMER) && groupVersion().isConsumerRebalanceProtocolSupported diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 46bebf3c116..4b4c9e8f41c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -581,19 +581,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.") } if (protocols.contains(GroupType.CONSUMER)) { - if (processRoles.isEmpty) { - throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster.") - } - if (!isNewGroupCoordinatorEnabled) { - throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported by the new group coordinator.") + if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) { + warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.") } } if (protocols.contains(GroupType.SHARE)) { - if (processRoles.isEmpty) { - throw new ConfigException(s"The new '${GroupType.SHARE}' rebalance protocol is only supported in KRaft cluster.") - } - if (!isNewGroupCoordinatorEnabled) { - throw new ConfigException(s"The new '${GroupType.SHARE}' rebalance protocol is only supported by the new group coordinator.") + if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) { + warn(s"The new '${GroupType.SHARE}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.") } warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " + "This is part of the early access of KIP-932 and MUST NOT be used in production.") diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java index 2270b3f1c8a..36fd2252658 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java @@ -19,7 +19,6 @@ package kafka.admin; import kafka.cluster.Broker; import kafka.cluster.EndPoint; import kafka.test.ClusterInstance; -import kafka.test.annotation.ClusterConfigProperty; import kafka.test.annotation.ClusterTest; import kafka.test.annotation.Type; import kafka.test.junit.ClusterTestExtensions; @@ -63,8 +62,6 @@ import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; -import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG; -import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG; import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG; import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG; @@ -292,10 +289,7 @@ public class ConfigCommandIntegrationTest { } } - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), - }) + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}) public void testGroupConfigUpdateUsingKraft() throws Exception { alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--entity-type", "groups", "--alter"); verifyGroupConfigUpdate(); diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java index 75522c1ba7f..41c148b8f42 100644 --- a/core/src/test/java/kafka/test/ClusterInstance.java +++ b/core/src/test/java/kafka/test/ClusterInstance.java @@ -32,7 +32,6 @@ import org.apache.kafka.test.TestUtils; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -41,7 +40,7 @@ import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; -import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG; +import static org.apache.kafka.common.utils.Utils.mkSet; public interface ClusterInstance { @@ -161,15 +160,11 @@ public interface ClusterInstance { } default Set supportedGroupProtocols() { - Map serverProperties = config().serverProperties(); - Set supportedGroupProtocols = new HashSet<>(); - supportedGroupProtocols.add(CLASSIC); - - if (serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) { - supportedGroupProtocols.add(CONSUMER); + if (isKRaftTest() && brokers().values().stream().allMatch(b -> b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) { + return mkSet(CLASSIC, CONSUMER); + } else { + return Collections.singleton(CLASSIC); } - - return Collections.unmodifiableSet(supportedGroupProtocols); } //---------------------------[modify]---------------------------// diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index 7c06a441a0b..c8d4c5fdbf3 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -192,45 +192,24 @@ public class ClusterTestExtensionsTest { Assertions.assertEquals(MetadataVersion.latestTesting(), clusterInstance.config().metadataVersion()); } - @ClusterTests({ - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), - }), - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), - }) - }) + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}) public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) { Set supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); supportedGroupProtocols.add(CONSUMER); - Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols)); - Assertions.assertEquals(2, clusterInstance.supportedGroupProtocols().size()); + Assertions.assertEquals(supportedGroupProtocols, clusterInstance.supportedGroupProtocols()); } @ClusterTests({ @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - }), - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), - }), - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), - }), - @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), }), @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), - @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), - }, tags = {"disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator"}), + }) }) public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) { - Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC)); - Assertions.assertEquals(1, clusterInstance.supportedGroupProtocols().size()); + Assertions.assertEquals(Collections.singleton(CLASSIC), clusterInstance.supportedGroupProtocols()); } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index e98d3150252..59632f22858 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1799,7 +1799,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("kraft+kip848")) + @ValueSource(strings = Array("kraft")) def testIncrementalAlterGroupConfigsWithAlterAcl(quorum: String): Unit = { addAndVerifyAcls(groupAlterConfigsAcl(groupResource), groupResource) @@ -1809,7 +1809,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("kraft+kip848")) + @ValueSource(strings = Array("kraft")) def testIncrementalAlterGroupConfigsWithOperationAll(quorum: String): Unit = { val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) addAndVerifyAcls(Set(allowAllOpsAcl), groupResource) @@ -1820,7 +1820,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("kraft+kip848")) + @ValueSource(strings = Array("kraft")) def testIncrementalAlterGroupConfigsWithoutAlterAcl(quorum: String): Unit = { removeAllClientAcls() @@ -1830,7 +1830,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("kraft+kip848")) + @ValueSource(strings = Array("kraft")) def testDescribeGroupConfigsWithDescribeAcl(quorum: String): Unit = { addAndVerifyAcls(groupDescribeConfigsAcl(groupResource), groupResource) @@ -1840,7 +1840,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("kraft+kip848")) + @ValueSource(strings = Array("kraft")) def testDescribeGroupConfigsWithOperationAll(quorum: String): Unit = { val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) addAndVerifyAcls(Set(allowAllOpsAcl), groupResource) @@ -1851,7 +1851,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("kraft+kip848")) + @ValueSource(strings = Array("kraft")) def testDescribeGroupConfigsWithoutDescribeAcl(quorum: String): Unit = { removeAllClientAcls() diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index b6312619cd5..26ebed6bd86 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -115,14 +115,12 @@ object BaseConsumerTest { // We want to test the following combinations: // * ZooKeeper and the classic group protocol // * KRaft and the classic group protocol - // * KRaft with the new group coordinator enabled and the classic group protocol - // * KRaft with the new group coordinator enabled and the consumer group protocol + // * KRaft and the consumer group protocol def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = { util.Arrays.stream(Array( Arguments.of("zk", "classic"), Arguments.of("kraft", "classic"), - Arguments.of("kraft+kip848", "classic"), - Arguments.of("kraft+kip848", "consumer") + Arguments.of("kraft", "consumer") )) } @@ -138,20 +136,18 @@ object BaseConsumerTest { // For tests that only work with the classic group protocol, we want to test the following combinations: // * ZooKeeper and the classic group protocol // * KRaft and the classic group protocol - // * KRaft with the new group coordinator enabled and the classic group protocol def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() : java.util.stream.Stream[Arguments] = { util.Arrays.stream(Array( Arguments.of("zk", "classic"), - Arguments.of("kraft", "classic"), - Arguments.of("kraft+kip848", "classic") + Arguments.of("kraft", "classic") )) } // For tests that only work with the consumer group protocol, we want to test the following combination: - // * KRaft with the new group coordinator enabled and the consumer group protocol + // * KRaft and the consumer group protocol def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): java.util.stream.Stream[Arguments] = { util.Arrays.stream(Array( - Arguments.of("kraft+kip848", "consumer") + Arguments.of("kraft", "consumer") )) } diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 6995b3be442..3d58441c8d4 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -71,12 +71,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { if (isZkMigrationTest()) { cfgs.foreach(_.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")) } - if (isNewGroupCoordinatorEnabled()) { - cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true")) - cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer")) - } if (isShareGroupTest()) { - cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true")) cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share")) cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 0a97a776f65..f6368273f56 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -528,7 +528,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testAbortTransaction(quorum: String): Unit = { client = createAdminClient val tp = new TopicPartition("topic1", 0) @@ -538,7 +538,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val configs = new util.HashMap[String, Object]() configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, plaintextBootstrapServers(brokers)) configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString) - if (quorum == "kraft+kip848") + if (quorum == "kraft") configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, ConsumerProtocol.PROTOCOL_TYPE) val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new ByteArrayDeserializer) try { @@ -1006,7 +1006,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } @ParameterizedTest - @ValueSource(strings = Array("kraft+kip848")) + @ValueSource(strings = Array("kraft")) def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = { client = createAdminClient val group = "describe-alter-configs-group" diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala index f047fd1b4d0..7fefd582868 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala @@ -238,7 +238,7 @@ class PlaintextConsumerAssignorsTest extends AbstractConsumerTest { // Remote assignors only supported with consumer group protocol @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @CsvSource(Array( - "kraft+kip848, consumer" + "kraft, consumer" )) def testRemoteAssignorInvalid(quorum: String, groupProtocol: String): Unit = { // 1 consumer using invalid remote assignor @@ -268,7 +268,7 @@ class PlaintextConsumerAssignorsTest extends AbstractConsumerTest { // Remote assignors only supported with consumer group protocol @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @CsvSource(Array( - "kraft+kip848, consumer" + "kraft, consumer" )) def testRemoteAssignorRange(quorum: String, groupProtocol: String): Unit = { // 1 consumer using range assignment diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 0a64ec9ed51..79996915074 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -112,7 +112,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testBasicTransactions(quorum: String): Unit = { val producer = transactionalProducers.head val consumer = transactionalConsumers.head @@ -173,7 +173,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testReadCommittedConsumerShouldNotSeeUndecidedData(quorum: String): Unit = { val producer1 = transactionalProducers.head val producer2 = createTransactionalProducer("other") @@ -241,7 +241,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testDelayedFetchIncludesAbortedTransaction(quorum: String): Unit = { val producer1 = transactionalProducers.head val producer2 = createTransactionalProducer("other") @@ -300,14 +300,14 @@ class TransactionsTest extends IntegrationTestHarness { @nowarn("cat=deprecation") @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testSendOffsetsWithGroupId(quorum: String): Unit = { sendOffset((producer, groupId, consumer) => producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, groupId)) } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testSendOffsetsWithGroupMetadata(quorum: String): Unit = { sendOffset((producer, _, consumer) => producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumer.groupMetadata())) @@ -387,7 +387,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testFencingOnCommit(quorum: String): Unit = { val producer1 = transactionalProducers(0) val producer2 = transactionalProducers(1) @@ -417,7 +417,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testFencingOnSendOffsets(quorum: String): Unit = { val producer1 = transactionalProducers(0) val producer2 = transactionalProducers(1) @@ -449,7 +449,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testOffsetMetadataInSendOffsetsToTransaction(quorum: String): Unit = { val tp = new TopicPartition(topic1, 0) val groupId = "group" @@ -475,26 +475,26 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testInitTransactionsTimeout(quorum: String): Unit = { testTimeout(needInitAndSendMsg = false, producer => producer.initTransactions()) } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testSendOffsetsToTransactionTimeout(quorum: String): Unit = { testTimeout(needInitAndSendMsg = true, producer => producer.sendOffsetsToTransaction( Map(new TopicPartition(topic1, 0) -> new OffsetAndMetadata(0)).asJava, new ConsumerGroupMetadata("test-group"))) } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testCommitTransactionTimeout(quorum: String): Unit = { testTimeout(needInitAndSendMsg = true, producer => producer.commitTransaction()) } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testAbortTransactionTimeout(quorum: String): Unit = { testTimeout(needInitAndSendMsg = true, producer => producer.abortTransaction()) } @@ -515,7 +515,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testFencingOnSend(quorum: String): Unit = { val producer1 = transactionalProducers(0) val producer2 = transactionalProducers(1) @@ -560,7 +560,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testFencingOnAddPartitions(quorum: String): Unit = { val producer1 = transactionalProducers(0) val producer2 = transactionalProducers(1) @@ -607,7 +607,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testFencingOnTransactionExpiration(quorum: String): Unit = { val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 100) @@ -650,7 +650,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testMultipleMarkersOneLeader(quorum: String): Unit = { val firstProducer = transactionalProducers.head val consumer = transactionalConsumers.head @@ -688,7 +688,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testConsecutivelyRunInitTransactions(quorum: String): Unit = { val producer = createTransactionalProducer(transactionalId = "normalProducer") @@ -697,7 +697,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testBumpTransactionalEpoch(quorum: String): Unit = { val producer = createTransactionalProducer("transactionalProducer", deliveryTimeoutMs = 5000, requestTimeoutMs = 5000) @@ -759,7 +759,7 @@ class TransactionsTest extends IntegrationTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + @ValueSource(strings = Array("zk", "kraft")) def testFailureToFenceEpoch(quorum: String): Unit = { val producer1 = transactionalProducers.head val producer2 = createTransactionalProducer("transactional-producer", maxBlockMs = 1000) diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 41d8efcc270..81353be9249 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -191,10 +191,6 @@ abstract class QuorumTestHarness extends Logging { TestInfoUtils.isZkMigrationTest(testInfo) } - def isNewGroupCoordinatorEnabled(): Boolean = { - TestInfoUtils.isNewGroupCoordinatorEnabled(testInfo) - } - def isShareGroupTest(): Boolean = { TestInfoUtils.isShareGroupTest(testInfo) } diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala b/core/src/test/scala/kafka/utils/TestInfoUtils.scala index 66711a85517..1316bdc3b18 100644 --- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala +++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala @@ -55,10 +55,6 @@ object TestInfoUtils { final val TestWithParameterizedQuorumAndGroupProtocolNames = "{displayName}.quorum={0}.groupProtocol={1}" - def isNewGroupCoordinatorEnabled(testInfo: TestInfo): Boolean = { - testInfo.getDisplayName.contains("kraft+kip848") - } - def isShareGroupTest(testInfo: TestInfo): Boolean = { testInfo.getDisplayName.contains("kraft+kip932") } diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala index 6a4f2f9bab9..55c9614a381 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala @@ -69,8 +69,6 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo @ClusterTest( types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ), @@ -102,8 +100,6 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo @ClusterTest( types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 30d55097ea2..907448ec711 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -40,7 +40,14 @@ import scala.jdk.CollectionConverters._ @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { - @ClusterTest() + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + ) + ) def testConsumerGroupHeartbeatIsInaccessibleWhenDisabledByStaticConfig(): Unit = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( new ConsumerGroupHeartbeatRequestData() @@ -54,8 +61,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { @ClusterTest( types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ), @@ -76,8 +81,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { @ClusterTest( types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -168,8 +171,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { @ClusterTest( types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -286,8 +287,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { @ClusterTest( types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, value = "5000"), @@ -398,8 +397,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { @ClusterTest( types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "5000") diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala index 0a907c93b1b..9164cddd84c 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala @@ -34,8 +34,6 @@ import org.junit.jupiter.api.extension.ExtendWith class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -106,8 +104,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -171,8 +167,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala index 17578627faa..70c97b132ac 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala @@ -33,8 +33,6 @@ import org.junit.jupiter.api.extension.ExtendWith class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -45,8 +43,6 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) diff --git a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala index 8cf61ef9061..da47204f67d 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala @@ -34,8 +34,6 @@ import scala.jdk.CollectionConverters._ @ClusterTestDefaults(types = Array(Type.KRAFT)) class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") )) diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 818f0478f88..3ed57e06952 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -36,7 +36,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} import org.apache.kafka.common.record.{CompressionType, RecordVersion} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.{TopicPartition, Uuid} -import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} +import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1 import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, ServerLogConfigs, ZooKeeperInternals} import org.apache.kafka.storage.internals.log.LogConfig @@ -62,10 +62,6 @@ import scala.jdk.CollectionConverters._ class DynamicConfigChangeTest extends KafkaServerTestHarness { override def generateConfigs: Seq[KafkaConfig] = { val cfg = TestUtils.createBrokerConfig(0, zkConnectOrNull) - if (isNewGroupCoordinatorEnabled()) { - cfg.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true") - cfg.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer") - } List(KafkaConfig.fromProps(cfg)) } @@ -584,7 +580,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("kraft+kip848")) + @ValueSource(strings = Array("kraft")) def testDynamicGroupConfigChange(quorum: String): Unit = { val newSessionTimeoutMs = 50000 val consumerGroupId = "group-foo" @@ -611,7 +607,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } @ParameterizedTest - @ValueSource(strings = Array("kraft+kip848")) + @ValueSource(strings = Array("kraft")) def testIncrementalAlterDefaultGroupConfig(quorum: String): Unit = { val admin = createAdminClient() try { diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 11414452455..7c0d8771db2 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -60,11 +60,11 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { } protected def isUnstableApiEnabled: Boolean = { - cluster.config.serverProperties.get("unstable.api.versions.enable") == "true" + cluster.brokers.values.stream.allMatch(b => b.config.unstableApiVersionsEnabled) } protected def isNewGroupCoordinatorEnabled: Boolean = { - cluster.config.serverProperties.get("group.coordinator.new.enable") == "true" + cluster.brokers.values.stream.allMatch(b => b.config.isNewGroupCoordinatorEnabled) } protected def commitOffset( diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala index 55c89207de9..db767d3fbf3 100644 --- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala @@ -38,8 +38,6 @@ import scala.concurrent.Future @ClusterTestDefaults(types = Array(Type.KRAFT)) class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") )) diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala index c7c47bd3420..d9054867076 100644 --- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala @@ -41,7 +41,6 @@ import scala.jdk.CollectionConverters._ @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"), diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 307c6bdc021..00cb80aba52 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -3356,9 +3356,7 @@ class KafkaApisTest extends Logging { }.toMap ) } - kafkaApis = createKafkaApis(overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" - )) + kafkaApis = createKafkaApis() kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.noCaching) val expectedResponse = new WriteTxnMarkersResponseData() @@ -3441,9 +3439,7 @@ class KafkaApisTest extends Logging { ArgumentMatchers.eq(TransactionResult.COMMIT), ArgumentMatchers.eq(Duration.ofMillis(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT)) )).thenReturn(FutureUtils.failedFuture[Void](error.exception())) - kafkaApis = createKafkaApis(overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" - )) + kafkaApis = createKafkaApis() kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.noCaching) val expectedError = error match { @@ -4612,7 +4608,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -4696,7 +4691,6 @@ class KafkaApisTest extends Logging { var request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -4800,7 +4794,6 @@ class KafkaApisTest extends Logging { var request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -4884,7 +4877,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -4962,7 +4954,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -5027,7 +5018,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -5084,7 +5074,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -5156,7 +5145,6 @@ class KafkaApisTest extends Logging { // First share fetch request is to establish the share session with the broker. kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -5250,7 +5238,6 @@ class KafkaApisTest extends Logging { // First share fetch request is to establish the share session with the broker. kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -5398,7 +5385,6 @@ class KafkaApisTest extends Logging { // First share fetch request is to establish the share session with the broker. kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -5732,7 +5718,6 @@ class KafkaApisTest extends Logging { // First share fetch request is to establish the share session with the broker. kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -6098,7 +6083,6 @@ class KafkaApisTest extends Logging { // First share fetch request is to establish the share session with the broker. kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -6245,7 +6229,6 @@ class KafkaApisTest extends Logging { // First share fetch request is to establish the share session with the broker. kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -6389,7 +6372,6 @@ class KafkaApisTest extends Logging { // First share fetch request is to establish the share session with the broker. kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -6559,7 +6541,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -6733,7 +6714,6 @@ class KafkaApisTest extends Logging { var request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -6864,7 +6844,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"), raftSupport = true) @@ -6917,7 +6896,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), authorizer = Option(authorizer), @@ -6982,7 +6960,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareFetchRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7050,7 +7027,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareAcknowledgeRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7141,7 +7117,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"), raftSupport = true) @@ -7193,7 +7168,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareAcknowledgeRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), authorizer = Option(authorizer), @@ -7246,7 +7220,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7298,7 +7271,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7348,7 +7320,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareAcknowledgeRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7424,7 +7395,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareAcknowledgeRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7488,7 +7458,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareAcknowledgeRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7556,7 +7525,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareAcknowledgeRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7625,7 +7593,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareAcknowledgeRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7712,7 +7679,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7782,7 +7748,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7856,7 +7821,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7924,7 +7888,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -7997,7 +7960,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -8077,7 +8039,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -8158,7 +8119,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -8233,7 +8193,6 @@ class KafkaApisTest extends Logging { kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -8331,7 +8290,6 @@ class KafkaApisTest extends Logging { val request = buildRequest(shareAcknowledgeRequest) kafkaApis = createKafkaApis( overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true) @@ -11105,7 +11063,6 @@ class KafkaApisTest extends Logging { consumerGroupHeartbeatRequest )).thenReturn(future) kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), featureVersions = Seq(GroupVersion.GV_1), raftSupport = true ) @@ -11133,7 +11090,6 @@ class KafkaApisTest extends Logging { consumerGroupHeartbeatRequest )).thenReturn(future) kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), featureVersions = Seq(GroupVersion.GV_1), raftSupport = true ) @@ -11157,7 +11113,6 @@ class KafkaApisTest extends Logging { .thenReturn(Seq(AuthorizationResult.DENIED).asJava) kafkaApis = createKafkaApis( authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), featureVersions = Seq(GroupVersion.GV_1), raftSupport = true ) @@ -11184,7 +11139,6 @@ class KafkaApisTest extends Logging { any[util.List[String]] )).thenReturn(future) kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), featureVersions = Seq(GroupVersion.GV_1), raftSupport = true ) @@ -11255,7 +11209,6 @@ class KafkaApisTest extends Logging { future.complete(List().asJava) kafkaApis = createKafkaApis( authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), featureVersions = Seq(GroupVersion.GV_1), raftSupport = true ) @@ -11279,7 +11232,6 @@ class KafkaApisTest extends Logging { any[util.List[String]] )).thenReturn(future) kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"), featureVersions = Seq(GroupVersion.GV_1), raftSupport = true ) @@ -11470,7 +11422,7 @@ class KafkaApisTest extends Logging { )).thenReturn(future) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -11494,7 +11446,7 @@ class KafkaApisTest extends Logging { .thenReturn(Seq(AuthorizationResult.DENIED).asJava) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), authorizer = Some(authorizer), raftSupport = true ) @@ -11517,7 +11469,7 @@ class KafkaApisTest extends Logging { )).thenReturn(future) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -11534,7 +11486,7 @@ class KafkaApisTest extends Logging { new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)), new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)) ).asJava - getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true") + getShareGroupDescribeResponse(groupIds, Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true") , true, null, describedGroups) } @@ -11558,7 +11510,7 @@ class KafkaApisTest extends Logging { val authorizer: Authorizer = mock(classOf[Authorizer]) when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) .thenReturn(Seq(AuthorizationResult.DENIED).asJava) - val response = getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true") + val response = getShareGroupDescribeResponse(groupIds, Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true") , false, authorizer, describedGroups) assertNotNull(response.data) assertEquals(2, response.data.groups.size) @@ -11576,7 +11528,7 @@ class KafkaApisTest extends Logging { when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) .thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava) - val response = getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true") + val response = getShareGroupDescribeResponse(groupIds, Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true") , false, authorizer, describedGroups) assertNotNull(response.data) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index b056c19b3f1..ac05bfd2d13 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1939,14 +1939,6 @@ class KafkaConfigTest { def testGroupCoordinatorRebalanceProtocols(): Unit = { val props = new Properties() - // consumer cannot be enabled in ZK mode. - props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer") - assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) - - // share cannot be enabled in ZK mode. - props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,share") - assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) - // Setting KRaft's properties. props.putAll(kraftProps()) @@ -1971,16 +1963,6 @@ class KafkaConfigTest { assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols) assertTrue(config.isNewGroupCoordinatorEnabled) assertTrue(config.shareGroupConfig.isShareGroupEnabled) - - // consumer cannot be used without the new group coordinator. - props.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false") - props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer") - assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) - - // share cannot be used without the new group coordinator. - props.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false") - props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,share") - assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) } @Test diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala index 9870775d585..6b00870a87e 100644 --- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala @@ -32,8 +32,6 @@ import org.junit.jupiter.api.extension.ExtendWith @ClusterTestDefaults(types = Array(Type.KRAFT)) class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") )) diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala index af752944e33..765014f517e 100644 --- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala @@ -34,8 +34,6 @@ import org.junit.jupiter.api.extension.ExtendWith class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000") @@ -46,8 +44,6 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa } @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000") diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala index 96ab980ca07..69f31351d17 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala @@ -32,8 +32,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -44,8 +42,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala index daabc7b1bbe..af2a7b9def4 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala @@ -31,8 +31,6 @@ import org.junit.jupiter.api.extension.ExtendWith class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -42,8 +40,6 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator } @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") )) diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala index 9365c335dc6..0e2d22b4d84 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala @@ -37,8 +37,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -49,8 +47,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -71,8 +67,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -82,8 +76,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB } @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") )) @@ -103,8 +95,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ) @@ -114,8 +104,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB } @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") )) diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala index ba2fc8671f9..2243471da00 100644 --- a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala @@ -72,7 +72,6 @@ class ShareGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoord @ClusterTest( serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer,share"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala index 3f6994bc5ab..760c2372930 100644 --- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala @@ -39,8 +39,6 @@ import scala.concurrent.{Await, Future} @ClusterTestDefaults(types = Array(Type.KRAFT)) class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( - new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000") diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 1ae4f41353c..a6ecc2319ad 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -76,9 +76,9 @@ public class GroupCoordinatorConfig { public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols"; public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " + - "The " + Group.GroupType.CONSUMER + " rebalance protocol is in preview and therefore must not be used in production. " + "The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production."; - public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString()); + public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = + Collections.unmodifiableList(Arrays.asList(Group.GroupType.CLASSIC.toString(), Group.GroupType.CONSUMER.toString())); public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms"; public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " + "wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated."; diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java index e578c558ba6..e3674bd247d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java @@ -41,10 +41,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static kafka.test.annotation.Type.CO_KRAFT; -import static kafka.test.annotation.Type.KRAFT; import static kafka.test.annotation.Type.ZK; -import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG; -import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; @@ -65,11 +62,11 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_ *

* We can reduce the number of cases as same as the old test framework by using the following methods: *

    - *
  • {@link #forConsumerGroupCoordinator} for the case of (consumer group protocol)
  • + *
  • {@link #forKRaftGroupCoordinator} for the case of (consumer group protocol)
  • *
  • (CO_KRAFT servers) with (group.coordinator.new.enable=true) with (classic / consumer group protocols) = 2 cases
  • *
*
    - *
  • {@link #forClassicGroupCoordinator} for the case of (classic group protocol)
  • + *
  • {@link #forZkGroupCoordinator} for the case of (classic group protocol)
  • *
  • (ZK / KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases
  • *
*/ @@ -79,34 +76,32 @@ class ConsumerGroupCommandTestUtils { } static List generator() { - return Stream.concat(forConsumerGroupCoordinator().stream(), forClassicGroupCoordinator().stream()) - .collect(Collectors.toList()); + return Stream + .concat(forKRaftGroupCoordinator().stream(), forZkGroupCoordinator().stream()) + .collect(Collectors.toList()); } - static List forConsumerGroupCoordinator() { + static List forKRaftGroupCoordinator() { Map serverProperties = new HashMap<>(); serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); - serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); - serverProperties.put(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer"); return Collections.singletonList(ClusterConfig.defaultBuilder() .setTypes(Collections.singleton(CO_KRAFT)) .setServerProperties(serverProperties) - .setTags(Collections.singletonList("consumerGroupCoordinator")) + .setTags(Collections.singletonList("kraftGroupCoordinator")) .build()); } - static List forClassicGroupCoordinator() { + static List forZkGroupCoordinator() { Map serverProperties = new HashMap<>(); serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); - serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false"); return Collections.singletonList(ClusterConfig.defaultBuilder() - .setTypes(Stream.of(ZK, KRAFT).collect(Collectors.toSet())) + .setTypes(Collections.singleton(ZK)) .setServerProperties(serverProperties) - .setTags(Collections.singletonList("classicGroupCoordinator")) + .setTags(Collections.singletonList("zkGroupCoordinator")) .build()); } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java index 39dff2f6423..0ff622bf143 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java @@ -83,7 +83,7 @@ public class ListConsumerGroupTest { } private static List consumerProtocolOnlyGenerator() { - return ConsumerGroupCommandTestUtils.forConsumerGroupCoordinator(); + return ConsumerGroupCommandTestUtils.forKRaftGroupCoordinator(); } private List supportedGroupProtocols() {