diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 678351c8da9..1fc7aa0b2d2 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -335,7 +335,7 @@
+ files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupCoordinatorService|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest).java"/>
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index e13f0f278cf..22dbc3a3563 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,13 +18,14 @@ package kafka.coordinator.group
import kafka.server.{KafkaConfig, ReplicaManager}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, DescribeShareGroupOffsetsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, DescribeShareGroupOffsetsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, TransactionResult}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.coordinator.group
import org.apache.kafka.coordinator.group.OffsetAndMetadata
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.common.RequestLocal
@@ -77,6 +78,15 @@ private[group] class GroupCoordinatorAdapter(
))
}
+ override def streamsGroupHeartbeat(
+ context: RequestContext,
+ request: StreamsGroupHeartbeatRequestData
+ ): CompletableFuture[StreamsGroupHeartbeatResult] = {
+ FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ s"The old group coordinator does not support ${ApiKeys.STREAMS_GROUP_HEARTBEAT.name} API."
+ ))
+ }
+
override def shareGroupHeartbeat(
context: RequestContext,
request: ShareGroupHeartbeatRequestData
@@ -662,6 +672,15 @@ private[group] class GroupCoordinatorAdapter(
))
}
+ override def streamsGroupDescribe(
+ context: RequestContext,
+ groupIds: util.List[String]
+ ): CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]] = {
+ FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ s"The old group coordinator does not support ${ApiKeys.STREAMS_GROUP_DESCRIBE.name} API."
+ ))
+ }
+
override def shareGroupDescribe(
context: RequestContext,
groupIds: util.List[String]
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index aa1f392506e..f3421a21bc6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -236,6 +236,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request)
case ApiKeys.ALTER_SHARE_GROUP_OFFSETS => handleAlterShareGroupOffsetsRequest(request)
case ApiKeys.DELETE_SHARE_GROUP_OFFSETS => handleDeleteShareGroupOffsetsRequest(request)
+ case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError)
+ case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsGroupHeartbeat(request).exceptionally(handleError)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
@@ -2599,6 +2601,132 @@ class KafkaApis(val requestChannel: RequestChannel,
}
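+  // The streams group API is only served by the new group coordinator, and only when the
+  // 'streams' rebalance protocol is enabled in the broker configuration.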
+ private def isStreamsGroupProtocolEnabled(): Boolean = {
+ groupCoordinator.isNewGroupCoordinator &&
+ config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
+ }
+
+ def handleStreamsGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
+ val streamsGroupHeartbeatRequest = request.body[StreamsGroupHeartbeatRequest]
+
+ if (!isStreamsGroupProtocolEnabled()) {
+      // The API is not supported by the "old" group coordinator (the default). If the new
+      // coordinator or the 'streams' rebalance protocol is not enabled, we fail directly here.
+ requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else if (!authHelper.authorize(request.context, READ, GROUP, streamsGroupHeartbeatRequest.data.groupId)) {
+ requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else {
+ if (streamsGroupHeartbeatRequest.data().topology() != null) {
+ val requiredTopics: Seq[String] =
+ streamsGroupHeartbeatRequest.data().topology().subtopologies().iterator().asScala.flatMap(subtopology =>
+            (subtopology.sourceTopics().iterator().asScala: Iterator[String])
+              ++ (subtopology.repartitionSinkTopics().iterator().asScala: Iterator[String])
+              ++ (subtopology.repartitionSourceTopics().iterator().asScala.map(_.name()): Iterator[String])
+              ++ (subtopology.stateChangelogTopics().iterator().asScala.map(_.name()): Iterator[String])
+ ).toSeq
+
+        // While full validation of the heartbeat request happens inside the group
+        // coordinator, we check early on that the topics in the topology have valid names
+        // and are not internal Kafka topics, since the topic names must be passed to the
+        // authorization helper before the request is handed to the group coordinator.
+
+ val prohibitedTopics = requiredTopics.filter(Topic.isInternal)
+ if (prohibitedTopics.nonEmpty) {
+ val errorResponse = new StreamsGroupHeartbeatResponseData()
+ errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
+ errorResponse.setErrorMessage(f"Use of Kafka internal topics ${prohibitedTopics.mkString(",")} in a Kafka Streams topology is prohibited.")
+ requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(errorResponse))
+ return CompletableFuture.completedFuture[Unit](())
+ }
+
+ val invalidTopics = requiredTopics.filterNot(Topic.isValid)
+ if (invalidTopics.nonEmpty) {
+ val errorResponse = new StreamsGroupHeartbeatResponseData()
+ errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
+ errorResponse.setErrorMessage(f"Topic names ${invalidTopics.mkString(",")} are not valid topic names.")
+ requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(errorResponse))
+ return CompletableFuture.completedFuture[Unit](())
+ }
+ }
+
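+      // Topology validation passed (or no topology was sent), so hand the heartbeat to
+      // the group coordinator and translate any failure into an error response.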
+ groupCoordinator.streamsGroupHeartbeat(
+ request.context,
+ streamsGroupHeartbeatRequest.data,
+ ).handle[Unit] { (response, exception) =>
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(exception))
+ } else {
+ val responseData = response.data()
+ val topicsToCreate = response.creatableTopics().asScala
+ if (topicsToCreate.nonEmpty) {
+ throw new UnsupportedOperationException("Internal topic auto-creation not yet implemented.")
+ }
+
+ requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(responseData))
+ }
+ }
+ }
+ }
+
+ def handleStreamsGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
+ val streamsGroupDescribeRequest = request.body[StreamsGroupDescribeRequest]
+ val includeAuthorizedOperations = streamsGroupDescribeRequest.data.includeAuthorizedOperations
+
+ if (!isStreamsGroupProtocolEnabled()) {
+      // The API is not supported by the "old" group coordinator (the default). If the new
+      // coordinator or the 'streams' rebalance protocol is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, streamsGroupDescribeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else {
+ val response = new StreamsGroupDescribeResponseData()
+
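+      // Authorize DESCRIBE on each group id: unauthorized groups are answered directly,
+      // while the remaining ones are forwarded to the group coordinator.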
+ val authorizedGroups = new ArrayBuffer[String]()
+ streamsGroupDescribeRequest.data.groupIds.forEach { groupId =>
+ if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
+ response.groups.add(new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+ )
+ } else {
+ authorizedGroups += groupId
+ }
+ }
+
+ groupCoordinator.streamsGroupDescribe(
+ request.context,
+ authorizedGroups.asJava
+ ).handle[Unit] { (results, exception) =>
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request, streamsGroupDescribeRequest.getErrorResponse(exception))
+ } else {
+ if (includeAuthorizedOperations) {
+ results.forEach { groupResult =>
+ if (groupResult.errorCode == Errors.NONE.code) {
+ groupResult.setAuthorizedOperations(authHelper.authorizedOperations(
+ request,
+ new Resource(ResourceType.GROUP, groupResult.groupId)
+ ))
+ }
+ }
+ }
+
+ if (response.groups.isEmpty) {
+ // If the response is empty, we can directly reuse the results.
+ response.setGroups(results)
+ } else {
+ // Otherwise, we have to copy the results into the existing ones.
+ response.groups.addAll(results)
+ }
+
+ requestHelper.sendMaybeThrottle(request, new StreamsGroupDescribeResponse(response))
+ }
+ }
+ }
+  }
+
def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
try {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4940791beab..3711f074917 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -411,6 +411,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " +
"This is part of the early access of KIP-932 and MUST NOT be used in production.")
}
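+      // Like the other new protocols, the streams rebalance protocol (KIP-1071, preview)
+      // requires a KRaft cluster running the new group coordinator.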
+ if (protocols.contains(GroupType.STREAMS)) {
+ if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
+        throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported in KRaft clusters with the new group coordinator.")
+ }
+ warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled along with the new group coordinator. " +
+ "This is part of the preview of KIP-1071 and MUST NOT be used in production.")
+ }
protocols
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 308617df421..d4c4e52f70e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.group
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException}
-import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, StreamsGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
@@ -79,6 +79,22 @@ class GroupCoordinatorAdapterTest {
assertFutureThrows(classOf[UnsupportedVersionException], future)
}
+ @Test
+ def testStreamsGroupHeartbeat(): Unit = {
+ val groupCoordinator = mock(classOf[GroupCoordinator])
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+ val ctx = makeContext(ApiKeys.STREAMS_GROUP_HEARTBEAT, ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion)
+ val request = new StreamsGroupHeartbeatRequestData()
+ .setGroupId("group")
+
+ val future = adapter.streamsGroupHeartbeat(ctx, request)
+
+ assertTrue(future.isDone)
+ assertTrue(future.isCompletedExceptionally)
+ assertFutureThrows(classOf[UnsupportedVersionException], future)
+ }
+
@Test
def testJoinShareGroup(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5f30cfd83e3..c31a75ad8da 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -75,6 +75,7 @@ import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
@@ -2408,7 +2409,7 @@ class KafkaApisTest extends Logging {
val markersResponse = capturedResponse.getValue
assertEquals(2, markersResponse.errorsByProducerId.size())
}
-
+
@Test
def shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
val tp1 = new TopicPartition("t", 0)
@@ -9871,6 +9872,174 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
}
+ @Test
+ def testStreamsGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
+ val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
+ metadataCache = {
+ val cache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
+      val delta = new MetadataDelta(MetadataImage.EMPTY)
+ delta.replay(new FeatureLevelRecord()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
+ )
+ cache.setImage(delta.apply(MetadataProvenance.EMPTY))
+ cache
+ }
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val expectedHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+ val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(expectedHeartbeatResponse, response.data)
+ }
+
+ @Test
+ def testStreamsGroupHeartbeatRequest(): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+ val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
+ when(groupCoordinator.streamsGroupHeartbeat(
+ requestChannelRequest.context,
+ streamsGroupHeartbeatRequest
+ )).thenReturn(future)
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
+ .setMemberId("member")
+
+ future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, Collections.emptyMap()))
+ val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(streamsGroupHeartbeatResponse, response.data)
+ }
+
+ @Test
+ def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+ val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
+ when(groupCoordinator.streamsGroupHeartbeat(
+ requestChannelRequest.context,
+ streamsGroupHeartbeatRequest
+ )).thenReturn(future)
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+ val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode)
+ }
+
+ @Test
+ def testStreamsGroupHeartbeatRequestAuthorizationFailed(): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+ kafkaApis = createKafkaApis(
+ authorizer = Some(authorizer),
+ overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
+ }
+
+ @Test
+ def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode)
+ }
+
+ @Test
+ def testStreamsGroupHeartbeatRequestInvalidTopicNames(): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
+ new StreamsGroupHeartbeatRequestData.Topology()
+ .setEpoch(3)
+ .setSubtopologies(
+ Collections.singletonList(new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")
+ .setSourceTopics(Collections.singletonList("a "))
+ .setRepartitionSinkTopics(Collections.singletonList("b?"))
+ .setRepartitionSourceTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName("c!")))
+ .setStateChangelogTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName("d/")))
+ )
+ )
+ )
+
+ val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(Errors.STREAMS_INVALID_TOPOLOGY.code, response.data.errorCode)
+ assertEquals("Topic names a ,b?,c!,d/ are not valid topic names.", response.data.errorMessage())
+ }
+
+ @Test
+ def testStreamsGroupHeartbeatRequestInternalTopicNames(): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
+ new StreamsGroupHeartbeatRequestData.Topology()
+ .setEpoch(3)
+ .setSubtopologies(
+ Collections.singletonList(new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")
+ .setSourceTopics(Collections.singletonList("__consumer_offsets"))
+ .setRepartitionSinkTopics(Collections.singletonList("__transaction_state"))
+ .setRepartitionSourceTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName("__share_group_state")))
+ )
+ )
+ )
+
+ val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
+
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(Errors.STREAMS_INVALID_TOPOLOGY.code, response.data.errorCode)
+ assertEquals("Use of Kafka internal topics __consumer_offsets,__transaction_state,__share_group_state in a Kafka Streams topology is prohibited.", response.data.errorMessage())
+ }
+
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
@@ -9998,6 +10167,133 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
}
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+ val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+ .setIncludeAuthorizedOperations(includeAuthorizedOperations)
+ streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
+ val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+ val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+ when(groupCoordinator.streamsGroupDescribe(
+ any[RequestContext],
+ any[util.List[String]]
+ )).thenReturn(future)
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ future.complete(List(
+ new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
+ new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
+ new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
+ ).asJava)
+
+    var authorizedOperationsInt = Int.MinValue
+ if (includeAuthorizedOperations) {
+ authorizedOperationsInt = Utils.to32BitField(
+ AclEntry.supportedOperations(ResourceType.GROUP).asScala
+ .map(_.code.asInstanceOf[JByte]).asJava)
+ }
+
+    // We can't reuse the list completed above: the expected groups must be built
+    // independently, otherwise the assertion would not exercise the logic in KafkaApis.
+ val describedGroups = List(
+ new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
+ new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
+ new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
+ ).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
+ val expectedStreamsGroupDescribeResponseData = new StreamsGroupDescribeResponseData()
+ .setGroups(describedGroups.asJava)
+
+ val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+
+ assertEquals(expectedStreamsGroupDescribeResponseData, response.data)
+ }
+
+ @Test
+ def testStreamsGroupDescribeReturnsUnsupportedVersion(): Unit = {
+ val groupId = "group0"
+ val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+ streamsGroupDescribeRequestData.groupIds.add(groupId)
+ val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+ val errorCode = Errors.UNSUPPORTED_VERSION.code
+ val expectedDescribedGroup = new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupId).setErrorCode(errorCode)
+ val expectedResponse = new StreamsGroupDescribeResponseData()
+ expectedResponse.groups.add(expectedDescribedGroup)
+ metadataCache = {
+ val cache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
+      val delta = new MetadataDelta(MetadataImage.EMPTY)
+ delta.replay(new FeatureLevelRecord()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
+ )
+ cache.setImage(delta.apply(MetadataProvenance.EMPTY))
+ cache
+ }
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+ val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+
+ assertEquals(expectedResponse, response.data)
+ }
+
+ @Test
+ def testStreamsGroupDescribeAuthorizationFailed(): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+ streamsGroupDescribeRequestData.groupIds.add("group-id")
+ val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+ val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+ when(groupCoordinator.streamsGroupDescribe(
+ any[RequestContext],
+ any[util.List[String]]
+ )).thenReturn(future)
+ future.complete(List().asJava)
+ kafkaApis = createKafkaApis(
+ authorizer = Some(authorizer),
+ overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+ assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.groups.get(0).errorCode)
+ }
+
+ @Test
+ def testStreamsGroupDescribeFutureFailed(): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+ streamsGroupDescribeRequestData.groupIds.add("group-id")
+ val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+ val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+ when(groupCoordinator.streamsGroupDescribe(
+ any[RequestContext],
+ any[util.List[String]]
+ )).thenReturn(future)
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+ val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+ assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
+ }
+
@Test
def testGetTelemetrySubscriptions(): Unit = {
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 343ffa3a00a..797fdce5451 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1753,6 +1753,11 @@ class KafkaConfigTest {
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols)
assertTrue(config.isNewGroupCoordinatorEnabled)
assertTrue(config.shareGroupConfig.isShareGroupEnabled)
+
+ props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,streams")
+ val config2 = KafkaConfig.fromProps(props)
+ assertEquals(Set(GroupType.CLASSIC, GroupType.STREAMS), config2.groupCoordinatorRebalanceProtocols)
+ assertTrue(config2.isNewGroupCoordinatorEnabled)
}
@Test
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 6e191b0f927..47269bad3a3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -41,6 +41,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -48,6 +50,7 @@ import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -84,6 +87,20 @@ public interface GroupCoordinator {
ConsumerGroupHeartbeatRequestData request
);
+ /**
+ * Heartbeat to a Streams Group.
+ *
+ * @param context The request context.
+     * @param request The StreamsGroupHeartbeatRequestData data.
+ *
+ * @return A future yielding the response together with internal topics to create.
+     *         The error code(s) of the response are set to indicate any errors that occurred during execution.
+ */
+    CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
+ RequestContext context,
+ StreamsGroupHeartbeatRequestData request
+ );
+
/**
* Heartbeat to a Share Group.
*
@@ -199,6 +216,19 @@ public interface GroupCoordinator {
        List<String> groupIds
);
+ /**
+ * Describe streams groups.
+ *
+ * @param context The coordinator request context.
+ * @param groupIds The group ids.
+ *
+ * @return A future yielding the results or an exception.
+ */
+    CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+ );
+
/**
* Describe share groups.
*
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 286eef4af02..0e7004ba40c 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -48,6 +48,9 @@ import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -61,6 +64,7 @@ import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
+import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.utils.BufferSupplier;
@@ -77,6 +81,7 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSuppli
import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.record.BrokerCompressionType;
@@ -373,6 +378,44 @@ public class GroupCoordinatorService implements GroupCoordinator {
));
}
+ /**
+ * See
+ * {@link GroupCoordinator#streamsGroupHeartbeat(RequestContext, org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData)}.
+ */
+ @Override
+    public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
+ RequestContext context,
+ StreamsGroupHeartbeatRequestData request
+ ) {
+ if (!isActive.get()) {
+ return CompletableFuture.completedFuture(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ Collections.emptyMap()
+ )
+ );
+ }
+
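+        // A heartbeat may update group state, so it is scheduled as a write operation
+        // against the __consumer_offsets partition that owns the group.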
+ return runtime.scheduleWriteOperation(
+ "streams-group-heartbeat",
+ topicPartitionFor(request.groupId()),
+ Duration.ofMillis(config.offsetCommitTimeoutMs()),
+ coordinator -> coordinator.streamsGroupHeartbeat(context, request)
+ ).exceptionally(exception -> handleOperationException(
+ "streams-group-heartbeat",
+ request,
+ exception,
+ (error, message) ->
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(error.code())
+ .setErrorMessage(message),
+ Collections.emptyMap()
+ ),
+ log
+ ));
+ }
+
/**
* See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}.
*/
@@ -690,6 +733,58 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
}
+ /**
+ * See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}.
+ */
+ @Override
+    public CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+ ) {
+ if (!isActive.get()) {
+ return CompletableFuture.completedFuture(StreamsGroupDescribeRequest.getErrorDescribedGroupList(
+ groupIds,
+ Errors.COORDINATOR_NOT_AVAILABLE
+ ));
+ }
+
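+        // Bucket the group ids by the __consumer_offsets partition that owns them, so that
+        // a single read operation per partition can serve all of its groups.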
+        final List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>> futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+ groupIds.forEach(groupId -> {
+ if (isGroupIdNotEmpty(groupId)) {
+ groupsByTopicPartition
+ .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>())
+ .add(groupId);
+ } else {
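+                // An empty group id cannot be routed to a partition; answer it directly
+                // with INVALID_GROUP_ID.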
+ futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ )));
+ }
+ });
+
+ groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
+ runtime.scheduleReadOperation(
+ "streams-group-describe",
+ topicPartition,
+                    (coordinator, lastCommittedOffset) -> coordinator.streamsGroupDescribe(groupList, lastCommittedOffset)
+ ).exceptionally(exception -> handleOperationException(
+ "streams-group-describe",
+ groupList,
+ exception,
+ (error, __) -> StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupList, error),
+ log
+ ));
+
+ futures.add(future);
+ });
+
+ return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
+ }
+
/**
* See {@link GroupCoordinator#shareGroupDescribe(RequestContext, List)}.
*/
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index fa6e0c382cc..1cea887a000 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -42,6 +42,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -110,6 +112,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -374,6 +377,22 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord>
+    /**
+     * Handles a StreamsGroupHeartbeat request.
+     *
+     * @param context The request context.
+     * @param request The actual StreamsGroupHeartbeat request.
+     *
+     * @return A Result containing the StreamsGroupHeartbeat response, a list of internal topics to create and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+ RequestContext context,
+ StreamsGroupHeartbeatRequestData request
+ ) {
+ return groupMetadataManager.streamsGroupHeartbeat(context, request);
+ }
+
/**
* Handles a ShareGroupHeartbeat request.
*
@@ -626,6 +645,21 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord>
+    /**
+     * Handles a StreamsGroupDescribe request.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @param committedOffset A specified committed offset corresponding to this shard.
+     *
+     * @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup.
+     */
+    public List<StreamsGroupDescribeResponseData.DescribedGroup> streamsGroupDescribe(
+ List groupIds,
+ long committedOffset
+ ) {
+ return groupMetadataManager.streamsGroupDescribe(groupIds, committedOffset);
+ }
+
/**
* Handles a ShareGroupDescribe request.
*
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 059ddebee5e..be07edf0501 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -54,6 +54,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
@@ -133,6 +135,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
@@ -615,6 +618,35 @@ public class GroupMetadataManager {
return describedGroups;
}
+ /**
+ * Handles a StreamsGroupDescribe request.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @param committedOffset A specified committed offset corresponding to this shard.
+ *
+ * @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup.
+ * If a group is not found, the DescribedGroup will contain the error code and message.
+ */
+    public List<StreamsGroupDescribeResponseData.DescribedGroup> streamsGroupDescribe(
+ List groupIds,
+ long committedOffset
+ ) {
+        final List<StreamsGroupDescribeResponseData.DescribedGroup> describedGroups = new ArrayList<>();
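+        // Look up each group at the given committed offset; unknown groups yield a
+        // GROUP_ID_NOT_FOUND entry instead of failing the whole request.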
+ groupIds.forEach(groupId -> {
+ try {
+ describedGroups.add(streamsGroup(groupId, committedOffset).asDescribedGroup(committedOffset));
+ } catch (GroupIdNotFoundException exception) {
+ describedGroups.add(new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ .setErrorMessage(exception.getMessage())
+ );
+ }
+ });
+
+ return describedGroups;
+ }
+
/**
* Handles a DescribeGroup request.
*
@@ -3787,6 +3819,22 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Handles a StreamsGroupHeartbeat request.
+ *
+ * @param context The request context.
+ * @param request The actual StreamsGroupHeartbeat request.
+ *
+ * @return A Result containing the StreamsGroupHeartbeat response, a list of internal topics to create and
+ * a list of records to update the state machine.
+ */
+    public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+ RequestContext context,
+ StreamsGroupHeartbeatRequestData request
+ ) throws ApiException {
+ throw new UnsupportedOperationException("StreamsGroupHeartbeat is not implemented yet.");
+ }
+
/**
* Replays StreamsGroupTopologyKey/Value to update the hard state of
* the streams group.
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResult.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResult.java
new file mode 100644
index 00000000000..736492f063a
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResult.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A simple record to hold the result of a StreamsGroupHeartbeat request.
+ *
+ * @param data The data to be returned to the client.
+ * @param creatableTopics The internal topics to be created.
+ */
+public record StreamsGroupHeartbeatResult(StreamsGroupHeartbeatResponseData data, Map<String, CreatableTopic> creatableTopics) {
+
+ public StreamsGroupHeartbeatResult {
+ Objects.requireNonNull(data);
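+        // Wrap the map defensively so that the result cannot be mutated after construction.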
+        creatableTopics = Collections.unmodifiableMap(Objects.requireNonNull(creatableTopics));
+ }
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index 612e72fabdd..69d2eae8831 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -355,10 +355,10 @@ public record StreamsGroupMember(String memberId,
    private static List<StreamsGroupDescribeResponseData.TaskIds> taskIdsFromMap(Map<String, Set<Integer>> tasks) {
        List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new ArrayList<>();
- tasks.forEach((subtopologyId, partitionSet) -> {
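+        // Iterate in sorted order so that the described task ids are deterministic.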
+ tasks.keySet().stream().sorted().forEach(subtopologyId -> {
taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId(subtopologyId)
- .setPartitions(new ArrayList<>(partitionSet)));
+ .setPartitions(tasks.get(subtopologyId).stream().sorted().toList()));
});
return taskIds;
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index ccbefba35b6..4d3e79e395b 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -32,6 +32,9 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.errors.StreamsTopologyFencedException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
@@ -59,6 +62,9 @@ import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -78,6 +84,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.server.record.BrokerCompressionType;
@@ -263,6 +270,118 @@ public class GroupCoordinatorServiceTest {
);
}
+ @Test
+ public void testStreamsGroupHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
+
+ StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo");
+
+        CompletableFuture<StreamsGroupHeartbeatResult> future = service.streamsGroupHeartbeat(
+ requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT),
+ request
+ );
+
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ Collections.emptyMap()
+ ),
+ future.get()
+ );
+ }
+
+ @Test
+ public void testStreamsGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setRuntime(runtime)
+ .setConfig(createConfig())
+ .build(true);
+
+ StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo");
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("streams-group-heartbeat"),
+ ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData(),
+ Collections.emptyMap()
+ )
+ ));
+
+        CompletableFuture<StreamsGroupHeartbeatResult> future = service.streamsGroupHeartbeat(
+ requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT),
+ request
+ );
+
+ assertEquals(new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Collections.emptyMap()), future.get(5, TimeUnit.SECONDS));
+ }
+
+    private static Stream<Arguments> testStreamsGroupHeartbeatWithExceptionSource() {
+ return Stream.of(
+ Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+ Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+ Arguments.arguments(new org.apache.kafka.common.errors.TimeoutException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+ Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null),
+ Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null),
+ Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+ Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+ Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+ Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid"),
+ Arguments.arguments(new StreamsInvalidTopologyException("Invalid"), Errors.STREAMS_INVALID_TOPOLOGY.code(), "Invalid"),
+ Arguments.arguments(new StreamsTopologyFencedException("Invalid"), Errors.STREAMS_TOPOLOGY_FENCED.code(), "Invalid"),
+ Arguments.arguments(new StreamsInvalidTopologyEpochException("Invalid"), Errors.STREAMS_INVALID_TOPOLOGY_EPOCH.code(), "Invalid")
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("testStreamsGroupHeartbeatWithExceptionSource")
+ public void testStreamsGroupHeartbeatWithException(
+ Throwable exception,
+ short expectedErrorCode,
+ String expectedErrorMessage
+ ) throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setRuntime(runtime)
+ .setConfig(createConfig())
+ .build(true);
+
+ StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo");
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("streams-group-heartbeat"),
+ ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(exception));
+
+        CompletableFuture<StreamsGroupHeartbeatResult> future = service.streamsGroupHeartbeat(
+ requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT),
+ request
+ );
+
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(expectedErrorCode)
+ .setErrorMessage(expectedErrorMessage),
+ Collections.emptyMap()
+ ),
+ future.get(5, TimeUnit.SECONDS)
+ );
+ }
+
@Test
public void testPartitionFor() {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@@ -1409,6 +1528,135 @@ public class GroupCoordinatorServiceTest {
);
}
+ @Test
+ public void testStreamsGroupDescribe() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
+ int partitionCount = 2;
+ service.startup(() -> partitionCount);
+
+ StreamsGroupDescribeResponseData.DescribedGroup describedGroup1 = new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("group-id-1");
+ StreamsGroupDescribeResponseData.DescribedGroup describedGroup2 = new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("group-id-2");
+        List<StreamsGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList(
+ describedGroup1,
+ describedGroup2
+ );
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("streams-group-describe"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+ CompletableFuture