MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default (#17057)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2024-09-05 07:50:20 +02:00 committed by GitHub
parent 748d20200f
commit 9abb8d3b3c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 84 additions and 265 deletions

View File

@ -3831,7 +3831,7 @@ class KafkaApis(val requestChannel: RequestChannel,
GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME, 0.toShort))
}
private def isConsumerGroupProtocolEnabled(): Boolean = {
def isConsumerGroupProtocolEnabled(): Boolean = {
groupCoordinator.isNewGroupCoordinator &&
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.CONSUMER) &&
groupVersion().isConsumerRebalanceProtocolSupported

View File

@ -581,19 +581,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.")
}
if (protocols.contains(GroupType.CONSUMER)) {
if (processRoles.isEmpty) {
throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster.")
}
if (!isNewGroupCoordinatorEnabled) {
throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported by the new group coordinator.")
if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.")
}
}
if (protocols.contains(GroupType.SHARE)) {
if (processRoles.isEmpty) {
throw new ConfigException(s"The new '${GroupType.SHARE}' rebalance protocol is only supported in KRaft cluster.")
}
if (!isNewGroupCoordinatorEnabled) {
throw new ConfigException(s"The new '${GroupType.SHARE}' rebalance protocol is only supported by the new group coordinator.")
if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
warn(s"The new '${GroupType.SHARE}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.")
}
warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " +
"This is part of the early access of KIP-932 and MUST NOT be used in production.")

View File

@ -19,7 +19,6 @@ package kafka.admin;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
@ -63,8 +62,6 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG;
@ -292,10 +289,7 @@ public class ConfigCommandIntegrationTest {
}
}
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
})
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
public void testGroupConfigUpdateUsingKraft() throws Exception {
alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--entity-type", "groups", "--alter");
verifyGroupConfigUpdate();

View File

@ -32,7 +32,6 @@ import org.apache.kafka.test.TestUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@ -41,7 +40,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.common.utils.Utils.mkSet;
public interface ClusterInstance {
@ -161,15 +160,11 @@ public interface ClusterInstance {
}
default Set<GroupProtocol> supportedGroupProtocols() {
Map<String, String> serverProperties = config().serverProperties();
Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
supportedGroupProtocols.add(CLASSIC);
if (serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) {
supportedGroupProtocols.add(CONSUMER);
if (isKRaftTest() && brokers().values().stream().allMatch(b -> b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) {
return mkSet(CLASSIC, CONSUMER);
} else {
return Collections.singleton(CLASSIC);
}
return Collections.unmodifiableSet(supportedGroupProtocols);
}
//---------------------------[modify]---------------------------//

View File

@ -192,45 +192,24 @@ public class ClusterTestExtensionsTest {
Assertions.assertEquals(MetadataVersion.latestTesting(), clusterInstance.config().metadataVersion());
}
@ClusterTests({
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
})
})
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
supportedGroupProtocols.add(CLASSIC);
supportedGroupProtocols.add(CONSUMER);
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols));
Assertions.assertEquals(2, clusterInstance.supportedGroupProtocols().size());
Assertions.assertEquals(supportedGroupProtocols, clusterInstance.supportedGroupProtocols());
}
@ClusterTests({
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}, tags = {"disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator"}),
})
})
public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC));
Assertions.assertEquals(1, clusterInstance.supportedGroupProtocols().size());
Assertions.assertEquals(Collections.singleton(CLASSIC), clusterInstance.supportedGroupProtocols());
}

View File

@ -1799,7 +1799,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterGroupConfigsWithAlterAcl(quorum: String): Unit = {
addAndVerifyAcls(groupAlterConfigsAcl(groupResource), groupResource)
@ -1809,7 +1809,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterGroupConfigsWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
@ -1820,7 +1820,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterGroupConfigsWithoutAlterAcl(quorum: String): Unit = {
removeAllClientAcls()
@ -1830,7 +1830,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
@ValueSource(strings = Array("kraft"))
def testDescribeGroupConfigsWithDescribeAcl(quorum: String): Unit = {
addAndVerifyAcls(groupDescribeConfigsAcl(groupResource), groupResource)
@ -1840,7 +1840,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
@ValueSource(strings = Array("kraft"))
def testDescribeGroupConfigsWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
@ -1851,7 +1851,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
@ValueSource(strings = Array("kraft"))
def testDescribeGroupConfigsWithoutDescribeAcl(quorum: String): Unit = {
removeAllClientAcls()

View File

@ -115,14 +115,12 @@ object BaseConsumerTest {
// We want to test the following combinations:
// * ZooKeeper and the classic group protocol
// * KRaft and the classic group protocol
// * KRaft with the new group coordinator enabled and the classic group protocol
// * KRaft with the new group coordinator enabled and the consumer group protocol
// * KRaft and the consumer group protocol
def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
util.Arrays.stream(Array(
Arguments.of("zk", "classic"),
Arguments.of("kraft", "classic"),
Arguments.of("kraft+kip848", "classic"),
Arguments.of("kraft+kip848", "consumer")
Arguments.of("kraft", "consumer")
))
}
@ -138,20 +136,18 @@ object BaseConsumerTest {
// For tests that only work with the classic group protocol, we want to test the following combinations:
// * ZooKeeper and the classic group protocol
// * KRaft and the classic group protocol
// * KRaft with the new group coordinator enabled and the classic group protocol
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() : java.util.stream.Stream[Arguments] = {
util.Arrays.stream(Array(
Arguments.of("zk", "classic"),
Arguments.of("kraft", "classic"),
Arguments.of("kraft+kip848", "classic")
Arguments.of("kraft", "classic")
))
}
// For tests that only work with the consumer group protocol, we want to test the following combination:
// * KRaft with the new group coordinator enabled and the consumer group protocol
// * KRaft and the consumer group protocol
def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): java.util.stream.Stream[Arguments] = {
util.Arrays.stream(Array(
Arguments.of("kraft+kip848", "consumer")
Arguments.of("kraft", "consumer")
))
}

View File

@ -71,12 +71,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
if (isZkMigrationTest()) {
cfgs.foreach(_.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true"))
}
if (isNewGroupCoordinatorEnabled()) {
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"))
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer"))
}
if (isShareGroupTest()) {
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"))
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share"))
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"))
}

View File

@ -528,7 +528,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testAbortTransaction(quorum: String): Unit = {
client = createAdminClient
val tp = new TopicPartition("topic1", 0)
@ -538,7 +538,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val configs = new util.HashMap[String, Object]()
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, plaintextBootstrapServers(brokers))
configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString)
if (quorum == "kraft+kip848")
if (quorum == "kraft")
configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, ConsumerProtocol.PROTOCOL_TYPE)
val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new ByteArrayDeserializer)
try {
@ -1006,7 +1006,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = {
client = createAdminClient
val group = "describe-alter-configs-group"

View File

@ -238,7 +238,7 @@ class PlaintextConsumerAssignorsTest extends AbstractConsumerTest {
// Remote assignors only supported with consumer group protocol
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@CsvSource(Array(
"kraft+kip848, consumer"
"kraft, consumer"
))
def testRemoteAssignorInvalid(quorum: String, groupProtocol: String): Unit = {
// 1 consumer using invalid remote assignor
@ -268,7 +268,7 @@ class PlaintextConsumerAssignorsTest extends AbstractConsumerTest {
// Remote assignors only supported with consumer group protocol
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@CsvSource(Array(
"kraft+kip848, consumer"
"kraft, consumer"
))
def testRemoteAssignorRange(quorum: String, groupProtocol: String): Unit = {
// 1 consumer using range assignment

View File

@ -112,7 +112,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testBasicTransactions(quorum: String): Unit = {
val producer = transactionalProducers.head
val consumer = transactionalConsumers.head
@ -173,7 +173,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testReadCommittedConsumerShouldNotSeeUndecidedData(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("other")
@ -241,7 +241,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testDelayedFetchIncludesAbortedTransaction(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("other")
@ -300,14 +300,14 @@ class TransactionsTest extends IntegrationTestHarness {
@nowarn("cat=deprecation")
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testSendOffsetsWithGroupId(quorum: String): Unit = {
sendOffset((producer, groupId, consumer) =>
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, groupId))
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testSendOffsetsWithGroupMetadata(quorum: String): Unit = {
sendOffset((producer, _, consumer) =>
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumer.groupMetadata()))
@ -387,7 +387,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testFencingOnCommit(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@ -417,7 +417,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testFencingOnSendOffsets(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@ -449,7 +449,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testOffsetMetadataInSendOffsetsToTransaction(quorum: String): Unit = {
val tp = new TopicPartition(topic1, 0)
val groupId = "group"
@ -475,26 +475,26 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testInitTransactionsTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = false, producer => producer.initTransactions())
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testSendOffsetsToTransactionTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = true, producer => producer.sendOffsetsToTransaction(
Map(new TopicPartition(topic1, 0) -> new OffsetAndMetadata(0)).asJava, new ConsumerGroupMetadata("test-group")))
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testCommitTransactionTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = true, producer => producer.commitTransaction())
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testAbortTransactionTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = true, producer => producer.abortTransaction())
}
@ -515,7 +515,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testFencingOnSend(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@ -560,7 +560,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testFencingOnAddPartitions(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@ -607,7 +607,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testFencingOnTransactionExpiration(quorum: String): Unit = {
val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 100)
@ -650,7 +650,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testMultipleMarkersOneLeader(quorum: String): Unit = {
val firstProducer = transactionalProducers.head
val consumer = transactionalConsumers.head
@ -688,7 +688,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testConsecutivelyRunInitTransactions(quorum: String): Unit = {
val producer = createTransactionalProducer(transactionalId = "normalProducer")
@ -697,7 +697,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testBumpTransactionalEpoch(quorum: String): Unit = {
val producer = createTransactionalProducer("transactionalProducer",
deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)
@ -759,7 +759,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
@ValueSource(strings = Array("zk", "kraft"))
def testFailureToFenceEpoch(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("transactional-producer", maxBlockMs = 1000)

View File

@ -191,10 +191,6 @@ abstract class QuorumTestHarness extends Logging {
TestInfoUtils.isZkMigrationTest(testInfo)
}
def isNewGroupCoordinatorEnabled(): Boolean = {
TestInfoUtils.isNewGroupCoordinatorEnabled(testInfo)
}
def isShareGroupTest(): Boolean = {
TestInfoUtils.isShareGroupTest(testInfo)
}

View File

@ -55,10 +55,6 @@ object TestInfoUtils {
final val TestWithParameterizedQuorumAndGroupProtocolNames = "{displayName}.quorum={0}.groupProtocol={1}"
def isNewGroupCoordinatorEnabled(testInfo: TestInfo): Boolean = {
testInfo.getDisplayName.contains("kraft+kip848")
}
def isShareGroupTest(testInfo: TestInfo): Boolean = {
testInfo.getDisplayName.contains("kraft+kip932")
}

View File

@ -69,8 +69,6 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
),
@ -102,8 +100,6 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)

View File

@ -40,7 +40,14 @@ import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest()
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
def testConsumerGroupHeartbeatIsInaccessibleWhenDisabledByStaticConfig(): Unit = {
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
@ -54,8 +61,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
),
@ -76,8 +81,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -168,8 +171,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -286,8 +287,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, value = "5000"),
@ -398,8 +397,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "5000")

View File

@ -34,8 +34,6 @@ import org.junit.jupiter.api.extension.ExtendWith
class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -106,8 +104,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -171,8 +167,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)

View File

@ -33,8 +33,6 @@ import org.junit.jupiter.api.extension.ExtendWith
class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -45,8 +43,6 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)

View File

@ -34,8 +34,6 @@ import scala.jdk.CollectionConverters._
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))

View File

@ -36,7 +36,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record.{CompressionType, RecordVersion}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1
import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, ServerLogConfigs, ZooKeeperInternals}
import org.apache.kafka.storage.internals.log.LogConfig
@ -62,10 +62,6 @@ import scala.jdk.CollectionConverters._
class DynamicConfigChangeTest extends KafkaServerTestHarness {
override def generateConfigs: Seq[KafkaConfig] = {
val cfg = TestUtils.createBrokerConfig(0, zkConnectOrNull)
if (isNewGroupCoordinatorEnabled()) {
cfg.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true")
cfg.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer")
}
List(KafkaConfig.fromProps(cfg))
}
@ -584,7 +580,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
@ValueSource(strings = Array("kraft"))
def testDynamicGroupConfigChange(quorum: String): Unit = {
val newSessionTimeoutMs = 50000
val consumerGroupId = "group-foo"
@ -611,7 +607,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterDefaultGroupConfig(quorum: String): Unit = {
val admin = createAdminClient()
try {

View File

@ -60,11 +60,11 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
}
protected def isUnstableApiEnabled: Boolean = {
cluster.config.serverProperties.get("unstable.api.versions.enable") == "true"
cluster.brokers.values.stream.allMatch(b => b.config.unstableApiVersionsEnabled)
}
protected def isNewGroupCoordinatorEnabled: Boolean = {
cluster.config.serverProperties.get("group.coordinator.new.enable") == "true"
cluster.brokers.values.stream.allMatch(b => b.config.isNewGroupCoordinatorEnabled)
}
protected def commitOffset(

View File

@ -38,8 +38,6 @@ import scala.concurrent.Future
@ClusterTestDefaults(types = Array(Type.KRAFT))
class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))

View File

@ -41,7 +41,6 @@ import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),

View File

@ -3356,9 +3356,7 @@ class KafkaApisTest extends Logging {
}.toMap
)
}
kafkaApis = createKafkaApis(overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
))
kafkaApis = createKafkaApis()
kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.noCaching)
val expectedResponse = new WriteTxnMarkersResponseData()
@ -3441,9 +3439,7 @@ class KafkaApisTest extends Logging {
ArgumentMatchers.eq(TransactionResult.COMMIT),
ArgumentMatchers.eq(Duration.ofMillis(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT))
)).thenReturn(FutureUtils.failedFuture[Void](error.exception()))
kafkaApis = createKafkaApis(overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
))
kafkaApis = createKafkaApis()
kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.noCaching)
val expectedError = error match {
@ -4612,7 +4608,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -4696,7 +4691,6 @@ class KafkaApisTest extends Logging {
var request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -4800,7 +4794,6 @@ class KafkaApisTest extends Logging {
var request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -4884,7 +4877,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -4962,7 +4954,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -5027,7 +5018,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -5084,7 +5074,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -5156,7 +5145,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -5250,7 +5238,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -5398,7 +5385,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -5732,7 +5718,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -6098,7 +6083,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -6245,7 +6229,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -6389,7 +6372,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -6559,7 +6541,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -6733,7 +6714,6 @@ class KafkaApisTest extends Logging {
var request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -6864,7 +6844,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
raftSupport = true)
@ -6917,7 +6896,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Option(authorizer),
@ -6982,7 +6960,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7050,7 +7027,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7141,7 +7117,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
raftSupport = true)
@ -7193,7 +7168,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Option(authorizer),
@ -7246,7 +7220,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7298,7 +7271,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7348,7 +7320,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7424,7 +7395,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7488,7 +7458,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7556,7 +7525,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7625,7 +7593,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7712,7 +7679,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7782,7 +7748,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7856,7 +7821,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7924,7 +7888,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -7997,7 +7960,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -8077,7 +8039,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -8158,7 +8119,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -8233,7 +8193,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -8331,7 +8290,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@ -11105,7 +11063,6 @@ class KafkaApisTest extends Logging {
consumerGroupHeartbeatRequest
)).thenReturn(future)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@ -11133,7 +11090,6 @@ class KafkaApisTest extends Logging {
consumerGroupHeartbeatRequest
)).thenReturn(future)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@ -11157,7 +11113,6 @@ class KafkaApisTest extends Logging {
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@ -11184,7 +11139,6 @@ class KafkaApisTest extends Logging {
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@ -11255,7 +11209,6 @@ class KafkaApisTest extends Logging {
future.complete(List().asJava)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@ -11279,7 +11232,6 @@ class KafkaApisTest extends Logging {
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@ -11470,7 +11422,7 @@ class KafkaApisTest extends Logging {
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@ -11494,7 +11446,7 @@ class KafkaApisTest extends Logging {
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Some(authorizer),
raftSupport = true
)
@ -11517,7 +11469,7 @@ class KafkaApisTest extends Logging {
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@ -11534,7 +11486,7 @@ class KafkaApisTest extends Logging {
new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1))
).asJava
getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
getShareGroupDescribeResponse(groupIds, Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
, true, null, describedGroups)
}
@ -11558,7 +11510,7 @@ class KafkaApisTest extends Logging {
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
val response = getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
val response = getShareGroupDescribeResponse(groupIds, Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
, false, authorizer, describedGroups)
assertNotNull(response.data)
assertEquals(2, response.data.groups.size)
@ -11576,7 +11528,7 @@ class KafkaApisTest extends Logging {
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava)
val response = getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
val response = getShareGroupDescribeResponse(groupIds, Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
, false, authorizer, describedGroups)
assertNotNull(response.data)

View File

@ -1939,14 +1939,6 @@ class KafkaConfigTest {
def testGroupCoordinatorRebalanceProtocols(): Unit = {
val props = new Properties()
// consumer cannot be enabled in ZK mode.
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
// share cannot be enabled in ZK mode.
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,share")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
// Setting KRaft's properties.
props.putAll(kraftProps())
@ -1971,16 +1963,6 @@ class KafkaConfigTest {
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols)
assertTrue(config.isNewGroupCoordinatorEnabled)
assertTrue(config.shareGroupConfig.isShareGroupEnabled)
// consumer cannot be used without the new group coordinator.
props.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false")
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
// share cannot be used without the new group coordinator.
props.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false")
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,share")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@Test

View File

@ -32,8 +32,6 @@ import org.junit.jupiter.api.extension.ExtendWith
@ClusterTestDefaults(types = Array(Type.KRAFT))
class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))

View File

@ -34,8 +34,6 @@ import org.junit.jupiter.api.extension.ExtendWith
class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
@ -46,8 +44,6 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")

View File

@ -32,8 +32,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -44,8 +42,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)

View File

@ -31,8 +31,6 @@ import org.junit.jupiter.api.extension.ExtendWith
class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -42,8 +40,6 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))

View File

@ -37,8 +37,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -49,8 +47,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -71,8 +67,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -82,8 +76,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
@ -103,8 +95,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ -114,8 +104,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))

View File

@ -72,7 +72,6 @@ class ShareGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer,share"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),

View File

@ -39,8 +39,6 @@ import scala.concurrent.{Await, Future}
@ClusterTestDefaults(types = Array(Type.KRAFT))
class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")

View File

@ -76,9 +76,9 @@ public class GroupCoordinatorConfig {
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols";
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " +
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " +
"The " + Group.GroupType.CONSUMER + " rebalance protocol is in preview and therefore must not be used in production. " +
"The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production.";
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString());
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT =
Collections.unmodifiableList(Arrays.asList(Group.GroupType.CLASSIC.toString(), Group.GroupType.CONSUMER.toString()));
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
"wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";

View File

@ -41,10 +41,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static kafka.test.annotation.Type.CO_KRAFT;
import static kafka.test.annotation.Type.KRAFT;
import static kafka.test.annotation.Type.ZK;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
@ -65,11 +62,11 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_
* <p>
* We can reduce the number of cases as same as the old test framework by using the following methods:
* <ul>
* <li>{@link #forConsumerGroupCoordinator} for the case of (consumer group protocol)</li>
* <li>{@link #forKRaftGroupCoordinator} for the case of (consumer group protocol)</li>
* <li>(CO_KRAFT servers) with (group.coordinator.new.enable=true) with (classic / consumer group protocols) = 2 cases</li>
* </ul>
* <ul>
* <li>{@link #forClassicGroupCoordinator} for the case of (classic group protocol)</li>
* <li>{@link #forZkGroupCoordinator} for the case of (classic group protocol)</li>
* <li>(ZK / KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases</li>
* </ul>
*/
@ -79,34 +76,32 @@ class ConsumerGroupCommandTestUtils {
}
static List<ClusterConfig> generator() {
return Stream.concat(forConsumerGroupCoordinator().stream(), forClassicGroupCoordinator().stream())
.collect(Collectors.toList());
return Stream
.concat(forKRaftGroupCoordinator().stream(), forZkGroupCoordinator().stream())
.collect(Collectors.toList());
}
static List<ClusterConfig> forConsumerGroupCoordinator() {
static List<ClusterConfig> forKRaftGroupCoordinator() {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
serverProperties.put(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer");
return Collections.singletonList(ClusterConfig.defaultBuilder()
.setTypes(Collections.singleton(CO_KRAFT))
.setServerProperties(serverProperties)
.setTags(Collections.singletonList("consumerGroupCoordinator"))
.setTags(Collections.singletonList("kraftGroupCoordinator"))
.build());
}
static List<ClusterConfig> forClassicGroupCoordinator() {
static List<ClusterConfig> forZkGroupCoordinator() {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false");
return Collections.singletonList(ClusterConfig.defaultBuilder()
.setTypes(Stream.of(ZK, KRAFT).collect(Collectors.toSet()))
.setTypes(Collections.singleton(ZK))
.setServerProperties(serverProperties)
.setTags(Collections.singletonList("classicGroupCoordinator"))
.setTags(Collections.singletonList("zkGroupCoordinator"))
.build());
}

View File

@ -83,7 +83,7 @@ public class ListConsumerGroupTest {
}
private static List<ClusterConfig> consumerProtocolOnlyGenerator() {
return ConsumerGroupCommandTestUtils.forConsumerGroupCoordinator();
return ConsumerGroupCommandTestUtils.forKRaftGroupCoordinator();
}
private List<GroupProtocol> supportedGroupProtocols() {