diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5a439539807..8c14a7f28c4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -258,6 +258,8 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request) case ApiKeys.ADD_RAFT_VOTER => forwardToControllerOrFail(request) case ApiKeys.REMOVE_RAFT_VOTER => forwardToControllerOrFail(request) + case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError) + case ApiKeys.SHARE_GROUP_DESCRIBE => handleShareGroupDescribe(request).exceptionally(handleError) case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request) case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request) case ApiKeys.INITIALIZE_SHARE_GROUP_STATE => handleInitializeShareGroupStateRequest(request) @@ -3955,6 +3957,72 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleShareGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { + val shareGroupHeartbeatRequest = request.body[ShareGroupHeartbeatRequest] + + if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, shareGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, shareGroupHeartbeatRequest.data.groupId)) { + requestHelper.sendMaybeThrottle(request, shareGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + groupCoordinator.shareGroupHeartbeat( + request.context, + shareGroupHeartbeatRequest.data, + ).handle[Unit] { (response, exception) => + + if (exception != null) { + requestHelper.sendMaybeThrottle(request, shareGroupHeartbeatRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle(request, new ShareGroupHeartbeatResponse(response)) + } + } + } + } + + def handleShareGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { + val shareGroupDescribeRequest = request.body[ShareGroupDescribeRequest] + + if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, shareGroupDescribeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + val response = new ShareGroupDescribeResponseData() + + val authorizedGroups = new ArrayBuffer[String]() + shareGroupDescribeRequest.data.groupIds.forEach { groupId => + if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { + response.groups.add(new ShareGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code) + ) + } else { + authorizedGroups += groupId + } + } + + groupCoordinator.shareGroupDescribe( + request.context, + authorizedGroups.asJava + ).handle[Unit] { (results, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, shareGroupDescribeRequest.getErrorResponse(exception)) + } else { + if (response.groups.isEmpty) { + // If the response is empty, we can directly reuse the results. + response.setGroups(results) + } else { + // Otherwise, we have to copy the results into the existing ones. + response.groups.addAll(results) + } + + requestHelper.sendMaybeThrottle(request, new ShareGroupDescribeResponse(response)) + } + } + } + } + def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] // TODO: Implement the ShareFetchRequest handling @@ -4004,6 +4072,10 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + private def isShareGroupProtocolEnabled: Boolean = { + config.isNewGroupCoordinatorEnabled && config.shareGroupConfig.isShareGroupEnabled + } + private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, conversionStats: RecordValidationStats): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index ec819c5011a..73ed8a3844e 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -81,7 +81,7 @@ import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion} -import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} +import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig} import org.apache.kafka.server.metrics.ClientMetricsTestUtils import org.apache.kafka.server.util.{FutureUtils, MockTime} import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig} @@ -7383,4 +7383,178 @@ class KafkaApisTest extends Logging { val expectedResponse = new ListClientMetricsResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code) assertEquals(expectedResponse, response.data) } + + @Test + def testShareGroupHeartbeatReturnsUnsupportedVersion(): Unit = { + val shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build()) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + kafkaApis = createKafkaApis(raftSupport = true) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + val expectedHeartbeatResponse = new ShareGroupHeartbeatResponseData() + .setErrorCode(Errors.UNSUPPORTED_VERSION.code) + val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest) + assertEquals(expectedHeartbeatResponse, response.data) + } + + @Test + def testShareGroupHeartbeatRequest(): Unit = { + val shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build()) + + val future = new CompletableFuture[ShareGroupHeartbeatResponseData]() + when(groupCoordinator.shareGroupHeartbeat( + requestChannelRequest.context, + shareGroupHeartbeatRequest + )).thenReturn(future) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + val shareGroupHeartbeatResponse = new ShareGroupHeartbeatResponseData() + .setMemberId("member") + + future.complete(shareGroupHeartbeatResponse) + val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest) + assertEquals(shareGroupHeartbeatResponse, response.data) + } + + @Test + def testShareGroupHeartbeatRequestAuthorizationFailed(): Unit = { + val shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + authorizer = Some(authorizer), + raftSupport = true + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest) + assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) + } + + @Test + def testShareGroupHeartbeatRequestFutureFailed(): Unit = { + val shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build()) + + val future = new CompletableFuture[ShareGroupHeartbeatResponseData]() + when(groupCoordinator.shareGroupHeartbeat( + requestChannelRequest.context, + shareGroupHeartbeatRequest + )).thenReturn(future) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) + val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest) + assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode) + } + + @Test + def testShareGroupDescribeSuccess(): Unit = { + val groupIds = List("share-group-id-0", "share-group-id-1").asJava + val describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup] = List( + new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)), + new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)) + ).asJava + getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true") + , true, null, describedGroups) + } + + @Test + def testShareGroupDescribeReturnsUnsupportedVersion(): Unit = { + val groupIds = List("share-group-id-0", "share-group-id-1").asJava + val describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup] = List( + new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)), + new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)) + ).asJava + val response = getShareGroupDescribeResponse(groupIds, Map.empty, false, null, describedGroups) + assertNotNull(response.data) + assertEquals(2, response.data.groups.size) + response.data.groups.forEach(group => assertEquals(Errors.UNSUPPORTED_VERSION.code(), group.errorCode())) + } + + @Test + def testShareGroupDescribeRequestAuthorizationFailed(): Unit = { + val groupIds = List("share-group-id-0", "share-group-id-1").asJava + val describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup] = List().asJava + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) + val response = getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true") + , false, authorizer, describedGroups) + assertNotNull(response.data) + assertEquals(2, response.data.groups.size) + response.data.groups.forEach(group => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code(), group.errorCode())) + } + + @Test + def testShareGroupDescribeRequestAuthorizationFailedForOneGroup(): Unit = { + val groupIds = List("share-group-id-fail-0", "share-group-id-1").asJava + val describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup] = List( + new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)) + ).asJava + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava) + + val response = getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true") + , false, authorizer, describedGroups) + + assertNotNull(response.data) + assertEquals(2, response.data.groups.size) + assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code(), response.data.groups.get(0).errorCode()) + assertEquals(Errors.NONE.code(), response.data.groups.get(1).errorCode()) + } + + def getShareGroupDescribeResponse(groupIds: util.List[String], configOverrides: Map[String, String] = Map.empty, + verifyNoErr: Boolean = true, authorizer: Authorizer = null, + describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = { + val shareGroupDescribeRequestData = new ShareGroupDescribeRequestData() + shareGroupDescribeRequestData.groupIds.addAll(groupIds) + val requestChannelRequest = buildRequest(new ShareGroupDescribeRequest.Builder(shareGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[ShareGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.shareGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + kafkaApis = createKafkaApis( + overrideProperties = configOverrides, + authorizer = Option(authorizer), + raftSupport = true + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + future.complete(describedGroups) + + val response = verifyNoThrottling[ShareGroupDescribeResponse](requestChannelRequest) + if (verifyNoErr) { + val expectedShareGroupDescribeResponseData = new ShareGroupDescribeResponseData() + .setGroups(describedGroups) + assertEquals(expectedShareGroupDescribeResponseData, response.data) + } + response + } }