KAFKA-19413: Extended AuthorizerIntegrationTest to cover StreamsGroupDescribe (#19981)
CI / build (push) Waiting to run Details

Extending test coverage of authorization for streams group RPC
StreamsGroupDescribe. The RPC requires DESCRIBE GROUP and DESCRIBE TOPIC
permissions for all topics.

Reviewers: Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Lucas Brutschy 2025-06-18 10:19:34 +02:00 committed by GitHub
parent 788781d4bb
commit 2a06335569
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 209 additions and 7 deletions

View File

@ -17,11 +17,12 @@ import java.time.Duration
import java.util import java.util
import java.util.concurrent.{ExecutionException, Semaphore} import java.util.concurrent.{ExecutionException, Semaphore}
import java.util.regex.Pattern import java.util.regex.Pattern
import java.util.{Comparator, Optional, Properties} import java.util.{Comparator, Optional, Properties, UUID}
import kafka.utils.{TestInfoUtils, TestUtils} import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils.waitUntilTrue import kafka.utils.TestUtils.waitUntilTrue
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ListGroupsOptions, NewTopic} import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ListGroupsOptions, NewTopic}
import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.{StreamsRebalanceData, StreamsRebalanceListener}
import org.apache.kafka.clients.producer._ import org.apache.kafka.clients.producer._
import org.apache.kafka.common.acl.AclOperation._ import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
@ -37,7 +38,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeShareGroupOffsetsRequestData, DescribeTransactionsRequestData, FetchResponseData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, InitializeShareGroupStateRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, OffsetFetchRequestData, OffsetFetchResponseData, ProduceRequestData, ReadShareGroupStateRequestData, ReadShareGroupStateSummaryRequestData, ShareAcknowledgeRequestData, ShareFetchRequestData, ShareGroupDescribeRequestData, ShareGroupHeartbeatRequestData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData, SyncGroupRequestData, WriteShareGroupStateRequestData, WriteTxnMarkersRequestData} import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeShareGroupOffsetsRequestData, DescribeTransactionsRequestData, FetchResponseData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, InitializeShareGroupStateRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, OffsetFetchRequestData, OffsetFetchResponseData, ProduceRequestData, ReadShareGroupStateRequestData, ReadShareGroupStateSummaryRequestData, ShareAcknowledgeRequestData, ShareFetchRequestData, ShareGroupDescribeRequestData, ShareGroupHeartbeatRequestData, StreamsGroupDescribeRequestData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData, SyncGroupRequestData, WriteShareGroupStateRequestData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
@ -76,6 +77,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW))) val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW)))
val shareGroupAlterConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW))) val shareGroupAlterConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
val streamsGroupReadAcl = Map(streamsGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW))) val streamsGroupReadAcl = Map(streamsGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)))
val streamsGroupDescribeAcl = Map(streamsGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)))
val clusterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, ALLOW))) val clusterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, ALLOW)))
val clusterCreateAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW))) val clusterCreateAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW)))
val clusterAlterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW))) val clusterAlterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW)))
@ -225,7 +227,9 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
resp.data.errorCode)), resp.data.errorCode)),
ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> ((resp: AlterShareGroupOffsetsResponse) => Errors.forCode( ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> ((resp: AlterShareGroupOffsetsResponse) => Errors.forCode(
resp.data.errorCode)), resp.data.errorCode)),
ApiKeys.STREAMS_GROUP_HEARTBEAT -> ((resp: StreamsGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)) ApiKeys.STREAMS_GROUP_HEARTBEAT -> ((resp: StreamsGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)),
ApiKeys.STREAMS_GROUP_DESCRIBE -> ((resp: StreamsGroupDescribeResponse) =>
Errors.forCode(resp.data.groups.asScala.find(g => streamsGroup == g.groupId).head.errorCode))
) )
def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = { def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = {
@ -294,7 +298,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++ topicDescribeAcl), ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++ topicDescribeAcl),
ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ topicReadAcl), ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ topicReadAcl),
ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> (shareGroupReadAcl ++ topicReadAcl), ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> (shareGroupReadAcl ++ topicReadAcl),
ApiKeys.STREAMS_GROUP_HEARTBEAT -> (streamsGroupReadAcl ++ topicDescribeAcl) ApiKeys.STREAMS_GROUP_HEARTBEAT -> (streamsGroupReadAcl ++ topicDescribeAcl),
ApiKeys.STREAMS_GROUP_DESCRIBE -> (streamsGroupDescribeAcl ++ topicDescribeAcl),
) )
private def createMetadataRequest(allowAutoTopicCreation: Boolean) = { private def createMetadataRequest(allowAutoTopicCreation: Boolean) = {
@ -870,6 +875,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
).asJava ).asJava
))).build(ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion) ))).build(ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion)
private def streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
new StreamsGroupDescribeRequestData()
.setGroupIds(List(streamsGroup).asJava)
.setIncludeAuthorizedOperations(false)).build(ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion)
private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true, private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true,
topicNames: Map[Uuid, String] = getTopicNames()) = { topicNames: Map[Uuid, String] = getTopicNames()) = {
for ((key, request) <- requestKeyToRequest) { for ((key, request) <- requestKeyToRequest) {
@ -954,6 +964,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest, ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest,
ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> alterShareGroupOffsetsRequest, ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> alterShareGroupOffsetsRequest,
ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest, ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest,
ApiKeys.STREAMS_GROUP_DESCRIBE -> streamsGroupDescribeRequest,
// Delete the topic last // Delete the topic last
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
@ -987,7 +998,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.SHARE_FETCH -> createShareFetchRequest, ApiKeys.SHARE_FETCH -> createShareFetchRequest,
ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest, ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest,
ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest,
ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest,
ApiKeys.STREAMS_GROUP_DESCRIBE -> streamsGroupDescribeRequest
) )
sendRequests(requestKeyToRequest, topicExists = false, topicNames) sendRequests(requestKeyToRequest, topicExists = false, topicNames)
@ -3853,6 +3865,165 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
response.data().status()) response.data().status())
} }
private def createStreamsGroupToDescribe(
topicAsSourceTopic: Boolean,
topicAsRepartitionSinkTopic: Boolean,
topicAsRepartitionSourceTopic: Boolean,
topicAsStateChangelogTopics: Boolean
): Unit = {
createTopicWithBrokerPrincipal(sourceTopic)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), streamsGroupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), sourceTopicResource)
streamsConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroup)
streamsConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createStreamsConsumer(streamsRebalanceData = new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
util.Map.of(
"subtopology-0", new StreamsRebalanceData.Subtopology(
if (topicAsSourceTopic) util.Set.of(sourceTopic, topic) else util.Set.of(sourceTopic),
if (topicAsRepartitionSinkTopic) util.Set.of(topic) else util.Set.of(),
if (topicAsRepartitionSourceTopic)
util.Map.of(topic, new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of()))
else util.Map.of(),
if (topicAsStateChangelogTopics)
util.Map.of(topic, new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of()))
else util.Map.of(),
util.Set.of()
)),
Map.empty[String, String].asJava
))
consumer.subscribe(
if (topicAsSourceTopic || topicAsRepartitionSourceTopic) util.Set.of(sourceTopic, topic) else util.Set.of(sourceTopic),
new StreamsRebalanceListener {
override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
Optional.empty()
override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] =
Optional.empty()
override def onAllTasksLost(): Optional[Exception] =
Optional.empty()
}
)
consumer.poll(Duration.ofMillis(500L))
removeAllClientAcls()
}
@ParameterizedTest
@CsvSource(Array(
"true, false, false, false",
"false, true, false, false",
"false, false, true, false",
"false, false, false, true"
))
def testStreamsGroupDescribeWithGroupDescribeAndTopicDescribeAcl(
topicAsSourceTopic: Boolean,
topicAsRepartitionSinkTopic: Boolean,
topicAsRepartitionSourceTopic: Boolean,
topicAsStateChangelogTopics: Boolean
): Unit = {
createStreamsGroupToDescribe(
topicAsSourceTopic,
topicAsRepartitionSinkTopic,
topicAsRepartitionSourceTopic,
topicAsStateChangelogTopics
)
addAndVerifyAcls(streamsGroupDescribeAcl(streamsGroupResource), streamsGroupResource)
addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = streamsGroupDescribeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@CsvSource(Array(
"true, false, false, false",
"false, true, false, false",
"false, false, true, false",
"false, false, false, true"
))
def testStreamsGroupDescribeWithOperationAll(
topicAsSourceTopic: Boolean,
topicAsRepartitionSinkTopic: Boolean,
topicAsRepartitionSourceTopic: Boolean,
topicAsStateChangelogTopics: Boolean
): Unit = {
createStreamsGroupToDescribe(
topicAsSourceTopic,
topicAsRepartitionSinkTopic,
topicAsRepartitionSourceTopic,
topicAsStateChangelogTopics
)
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), streamsGroupResource)
addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = streamsGroupDescribeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@CsvSource(Array(
"true, false, false, false",
"false, true, false, false",
"false, false, true, false",
"false, false, false, true"
))
def testStreamsGroupDescribeWithoutGroupDescribeAcl(
topicAsSourceTopic: Boolean,
topicAsRepartitionSinkTopic: Boolean,
topicAsRepartitionSourceTopic: Boolean,
topicAsStateChangelogTopics: Boolean
): Unit = {
createStreamsGroupToDescribe(
topicAsSourceTopic,
topicAsRepartitionSinkTopic,
topicAsRepartitionSourceTopic,
topicAsStateChangelogTopics
)
addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = streamsGroupDescribeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@CsvSource(Array(
"true, false, false, false",
"false, true, false, false",
"false, false, true, false",
"false, false, false, true"
))
def testStreamsGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(
topicAsSourceTopic: Boolean,
topicAsRepartitionSinkTopic: Boolean,
topicAsRepartitionSourceTopic: Boolean,
topicAsStateChangelogTopics: Boolean
): Unit = {
createStreamsGroupToDescribe(
topicAsSourceTopic,
topicAsRepartitionSinkTopic,
topicAsRepartitionSourceTopic,
topicAsStateChangelogTopics
)
val request = streamsGroupDescribeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
private def sendAndReceiveFirstRegexHeartbeat(memberId: String, private def sendAndReceiveFirstRegexHeartbeat(memberId: String,
listenerName: ListenerName): ConsumerGroupHeartbeatResponseData = { listenerName: ListenerName): ConsumerGroupHeartbeatResponseData = {
val request = new ConsumerGroupHeartbeatRequest.Builder( val request = new ConsumerGroupHeartbeatRequest.Builder(

View File

@ -22,14 +22,16 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume
import kafka.utils.TestUtils import kafka.utils.TestUtils
import kafka.utils.Implicits._ import kafka.utils.Implicits._
import java.util.Properties import java.util.{Optional, Properties}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness import kafka.integration.KafkaServerTestHarness
import kafka.security.JaasTestUtils import kafka.security.JaasTestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData}
import org.apache.kafka.common.network.{ConnectionMode, ListenerName} import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.MetadataLogConfig import org.apache.kafka.raft.MetadataLogConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs} import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
@ -49,6 +51,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
val producerConfig = new Properties val producerConfig = new Properties
val consumerConfig = new Properties val consumerConfig = new Properties
val shareConsumerConfig = new Properties val shareConsumerConfig = new Properties
val streamsConsumerConfig = new Properties
val adminClientConfig = new Properties val adminClientConfig = new Properties
val superuserClientConfig = new Properties val superuserClientConfig = new Properties
val serverConfig = new Properties val serverConfig = new Properties
@ -56,6 +59,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
private val consumers = mutable.Buffer[Consumer[_, _]]() private val consumers = mutable.Buffer[Consumer[_, _]]()
private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]() private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]()
private val streamsConsumers = mutable.Buffer[Consumer[_, _]]()
private val producers = mutable.Buffer[KafkaProducer[_, _]]() private val producers = mutable.Buffer[KafkaProducer[_, _]]()
private val adminClients = mutable.Buffer[Admin]() private val adminClients = mutable.Buffer[Admin]()
@ -148,7 +152,12 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
shareConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group") shareConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
shareConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) shareConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
shareConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) shareConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
streamsConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
streamsConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
streamsConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
streamsConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
doSuperuserSetup(testInfo) doSuperuserSetup(testInfo)
@ -207,6 +216,25 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
shareConsumer shareConsumer
} }
def createStreamsConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
configOverrides: Properties = new Properties,
configsToRemove: List[String] = List(),
streamsRebalanceData: StreamsRebalanceData): AsyncKafkaConsumer[K, V] = {
val props = new Properties
props ++= streamsConsumerConfig
props ++= configOverrides
configsToRemove.foreach(props.remove(_))
val streamsConsumer = new AsyncKafkaConsumer[K, V](
new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(Utils.propsToMap(props), keyDeserializer, valueDeserializer)),
keyDeserializer,
valueDeserializer,
Optional.of(streamsRebalanceData)
)
streamsConsumers += streamsConsumer
streamsConsumer
}
def createAdminClient( def createAdminClient(
listenerName: ListenerName = listenerName, listenerName: ListenerName = listenerName,
configOverrides: Properties = new Properties configOverrides: Properties = new Properties
@ -239,11 +267,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
consumers.foreach(_.close(Duration.ZERO)) consumers.foreach(_.close(Duration.ZERO))
shareConsumers.foreach(_.wakeup()) shareConsumers.foreach(_.wakeup())
shareConsumers.foreach(_.close(Duration.ZERO)) shareConsumers.foreach(_.close(Duration.ZERO))
streamsConsumers.foreach(_.wakeup())
streamsConsumers.foreach(_.close(Duration.ZERO))
adminClients.foreach(_.close(Duration.ZERO)) adminClients.foreach(_.close(Duration.ZERO))
producers.clear() producers.clear()
consumers.clear() consumers.clear()
shareConsumers.clear() shareConsumers.clear()
streamsConsumers.clear()
adminClients.clear() adminClients.clear()
} finally { } finally {
super.tearDown() super.tearDown()