KAFKA-15742: KRaft support in GroupCoordinatorIntegrationTest (#15086)

updated GroupCoordinatorIntegrationTest.testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec to support KRaft

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Dmitry Werner 2024-01-03 21:46:12 +05:00 committed by GitHub
parent 60c445bdd5
commit d4aeec3d3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 8 additions and 6 deletions

View File

@ -15,11 +15,12 @@ package kafka.api
import kafka.integration.KafkaServerTestHarness
import kafka.log.UnifiedLog
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
import java.util.Properties
@ -33,18 +34,19 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.id.toString)
override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map {
override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnectOrNull, enableControlledShutdown = false).map {
KafkaConfig.fromProps(_, overridingProps)
}
@Test
def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum: String): Unit = {
val consumer = TestUtils.createConsumer(bootstrapServers())
val offsetMap = Map(
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "")
).asJava
consumer.commitSync(offsetMap)
val logManager = servers.head.getLogManager
val logManager = brokers.head.logManager
def getGroupMetadataLogOpt: Option[UnifiedLog] =
logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))