MINOR: Some cleanups in group coordinator's intergration tests (#19281)

This patch applies a few cleanups to uniformize how group coordinator's
integration tests are setup.

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2025-03-27 14:06:36 +01:00 committed by GitHub
parent b952ea460f
commit 9e42b76147
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 135 additions and 207 deletions

View File

@ -37,15 +37,17 @@ import java.lang.{Byte => JByte}
import java.util.Collections
import scala.jdk.CollectionConverters._
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1)
@ClusterTestDefaults(
types = Array(Type.KRAFT),
brokers = 1,
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
),
features = Array(
new ClusterFeature(feature = Feature.GROUP_VERSION, version = 0)
)
@ -71,13 +73,7 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(expectedResponse, consumerGroupDescribeResponse.data)
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testConsumerGroupDescribeWithNewGroupCoordinator(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
@ -217,13 +213,7 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
}
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testConsumerGroupDescribeWithMigrationMember(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.

View File

@ -40,7 +40,7 @@ import scala.jdk.CollectionConverters._
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
@ -52,7 +52,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ConsumerGroupHeartbeatRequestData()
).build()
val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
val consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
val expectedResponse = new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
}
@ -67,7 +67,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ConsumerGroupHeartbeatRequestData()
).build()
val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
val consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
val expectedResponse = new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
}
@ -101,7 +101,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
@ -134,7 +134,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
@ -151,7 +151,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
.setMemberEpoch(-1)
).build()
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
// Verify the response.
assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
@ -189,7 +189,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
@ -222,7 +222,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
@ -248,7 +248,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Heartbeats until the partitions are revoked.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions revoked. Last response $consumerGroupHeartbeatResponse.")
@ -290,7 +290,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REGULAR_EXPRESSION.code
}, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.")
@ -329,7 +329,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Did not receive the expected successful response. Last response $consumerGroupHeartbeatResponse.")
@ -348,7 +348,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// here because the group coordinator is loaded in the background.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Did not receive the expected successful response. Last response $consumerGroupHeartbeatResponse.")
} finally {
@ -386,7 +386,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Static member could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
@ -420,7 +420,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Static member could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
@ -441,7 +441,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
.setMemberEpoch(-2)
).build()
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
// Verify the response.
assertEquals(-2, consumerGroupHeartbeatResponse.data.memberEpoch)
@ -458,7 +458,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
.setTopicPartitions(List.empty.asJava)
).build()
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
@ -507,7 +507,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
@ -541,7 +541,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
@ -563,14 +563,14 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
).build()
// Validating that trying to join with an in-use instanceId would throw an UnreleasedInstanceIdException.
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
assertEquals(Errors.UNRELEASED_INSTANCE_ID.code, consumerGroupHeartbeatResponse.data.errorCode)
// The new static member join group will keep failing with an UnreleasedInstanceIdException
// until eventually it gets through because the existing member will be kicked out
// because of not sending a heartbeat till session timeout expiry.
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not re-join the group successfully. Last response $consumerGroupHeartbeatResponse.")
@ -620,7 +620,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
@ -648,7 +648,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Verify the response. The heartbeat interval was updated.
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
newHeartbeatIntervalMs == consumerGroupHeartbeatResponse.data.heartbeatIntervalMs
}, msg = s"Dynamic update consumer group config failed. Last response $consumerGroupHeartbeatResponse.")
@ -681,7 +681,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code
}, msg = "Should fail due to invalid member id.")
} finally {
@ -712,7 +712,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
@ -721,12 +721,4 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertFalse(memberId.isEmpty)
admin.close()
}
private def connectAndReceive(request: ConsumerGroupHeartbeatRequest): ConsumerGroupHeartbeatResponse = {
IntegrationTestUtils.connectAndReceive[ConsumerGroupHeartbeatResponse](
request,
cluster.anyBrokerSocketServer(),
cluster.clientListener()
)
}
}

View File

@ -34,12 +34,16 @@ import java.util.Collections
import scala.jdk.CollectionConverters._
@Timeout(120)
@ClusterTestDefaults(types = Array(Type.KRAFT))
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "bidirectional")
)
)
@ -49,8 +53,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "upgrade")
)
)
@ -60,8 +62,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "downgrade")
)
)
@ -71,8 +71,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "disabled")
)
)
@ -82,8 +80,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "bidirectional")
)
)
@ -93,8 +89,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "upgrade")
)
)
@ -104,8 +98,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "downgrade")
)
)
@ -115,8 +107,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "disabled")
)
)
@ -126,8 +116,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "bidirectional")
)
)
@ -137,8 +125,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "upgrade")
)
)
@ -148,8 +134,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "downgrade")
)
)
@ -159,8 +143,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "disabled")
)
)
@ -170,8 +152,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "bidirectional")
)
)
@ -181,8 +161,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "bidirectional")
)
)
@ -192,8 +170,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "bidirectional")
)
)
@ -203,8 +179,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "bidirectional")
)
)
@ -214,8 +188,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "upgrade")
)
)
@ -288,8 +260,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "downgrade")
)
)
@ -392,8 +362,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "disabled")
)
)
@ -430,8 +398,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "disabled")
)
)
@ -493,8 +459,6 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
*/
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "bidirectional")
)
)

View File

@ -24,24 +24,20 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.assertEquals
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest
def testDeleteGroupsWithNewConsumerGroupProtocol(): Unit = {
testDeleteGroups(true)
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testDeleteGroupsWithOldConsumerGroupProtocol(): Unit = {
testDeleteGroups(false)
}

View File

@ -26,12 +26,15 @@ import org.junit.jupiter.api.Assertions.assertEquals
import scala.jdk.CollectionConverters._
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
)
)
class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest
def testDescribeGroupsWithOldConsumerGroupProtocol(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.

View File

@ -135,7 +135,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
}
protected def closeProducer(): Unit = {
if( producer != null )
if(producer != null)
producer.close()
}

View File

@ -30,12 +30,15 @@ import java.util.Collections
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
@ClusterTestDefaults(types = Array(Type.KRAFT))
class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
)
)
class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest
def testHeartbeatWithOldConsumerGroupProtocol(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.

View File

@ -16,7 +16,7 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Type}
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
@ -34,12 +34,16 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.jdk.CollectionConverters._
class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
))
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
)
)
class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest
def testJoinGroupWithOldConsumerGroupProtocol(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.

View File

@ -29,10 +29,13 @@ import org.junit.jupiter.api.Assertions.assertEquals
import scala.jdk.CollectionConverters._
@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest
def testLeaveGroupWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {

View File

@ -25,24 +25,21 @@ import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.Consumer
import org.apache.kafka.coordinator.group.{Group, GroupCoordinatorConfig}
import org.junit.jupiter.api.Assertions.assertEquals
@ClusterTestDefaults(types = Array(Type.KRAFT))
class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
)
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
)
)
class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest
def testListGroupsWithNewConsumerGroupProtocol(): Unit = {
testListGroups(true)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
))
@ClusterTest
def testListGroupsWithOldConsumerGroupProtocol(): Unit = {
testListGroups(false)
}

View File

@ -21,25 +21,21 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
@ClusterTestDefaults(types = Array(Type.KRAFT))
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testOffsetCommitWithNewConsumerGroupProtocol(): Unit = {
testOffsetCommit(true)
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testOffsetCommitWithOldConsumerGroupProtocol(): Unit = {
testOffsetCommit(false)
}

View File

@ -21,22 +21,20 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
@ClusterTestDefaults(types = Array(Type.KRAFT))
class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest
def testOffsetDeleteWithNewConsumerGroupProtocol(): Unit = {
testOffsetDelete(true)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
@ClusterTest
def testOffsetDeleteWithOldConsumerGroupProtocol(): Unit = {
testOffsetDelete(false)
}

View File

@ -26,61 +26,41 @@ import org.junit.jupiter.api.Assertions.assertEquals
import scala.jdk.CollectionConverters._
@ClusterTestDefaults(types = Array(Type.KRAFT))
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testSingleGroupOffsetFetchWithNewConsumerGroupProtocol(): Unit = {
testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testSingleGroupOffsetFetchWithOldConsumerGroupProtocol(): Unit = {
testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testSingleGroupAllOffsetFetchWithNewConsumerGroupProtocol(): Unit = {
testSingleGroupAllOffsetFetch(useNewProtocol = true, requireStable = true)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
@ClusterTest
def testSingleGroupAllOffsetFetchWithOldConsumerGroupProtocol(): Unit = {
testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = false)
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
@ClusterTest
def testMultiGroupsOffsetFetchWithNewConsumerGroupProtocol(): Unit = {
testMultipleGroupsOffsetFetch(useNewProtocol = true, requireStable = true)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
@ClusterTest
def testMultiGroupsOffsetFetchWithOldConsumerGroupProtocol(): Unit = {
testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = false)
}

View File

@ -25,7 +25,7 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.requests.{ShareAcknowledgeRequest, ShareAcknowledgeResponse, ShareFetchRequest, ShareFetchResponse, ShareRequestMetadata}
import org.apache.kafka.common.test.ClusterInstance
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, Tag, Timeout}
import org.junit.jupiter.api.{AfterEach, Timeout}
import java.util
import java.util.Collections
@ -35,7 +35,6 @@ import scala.jdk.CollectionConverters._
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1, serverProperties = Array(
new ClusterConfigProperty(key = "group.share.persister.class.name", value = "")
))
@Tag("integration")
class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster){
private final val MAX_WAIT_MS = 5000

View File

@ -31,7 +31,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.Timeout
import java.lang.{Byte => JByte}
import scala.jdk.CollectionConverters._
@ -40,7 +40,6 @@ import scala.jdk.CollectionConverters._
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1, serverProperties = Array(
new ClusterConfigProperty(key = ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, value = "")
))
@Tag("integration")
class ShareGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse}
import org.apache.kafka.common.test.ClusterInstance
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull, assertNull, assertTrue}
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.Timeout
import java.util
import scala.jdk.CollectionConverters._
@ -35,7 +35,6 @@ import scala.jdk.CollectionConverters._
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1, serverProperties = Array(
new ClusterConfigProperty(key = "group.share.persister.class.name", value = "")
))
@Tag("integration")
class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest(

View File

@ -31,13 +31,16 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
@ClusterTestDefaults(types = Array(Type.KRAFT))
class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
))
)
)
class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest
def testSyncGroupWithOldConsumerGroupProtocol(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.

View File

@ -30,7 +30,9 @@ import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
import scala.jdk.CollectionConverters.IterableHasAsScala
@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array(
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),