KAFKA-18614, KAFKA-18613: Add streams group request plumbing (#18979)

This change implements the basic RPC handling for StreamsGroupHeartbeat and
StreamsGroupDescribe. This includes:
- Adding an option to enable streams groups on the broker
- Passing describe and heartbeat requests to the right shard of the group
  coordinator
- The handler inside the GroupMetadataManager for StreamsGroupDescribe is
  fairly trivial, and is included directly in this PR.
- The handler for StreamsGroupHeartbeat is complex and not included in
  this PR yet. Instead, an UnsupportedOperationException is thrown.
  However, the interface is already defined: the result of a
  streamsGroupHeartbeat is a response, together with a list of internal
  topics to be created.

The heartbeat implementation inside the `GroupMetadataManager`, which
actually implements the assignment / reconciliation logic, will come in
a follow-up PR. Automatic creation of internal topics will also be
added in a follow-up PR.
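
For orientation, a minimal sketch of how a caller consumes the new interface, assuming the types introduced in this diff (GroupCoordinator#streamsGroupHeartbeat and StreamsGroupHeartbeatResult); actual topic creation is deferred to the follow-up:

    // Sketch only, not part of this commit.
    CompletableFuture<StreamsGroupHeartbeatResult> future =
        groupCoordinator.streamsGroupHeartbeat(context, request);
    future.thenAccept(result -> {
        StreamsGroupHeartbeatResponseData data = result.data();
        Map<String, CreatableTopic> toCreate = result.creatableTopics(); // not auto-created yet
    });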

Reviewers: Bill Bejeck <bill@confluent.io>
Author: Lucas Brutschy, 2025-02-26 16:33:26 +01:00 (committed by GitHub)
Commit: cb7c54ccd3 (parent: 4b5a16bf6f)
17 changed files with 1153 additions and 7 deletions


@@ -335,7 +335,7 @@
 <suppress checks="ParameterNumber"
           files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
 <suppress checks="ClassDataAbstractionCouplingCheck"
-          files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest).java"/>
+          files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupCoordinatorService|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest).java"/>
 <suppress checks="JavaNCSS"
           files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>


@@ -18,13 +18,14 @@ package kafka.coordinator.group
 import kafka.server.{KafkaConfig, ReplicaManager}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, DescribeShareGroupOffsetsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, DescribeShareGroupOffsetsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, TransactionResult}
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.coordinator.group
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
 import org.apache.kafka.coordinator.group.OffsetAndMetadata
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.server.common.RequestLocal

@@ -77,6 +78,15 @@ private[group] class GroupCoordinatorAdapter(
     ))
   }

+  override def streamsGroupHeartbeat(
+    context: RequestContext,
+    request: StreamsGroupHeartbeatRequestData
+  ): CompletableFuture[StreamsGroupHeartbeatResult] = {
+    FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+      s"The old group coordinator does not support ${ApiKeys.STREAMS_GROUP_HEARTBEAT.name} API."
+    ))
+  }
+
   override def shareGroupHeartbeat(
     context: RequestContext,
     request: ShareGroupHeartbeatRequestData

@@ -662,6 +672,15 @@ private[group] class GroupCoordinatorAdapter(
     ))
   }

+  override def streamsGroupDescribe(
+    context: RequestContext,
+    groupIds: util.List[String]
+  ): CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]] = {
+    FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+      s"The old group coordinator does not support ${ApiKeys.STREAMS_GROUP_DESCRIBE.name} API."
+    ))
+  }
+
   override def shareGroupDescribe(
     context: RequestContext,
     groupIds: util.List[String]
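
For illustration, a hedged Java sketch of what the adapter's rejection looks like to a caller (FutureUtils and Errors as used above): the returned future is already completed exceptionally with an UnsupportedVersionException, which the adapter test below asserts via assertFutureThrows.

    // Sketch only: the old coordinator never schedules work for streams APIs.
    CompletableFuture<StreamsGroupHeartbeatResult> future =
        FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
            "The old group coordinator does not support STREAMS_GROUP_HEARTBEAT API."));
    // future.isCompletedExceptionally() == true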


@@ -236,6 +236,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request)
         case ApiKeys.ALTER_SHARE_GROUP_OFFSETS => handleAlterShareGroupOffsetsRequest(request)
         case ApiKeys.DELETE_SHARE_GROUP_OFFSETS => handleDeleteShareGroupOffsetsRequest(request)
+        case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError)
+        case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsGroupHeartbeat(request).exceptionally(handleError)
         case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
       }
     } catch {
@@ -2599,6 +2601,132 @@ class KafkaApis(val requestChannel: RequestChannel,
   }

+  private def isStreamsGroupProtocolEnabled(): Boolean = {
+    groupCoordinator.isNewGroupCoordinator &&
+      config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
+  }
+
+  def handleStreamsGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
+    val streamsGroupHeartbeatRequest = request.body[StreamsGroupHeartbeatRequest]
+
+    if (!isStreamsGroupProtocolEnabled()) {
+      // The API is not supported by the "old" group coordinator (the default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, streamsGroupHeartbeatRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      if (streamsGroupHeartbeatRequest.data().topology() != null) {
+        val requiredTopics: Seq[String] =
+          streamsGroupHeartbeatRequest.data().topology().subtopologies().iterator().asScala.flatMap(subtopology =>
+            (subtopology.sourceTopics().iterator().asScala: Iterator[String])
+              ++ (subtopology.repartitionSinkTopics().iterator().asScala: Iterator[String])
+              ++ (subtopology.repartitionSourceTopics().iterator().asScala.map(_.name()): Iterator[String])
+              ++ (subtopology.stateChangelogTopics().iterator().asScala.map(_.name()): Iterator[String])
+          ).toSeq
+
+        // While correctness of the heartbeat request is checked inside the group coordinator,
+        // we are checking early that topics in the topology have valid names and are not internal
+        // kafka topics, since we need to pass it to the authorization helper before passing the
+        // request to the group coordinator.
+        val prohibitedTopics = requiredTopics.filter(Topic.isInternal)
+        if (prohibitedTopics.nonEmpty) {
+          val errorResponse = new StreamsGroupHeartbeatResponseData()
+          errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
+          errorResponse.setErrorMessage(f"Use of Kafka internal topics ${prohibitedTopics.mkString(",")} in a Kafka Streams topology is prohibited.")
+          requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(errorResponse))
+          return CompletableFuture.completedFuture[Unit](())
+        }
+
+        val invalidTopics = requiredTopics.filterNot(Topic.isValid)
+        if (invalidTopics.nonEmpty) {
+          val errorResponse = new StreamsGroupHeartbeatResponseData()
+          errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
+          errorResponse.setErrorMessage(f"Topic names ${invalidTopics.mkString(",")} are not valid topic names.")
+          requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(errorResponse))
+          return CompletableFuture.completedFuture[Unit](())
+        }
+      }
+
+      groupCoordinator.streamsGroupHeartbeat(
+        request.context,
+        streamsGroupHeartbeatRequest.data
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(exception))
+        } else {
+          val responseData = response.data()
+          val topicsToCreate = response.creatableTopics().asScala
+          if (topicsToCreate.nonEmpty) {
+            throw new UnsupportedOperationException("Internal topic auto-creation not yet implemented.")
+          }
+          requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(responseData))
+        }
+      }
+    }
+  }
+
+  def handleStreamsGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
+    val streamsGroupDescribeRequest = request.body[StreamsGroupDescribeRequest]
+    val includeAuthorizedOperations = streamsGroupDescribeRequest.data.includeAuthorizedOperations
+
+    if (!isStreamsGroupProtocolEnabled()) {
+      // The API is not supported by the "old" group coordinator (the default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, streamsGroupDescribeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val response = new StreamsGroupDescribeResponseData()
+      val authorizedGroups = new ArrayBuffer[String]()
+      streamsGroupDescribeRequest.data.groupIds.forEach { groupId =>
+        if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
+          response.groups.add(new StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+          )
+        } else {
+          authorizedGroups += groupId
+        }
+      }
+
+      groupCoordinator.streamsGroupDescribe(
+        request.context,
+        authorizedGroups.asJava
+      ).handle[Unit] { (results, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, streamsGroupDescribeRequest.getErrorResponse(exception))
+        } else {
+          if (includeAuthorizedOperations) {
+            results.forEach { groupResult =>
+              if (groupResult.errorCode == Errors.NONE.code) {
+                groupResult.setAuthorizedOperations(authHelper.authorizedOperations(
+                  request,
+                  new Resource(ResourceType.GROUP, groupResult.groupId)
+                ))
+              }
+            }
+          }
+
+          if (response.groups.isEmpty) {
+            // If the response is empty, we can directly reuse the results.
+            response.setGroups(results)
+          } else {
+            // Otherwise, we have to copy the results into the existing ones.
+            response.groups.addAll(results)
+          }
+
+          requestHelper.sendMaybeThrottle(request, new StreamsGroupDescribeResponse(response))
+        }
+      }
+    }
+  }
+
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
     val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
     try {
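
To make the early validation above concrete, a small hedged Java sketch using the same helpers from org.apache.kafka.common.internals.Topic (topic names made up):

    // Internal topics are rejected with STREAMS_INVALID_TOPOLOGY ...
    boolean internal = Topic.isInternal("__consumer_offsets"); // true
    // ... and so are syntactically invalid names.
    boolean valid = Topic.isValid("a ");                       // false: space is not allowed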


@@ -411,6 +411,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
       warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " +
         "This is part of the early access of KIP-932 and MUST NOT be used in production.")
     }
+    if (protocols.contains(GroupType.STREAMS)) {
+      if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
+        throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.")
+      }
+      warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled along with the new group coordinator. " +
+        "This is part of the preview of KIP-1071 and MUST NOT be used in production.")
+    }
     protocols
   }
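
A minimal configuration sketch, assuming a KRaft broker with the new group coordinator; the property name matches GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG used in the tests, the remaining broker settings are elided:

    Properties props = new Properties();
    // ... usual KRaft broker settings ...
    props.put("group.coordinator.rebalance.protocols", "classic,consumer,streams");
    // Without KRaft, or with the new group coordinator disabled,
    // KafkaConfig.fromProps(props) throws the ConfigException shown above.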


@@ -19,7 +19,7 @@ package kafka.coordinator.group
 import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException}
-import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, StreamsGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}

@@ -79,6 +79,22 @@ class GroupCoordinatorAdapterTest {
     assertFutureThrows(classOf[UnsupportedVersionException], future)
   }

+  @Test
+  def testStreamsGroupHeartbeat(): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+    val ctx = makeContext(ApiKeys.STREAMS_GROUP_HEARTBEAT, ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion)
+    val request = new StreamsGroupHeartbeatRequestData()
+      .setGroupId("group")
+
+    val future = adapter.streamsGroupHeartbeat(ctx, request)
+    assertTrue(future.isDone)
+    assertTrue(future.isCompletedExceptionally)
+    assertFutureThrows(classOf[UnsupportedVersionException], future)
+  }
+
   @Test
   def testJoinShareGroup(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])


@@ -75,6 +75,7 @@ import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
 import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
 import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}

@@ -2408,7 +2409,7 @@ class KafkaApisTest extends Logging {
     val markersResponse = capturedResponse.getValue
     assertEquals(2, markersResponse.errorsByProducerId.size())
   }

   @Test
   def shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
     val tp1 = new TopicPartition("t", 0)

@@ -9871,6 +9872,174 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
   }

+  @Test
+  def testStreamsGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
+    metadataCache = {
+      val cache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
+      val delta = new MetadataDelta(MetadataImage.EMPTY)
+      delta.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
+      )
+      cache.setImage(delta.apply(MetadataProvenance.EMPTY))
+      cache
+    }
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val expectedHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
+      .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+    val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(expectedHeartbeatResponse, response.data)
+  }
+
+  @Test
+  def testStreamsGroupHeartbeatRequest(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+    val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
+    when(groupCoordinator.streamsGroupHeartbeat(
+      requestChannelRequest.context,
+      streamsGroupHeartbeatRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
+      .setMemberId("member")
+
+    future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, Collections.emptyMap()))
+    val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(streamsGroupHeartbeatResponse, response.data)
+  }
+
+  @Test
+  def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+    val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
+    when(groupCoordinator.streamsGroupHeartbeat(
+      requestChannelRequest.context,
+      streamsGroupHeartbeatRequest
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+    val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode)
+  }
+
+  @Test
+  def testStreamsGroupHeartbeatRequestAuthorizationFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
+  }
+
+  @Test
+  def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode)
+  }
+
+  @Test
+  def testStreamsGroupHeartbeatRequestInvalidTopicNames(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
+      new StreamsGroupHeartbeatRequestData.Topology()
+        .setEpoch(3)
+        .setSubtopologies(
+          Collections.singletonList(new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")
+            .setSourceTopics(Collections.singletonList("a "))
+            .setRepartitionSinkTopics(Collections.singletonList("b?"))
+            .setRepartitionSourceTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName("c!")))
+            .setStateChangelogTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName("d/")))
+          )
+        )
+    )
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.STREAMS_INVALID_TOPOLOGY.code, response.data.errorCode)
+    assertEquals("Topic names a ,b?,c!,d/ are not valid topic names.", response.data.errorMessage())
+  }
+
+  @Test
+  def testStreamsGroupHeartbeatRequestInternalTopicNames(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
+      new StreamsGroupHeartbeatRequestData.Topology()
+        .setEpoch(3)
+        .setSubtopologies(
+          Collections.singletonList(new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")
+            .setSourceTopics(Collections.singletonList("__consumer_offsets"))
+            .setRepartitionSinkTopics(Collections.singletonList("__transaction_state"))
+            .setRepartitionSourceTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName("__share_group_state")))
+          )
+        )
+    )
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.STREAMS_INVALID_TOPOLOGY.code, response.data.errorCode)
+    assertEquals("Use of Kafka internal topics __consumer_offsets,__transaction_state,__share_group_state in a Kafka Streams topology is prohibited.", response.data.errorMessage())
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {

@@ -9998,6 +10167,133 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
   }

+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+    val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+      .setIncludeAuthorizedOperations(includeAuthorizedOperations)
+    streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
+    val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+    val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.streamsGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    future.complete(List(
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
+    ).asJava)
+
+    var authorizedOperationsInt = Int.MinValue
+    if (includeAuthorizedOperations) {
+      authorizedOperationsInt = Utils.to32BitField(
+        AclEntry.supportedOperations(ResourceType.GROUP).asScala
+          .map(_.code.asInstanceOf[JByte]).asJava)
+    }
+
+    // Can't reuse the above list here because we would not test the implementation in KafkaApis then
+    val describedGroups = List(
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
+    ).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
+
+    val expectedStreamsGroupDescribeResponseData = new StreamsGroupDescribeResponseData()
+      .setGroups(describedGroups.asJava)
+
+    val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+    assertEquals(expectedStreamsGroupDescribeResponseData, response.data)
+  }
+
+  @Test
+  def testStreamsGroupDescribeReturnsUnsupportedVersion(): Unit = {
+    val groupId = "group0"
+    val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+    streamsGroupDescribeRequestData.groupIds.add(groupId)
+    val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+    val errorCode = Errors.UNSUPPORTED_VERSION.code
+    val expectedDescribedGroup = new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupId).setErrorCode(errorCode)
+    val expectedResponse = new StreamsGroupDescribeResponseData()
+    expectedResponse.groups.add(expectedDescribedGroup)
+    metadataCache = {
+      val cache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
+      val delta = new MetadataDelta(MetadataImage.EMPTY)
+      delta.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
+      )
+      cache.setImage(delta.apply(MetadataProvenance.EMPTY))
+      cache
+    }
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+    assertEquals(expectedResponse, response.data)
+  }
+
+  @Test
+  def testStreamsGroupDescribeAuthorizationFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+    streamsGroupDescribeRequestData.groupIds.add("group-id")
+    val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.streamsGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    future.complete(List().asJava)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.groups.get(0).errorCode)
+  }
+
+  @Test
+  def testStreamsGroupDescribeFutureFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+    streamsGroupDescribeRequestData.groupIds.add("group-id")
+    val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+    val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.streamsGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+    val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+    assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
+  }
+
   @Test
   def testGetTelemetrySubscriptions(): Unit = {
     val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(


@@ -1753,6 +1753,11 @@ class KafkaConfigTest {
     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols)
     assertTrue(config.isNewGroupCoordinatorEnabled)
     assertTrue(config.shareGroupConfig.isShareGroupEnabled)
+
+    props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,streams")
+    val config2 = KafkaConfig.fromProps(props)
+    assertEquals(Set(GroupType.CLASSIC, GroupType.STREAMS), config2.groupCoordinatorRebalanceProtocols)
+    assertTrue(config2.isNewGroupCoordinatorEnabled)
   }

   @Test
@Test @Test


@@ -41,6 +41,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;

@@ -48,6 +50,7 @@ import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;

@@ -84,6 +87,20 @@ public interface GroupCoordinator {
         ConsumerGroupHeartbeatRequestData request
     );

+    /**
+     * Heartbeat to a Streams Group.
+     *
+     * @param context The request context.
+     * @param request The StreamsGroupHeartbeatRequestData data.
+     *
+     * @return A future yielding the response together with internal topics to create.
+     *         The error code(s) of the response are set to indicate the error(s) that occurred during execution.
+     */
+    CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
+        RequestContext context,
+        StreamsGroupHeartbeatRequestData request
+    );
+
     /**
      * Heartbeat to a Share Group.
      *

@@ -199,6 +216,19 @@ public interface GroupCoordinator {
         List<String> groupIds
     );

+    /**
+     * Describe streams groups.
+     *
+     * @param context The coordinator request context.
+     * @param groupIds The group ids.
+     *
+     * @return A future yielding the results or an exception.
+     */
+    CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    );
+
     /**
      * Describe share groups.
      *


@@ -48,6 +48,9 @@ import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;

@@ -61,6 +64,7 @@ import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
+import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.utils.BufferSupplier;

@@ -77,6 +81,7 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSuppli
 import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.record.BrokerCompressionType;

@@ -373,6 +378,44 @@ public class GroupCoordinatorService implements GroupCoordinator {
         ));
     }

+    /**
+     * See {@link GroupCoordinator#streamsGroupHeartbeat(RequestContext, StreamsGroupHeartbeatRequestData)}.
+     */
+    @Override
+    public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
+        RequestContext context,
+        StreamsGroupHeartbeatRequestData request
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                new StreamsGroupHeartbeatResult(
+                    new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+                    Collections.emptyMap()
+                )
+            );
+        }
+
+        return runtime.scheduleWriteOperation(
+            "streams-group-heartbeat",
+            topicPartitionFor(request.groupId()),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
+            coordinator -> coordinator.streamsGroupHeartbeat(context, request)
+        ).exceptionally(exception -> handleOperationException(
+            "streams-group-heartbeat",
+            request,
+            exception,
+            (error, message) ->
+                new StreamsGroupHeartbeatResult(
+                    new StreamsGroupHeartbeatResponseData()
+                        .setErrorCode(error.code())
+                        .setErrorMessage(message),
+                    Collections.emptyMap()
+                ),
+            log
+        ));
+    }
+
     /**
      * See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}.
      */

@@ -690,6 +733,58 @@ public class GroupCoordinatorService implements GroupCoordinator {
         return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
     }

+    /**
+     * See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}.
+     */
+    @Override
+    public CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(StreamsGroupDescribeRequest.getErrorDescribedGroupList(
+                groupIds,
+                Errors.COORDINATOR_NOT_AVAILABLE
+            ));
+        }
+
+        final List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>> futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+        groupIds.forEach(groupId -> {
+            if (isGroupIdNotEmpty(groupId)) {
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>())
+                    .add(groupId);
+            } else {
+                futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                    new StreamsGroupDescribeResponseData.DescribedGroup()
+                        .setGroupId(null)
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                )));
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
+                runtime.scheduleReadOperation(
+                    "streams-group-describe",
+                    topicPartition,
+                    (coordinator, lastCommittedOffset) -> coordinator.streamsGroupDescribe(groupList, lastCommittedOffset)
+                ).exceptionally(exception -> handleOperationException(
+                    "streams-group-describe",
+                    groupList,
+                    exception,
+                    (error, __) -> StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupList, error),
+                    log
+                ));
+
+            futures.add(future);
+        });
+
+        return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
+    }
+
     /**
      * See {@link GroupCoordinator#shareGroupDescribe(RequestContext, List)}.
      */


@@ -42,6 +42,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;

@@ -110,6 +112,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;

@@ -374,6 +377,22 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         return groupMetadataManager.consumerGroupHeartbeat(context, request);
     }

+    /**
+     * Handles a StreamsGroupHeartbeat request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsGroupHeartbeat request.
+     *
+     * @return A result containing the StreamsGroupHeartbeat response, a list of internal topics to be created,
+     *         and a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+        RequestContext context,
+        StreamsGroupHeartbeatRequestData request
+    ) {
+        return groupMetadataManager.streamsGroupHeartbeat(context, request);
+    }
+
     /**
      * Handles a ShareGroupHeartbeat request.
      *

@@ -626,6 +645,21 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         return groupMetadataManager.consumerGroupDescribe(groupIds, committedOffset);
     }

+    /**
+     * Handles a StreamsGroupDescribe request.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @param committedOffset A specified committed offset corresponding to this shard.
+     *
+     * @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup.
+     */
+    public List<StreamsGroupDescribeResponseData.DescribedGroup> streamsGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        return groupMetadataManager.streamsGroupDescribe(groupIds, committedOffset);
+    }
+
     /**
      * Handles a ShareGroupDescribe request.
      *


@@ -54,6 +54,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.protocol.Errors;

@@ -133,6 +135,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
 import org.apache.kafka.coordinator.group.streams.StreamsTopology;
 import org.apache.kafka.coordinator.group.streams.TasksTuple;

@@ -615,6 +618,35 @@ public class GroupMetadataManager {
         return describedGroups;
     }

+    /**
+     * Handles a StreamsGroupDescribe request.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @param committedOffset A specified committed offset corresponding to this shard.
+     *
+     * @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup.
+     *         If a group is not found, the DescribedGroup will contain the error code and message.
+     */
+    public List<StreamsGroupDescribeResponseData.DescribedGroup> streamsGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        final List<StreamsGroupDescribeResponseData.DescribedGroup> describedGroups = new ArrayList<>();
+        groupIds.forEach(groupId -> {
+            try {
+                describedGroups.add(streamsGroup(groupId, committedOffset).asDescribedGroup(committedOffset));
+            } catch (GroupIdNotFoundException exception) {
+                describedGroups.add(new StreamsGroupDescribeResponseData.DescribedGroup()
+                    .setGroupId(groupId)
+                    .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+                    .setErrorMessage(exception.getMessage())
+                );
+            }
+        });
+
+        return describedGroups;
+    }
+
     /**
      * Handles a DescribeGroup request.
      *

@@ -3787,6 +3819,22 @@ public class GroupMetadataManager {
         }
     }

+    /**
+     * Handles a StreamsGroupHeartbeat request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsGroupHeartbeat request.
+     *
+     * @return A result containing the StreamsGroupHeartbeat response, a list of internal topics to create,
+     *         and a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+        RequestContext context,
+        StreamsGroupHeartbeatRequestData request
+    ) throws ApiException {
+        throw new UnsupportedOperationException("StreamsGroupHeartbeat is not implemented yet.");
+    }
+
     /**
      * Replays StreamsGroupTopologyKey/Value to update the hard state of
      * the streams group.
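
A short hedged usage sketch of the describe handler above (group id made up): unknown groups surface as per-group errors rather than a failed future.

    List<StreamsGroupDescribeResponseData.DescribedGroup> groups =
        groupMetadataManager.streamsGroupDescribe(List.of("no-such-group"), lastCommittedOffset);
    // groups.get(0).errorCode() == Errors.GROUP_ID_NOT_FOUND.code()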


@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
/**
* A simple record to hold the result of a StreamsGroupHeartbeat request.
*
* @param data The data to be returned to the client.
* @param creatableTopics The internal topics to be created.
*/
public record StreamsGroupHeartbeatResult(StreamsGroupHeartbeatResponseData data, Map<String, CreatableTopic> creatableTopics) {
public StreamsGroupHeartbeatResult {
Objects.requireNonNull(data);
creatableTopics = Objects.requireNonNull(Collections.unmodifiableMap(creatableTopics));
}
}
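
A brief hedged construction sketch for the record above (topic name and settings made up): the compact constructor rejects null data and freezes the topic map.

    StreamsGroupHeartbeatResult result = new StreamsGroupHeartbeatResult(
        new StreamsGroupHeartbeatResponseData().setMemberId("member-1"),
        Map.of("app-repartition", new CreatableTopic()
            .setName("app-repartition")
            .setNumPartitions(3)
            .setReplicationFactor((short) -1))); // -1 = broker default, assumed here
    // result.creatableTopics().put(...) would throw UnsupportedOperationException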


@@ -355,10 +355,10 @@ public record StreamsGroupMember(String memberId,
     private static List<StreamsGroupDescribeResponseData.TaskIds> taskIdsFromMap(Map<String, Set<Integer>> tasks) {
         List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new ArrayList<>();
-        tasks.forEach((subtopologyId, partitionSet) -> {
+        tasks.keySet().stream().sorted().forEach(subtopologyId -> {
             taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
                 .setSubtopologyId(subtopologyId)
-                .setPartitions(new ArrayList<>(partitionSet)));
+                .setPartitions(tasks.get(subtopologyId).stream().sorted().toList()));
         });
         return taskIds;
     }
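
For intuition, a hedged sketch of what the sorting change buys (values made up): the describe output no longer depends on map iteration order.

    Map<String, Set<Integer>> tasks = Map.of("sub-2", Set.of(3, 1), "sub-1", Set.of(0));
    // Before: output order followed the map's iteration order (unstable for hash maps).
    // After: subtopology ids come out sorted ("sub-1", "sub-2"), partitions sorted (1, 3).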


@ -32,6 +32,9 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.StreamsTopologyFencedException;
import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.internals.Topic;
@ -59,6 +62,9 @@ import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@ -78,6 +84,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime; import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.server.record.BrokerCompressionType;
@ -263,6 +270,118 @@ public class GroupCoordinatorServiceTest {
);
}
@Test
public void testStreamsGroupHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData()
.setGroupId("foo");
CompletableFuture<StreamsGroupHeartbeatResult> future = service.streamsGroupHeartbeat(
requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT),
request
);
assertEquals(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
Collections.emptyMap()
),
future.get()
);
}
@Test
public void testStreamsGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setRuntime(runtime)
.setConfig(createConfig())
.build(true);
StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData()
.setGroupId("foo");
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("streams-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData(),
Collections.emptyMap()
)
));
CompletableFuture<StreamsGroupHeartbeatResult> future = service.streamsGroupHeartbeat(
requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT),
request
);
assertEquals(new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Collections.emptyMap()), future.get(5, TimeUnit.SECONDS));
}
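A note on the mock above: the service schedules the heartbeat as a write operation named "streams-group-heartbeat" against partition 0 of the group metadata topic (Topic.GROUP_METADATA_TOPIC_NAME, i.e. __consumer_offsets) with a 5-second timeout, mirroring the existing consumer-group heartbeat path; any mismatch in those arguments would leave the stub unmatched and the test failing.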
private static Stream<Arguments> testStreamsGroupHeartbeatWithExceptionSource() {
return Stream.of(
Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
Arguments.arguments(new org.apache.kafka.common.errors.TimeoutException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null),
Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null),
Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null),
Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid"),
Arguments.arguments(new StreamsInvalidTopologyException("Invalid"), Errors.STREAMS_INVALID_TOPOLOGY.code(), "Invalid"),
Arguments.arguments(new StreamsTopologyFencedException("Invalid"), Errors.STREAMS_TOPOLOGY_FENCED.code(), "Invalid"),
Arguments.arguments(new StreamsInvalidTopologyEpochException("Invalid"), Errors.STREAMS_INVALID_TOPOLOGY_EPOCH.code(), "Invalid")
);
}
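The parameter list above doubles as documentation of the heartbeat's exception translation. As a reading aid (comments only, not code from this patch):

// Retriable storage/replication problems -> COORDINATOR_NOT_AVAILABLE:
//   UnknownTopicOrPartitionException, NotEnoughReplicasException, TimeoutException
// Coordinator leadership has moved, client should rediscover -> NOT_COORDINATOR:
//   NotLeaderOrFollowerException, KafkaStorageException
// Broker-side record/size problems -> UNKNOWN_SERVER_ERROR:
//   RecordTooLargeException, RecordBatchTooLargeException, InvalidFetchSizeException
// Request and Streams topology errors keep their own code and message:
//   InvalidRequestException, StreamsInvalidTopologyException,
//   StreamsTopologyFencedException, StreamsInvalidTopologyEpochException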
@ParameterizedTest
@MethodSource("testStreamsGroupHeartbeatWithExceptionSource")
public void testStreamsGroupHeartbeatWithException(
Throwable exception,
short expectedErrorCode,
String expectedErrorMessage
) throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setRuntime(runtime)
.setConfig(createConfig())
.build(true);
StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData()
.setGroupId("foo");
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("streams-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(exception));
CompletableFuture<StreamsGroupHeartbeatResult> future = service.streamsGroupHeartbeat(
requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT),
request
);
assertEquals(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData()
.setErrorCode(expectedErrorCode)
.setErrorMessage(expectedErrorMessage),
Collections.emptyMap()
),
future.get(5, TimeUnit.SECONDS)
);
}
@Test
public void testPartitionFor() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@ -1409,6 +1528,135 @@ public class GroupCoordinatorServiceTest {
);
}
@Test
public void testStreamsGroupDescribe() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
int partitionCount = 2;
service.startup(() -> partitionCount);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup1 = new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-1");
StreamsGroupDescribeResponseData.DescribedGroup describedGroup2 = new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-2");
List<StreamsGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
describedGroup1,
describedGroup2
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("streams-group-describe"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>();
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("streams-group-describe"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
ArgumentMatchers.any()
)).thenReturn(describedGroupFuture);
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), Arrays.asList("group-id-1", "group-id-2"));
assertFalse(future.isDone());
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
assertEquals(expectedDescribedGroups, future.get());
}
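The test above relies on the describe path fanning out one read per __consumer_offsets partition and joining the results: group-id-1 hashes to partition 0 (stubbed as already complete) while group-id-2 hashes to partition 1, so the aggregate future only completes once describedGroupFuture does. A hedged sketch of that join pattern (helper and class names are illustrative, not the service's actual code):

import java.util.List;
import java.util.concurrent.CompletableFuture;

public class JoinSketch {
    // Illustrative only: combine per-partition read futures into one result list.
    static <T> CompletableFuture<List<T>> joinAll(List<CompletableFuture<List<T>>> parts) {
        return CompletableFuture.allOf(parts.toArray(new CompletableFuture[0]))
            .thenApply(v -> parts.stream()
                .flatMap(f -> f.join().stream()) // safe: all parts are complete here
                .toList());
    }
}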
@Test
public void testStreamsGroupDescribeInvalidGroupId() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
int partitionCount = 1;
service.startup(() -> partitionCount);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setErrorCode(Errors.INVALID_GROUP_ID.code());
List<StreamsGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(null)
.setErrorCode(Errors.INVALID_GROUP_ID.code()),
describedGroup
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("streams-group-describe"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup)));
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), Arrays.asList("", null));
assertEquals(expectedDescribedGroups, future.get());
}
@Test
public void testStreamsGroupDescribeCoordinatorLoadInProgress() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
int partitionCount = 1;
service.startup(() -> partitionCount);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("streams-group-describe"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(
new CoordinatorLoadInProgressException(null)
));
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), Collections.singletonList("group-id"));
assertEquals(
Collections.singletonList(new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id")
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
),
future.get()
);
}
@Test
public void testStreamsGroupDescribeCoordinatorNotActive() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("streams-group-describe"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(
Errors.COORDINATOR_NOT_AVAILABLE.exception()
));
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), Collections.singletonList("group-id"));
assertEquals(
Collections.singletonList(new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id")
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
),
future.get()
);
}
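Taken together, the two tests above pin down how describe failures are reported: the returned future still completes successfully, and the failure is folded into a per-group DescribedGroup entry carrying COORDINATOR_LOAD_IN_PROGRESS or COORDINATOR_NOT_AVAILABLE, so one unavailable partition does not fail the whole response.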
@Test
public void testDeleteOffsets() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();

View File

@ -27,6 +27,8 @@ import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@ -81,6 +83,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignment
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
@ -125,7 +128,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings("ClassFanOutComplexity")
public class GroupCoordinatorShardTest {
@Test
@ -160,6 +163,38 @@ public class GroupCoordinatorShardTest {
assertEquals(result, coordinator.consumerGroupHeartbeat(context, request));
}
@Test
public void testStreamsGroupHeartbeat() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
RequestContext context = requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT);
StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData();
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = new CoordinatorResult<>(
Collections.emptyList(),
new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Collections.emptyMap())
);
when(groupMetadataManager.streamsGroupHeartbeat(
context,
request
)).thenReturn(result);
assertEquals(result, coordinator.streamsGroupHeartbeat(context, request));
}
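As the mock verifies, the shard adds no logic of its own here: streamsGroupHeartbeat on GroupCoordinatorShard is a straight delegation to GroupMetadataManager.streamsGroupHeartbeat, and the CoordinatorResult (records plus StreamsGroupHeartbeatResult) is returned unchanged.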
@Test
public void testCommitOffset() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);

View File

@ -56,6 +56,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
@ -96,6 +97,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
@ -119,6 +121,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -8952,6 +8955,112 @@ public class GroupMetadataManagerTest {
assertEquals(expected, actual);
}
@Test
public void testStreamsGroupDescribeNoErrors() {
List<String> streamsGroupIds = Arrays.asList("group-id-1", "group-id-2");
int epoch = 10;
String memberId = "member-id";
StreamsGroupMember.Builder memberBuilder = streamsGroupMemberBuilderWithDefaults(memberId)
.setClientTags(Collections.singletonMap("clientTag", "clientValue"))
.setProcessId("processId")
.setMemberEpoch(epoch)
.setPreviousMemberEpoch(epoch - 1);
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch))
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), epoch)
.withMember(memberBuilder.build()))
.build();
List<StreamsGroupDescribeResponseData.DescribedGroup> expected = Arrays.asList(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(epoch)
.setGroupId(streamsGroupIds.get(0))
.setGroupState(StreamsGroupState.EMPTY.toString())
.setAssignmentEpoch(0),
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(epoch)
.setGroupId(streamsGroupIds.get(1))
.setMembers(Collections.singletonList(
memberBuilder.build().asStreamsGroupDescribeMember(
TasksTuple.EMPTY
)
))
.setGroupState(StreamsGroupState.NOT_READY.toString())
);
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(streamsGroupIds);
assertEquals(expected, actual);
}
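Note the two expected states above: a streams group with no members describes as EMPTY, while the group that has a member describes as NOT_READY rather than STABLE, presumably because no topology has been initialized for it in this setup.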
@Test
public void testStreamsGroupDescribeWithErrors() {
String groupId = "groupId";
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(Collections.singletonList(groupId));
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage("Group groupId not found.");
List<StreamsGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList(
describedGroup
);
assertEquals(expected, actual);
}
@Test
public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() {
String streamsGroupId = "streamsGroupId";
int epoch = 10;
String memberId1 = "memberId1";
String memberId2 = "memberId2";
String subtopologyId = "subtopology1";
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1);
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1));
TasksTuple assignment = new TasksTuple(
Map.of(subtopologyId, Set.of(0, 1)),
Map.of(subtopologyId, Set.of(0, 1)),
Map.of(subtopologyId, Set.of(0, 1))
);
StreamsGroupMember.Builder memberBuilder2 = streamsGroupMemberBuilderWithDefaults(memberId2);
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId, memberId2, assignment));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2));
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(Collections.singletonList(streamsGroupId), context.lastCommittedOffset);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(streamsGroupId)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage("Group streamsGroupId not found.");
assertEquals(1, actual.size());
assertEquals(describedGroup, actual.get(0));
// Commit the offset and test again
context.commit();
actual = context.groupMetadataManager.streamsGroupDescribe(Collections.singletonList(streamsGroupId), context.lastCommittedOffset);
describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(streamsGroupId)
.setMembers(Arrays.asList(
memberBuilder1.build().asStreamsGroupDescribeMember(TasksTuple.EMPTY),
memberBuilder2.build().asStreamsGroupDescribeMember(assignment)
))
.setGroupState(StreamsGroup.StreamsGroupState.NOT_READY.toString())
.setGroupEpoch(epoch + 2);
assertEquals(1, actual.size());
assertEquals(describedGroup, actual.get(0));
}
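This test also documents the read-isolation contract: streamsGroupDescribe reads at lastCommittedOffset, so records that have been replayed but not yet committed are invisible (the group describes as GROUP_ID_NOT_FOUND) until context.commit() advances the committed offset.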
@Test
public void testDescribeGroupStable() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()

View File

@ -37,6 +37,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
@ -108,6 +110,7 @@ import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@ -676,6 +679,36 @@ public class GroupMetadataManagerTestContext {
return result;
}
public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
StreamsGroupHeartbeatRequestData request
) {
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.STREAMS_GROUP_HEARTBEAT,
ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(),
"client",
0
),
"1",
InetAddress.getLoopbackAddress(),
KafkaPrincipal.ANONYMOUS,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY,
false
);
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = groupMetadataManager.streamsGroupHeartbeat(
context,
request
);
if (result.replayRecords()) {
result.records().forEach(this::replay);
}
return result;
}
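For completeness, this helper mirrors the existing consumerGroupHeartbeat test plumbing: it fabricates a PLAINTEXT request context, invokes the manager, and replays any produced records into the test state when replayRecords() is set. A typical call site would look like (illustrative):

// CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result =
//     context.streamsGroupHeartbeat(new StreamsGroupHeartbeatRequestData().setGroupId("foo"));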
public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> sleep(long ms) {
time.sleep(ms);
List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = timer.poll();
@ -1286,6 +1319,10 @@ public class GroupMetadataManagerTestContext {
return groupMetadataManager.consumerGroupDescribe(groupIds, lastCommittedOffset);
}
public List<StreamsGroupDescribeResponseData.DescribedGroup> sendStreamsGroupDescribe(List<String> groupIds) {
return groupMetadataManager.streamsGroupDescribe(groupIds, lastCommittedOffset);
}
public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(List<String> groupIds) {
RequestContext context = new RequestContext(
new RequestHeader(