diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 05ed16abffb..bc155353097 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -232,16 +232,16 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request) case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError) case ApiKeys.SHARE_GROUP_DESCRIBE => handleShareGroupDescribe(request).exceptionally(handleError) - case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request) - case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request) - case ApiKeys.INITIALIZE_SHARE_GROUP_STATE => handleInitializeShareGroupStateRequest(request) - case ApiKeys.READ_SHARE_GROUP_STATE => handleReadShareGroupStateRequest(request) - case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request) - case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request) - case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request) - case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request) - case ApiKeys.ALTER_SHARE_GROUP_OFFSETS => handleAlterShareGroupOffsetsRequest(request) - case ApiKeys.DELETE_SHARE_GROUP_OFFSETS => handleDeleteShareGroupOffsetsRequest(request) + case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request).exceptionally(handleError) + case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request).exceptionally(handleError) + case ApiKeys.INITIALIZE_SHARE_GROUP_STATE => handleInitializeShareGroupStateRequest(request).exceptionally(handleError) + case ApiKeys.READ_SHARE_GROUP_STATE => handleReadShareGroupStateRequest(request).exceptionally(handleError) + case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request).exceptionally(handleError) + case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request).exceptionally(handleError) + case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request).exceptionally(handleError) + case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request).exceptionally(handleError) + case ApiKeys.ALTER_SHARE_GROUP_OFFSETS => handleAlterShareGroupOffsetsRequest(request).exceptionally(handleError) + case ApiKeys.DELETE_SHARE_GROUP_OFFSETS => handleDeleteShareGroupOffsetsRequest(request).exceptionally(handleError) case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError) case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsGroupHeartbeat(request).exceptionally(handleError) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") @@ -3108,12 +3108,12 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Handle a shareFetch request */ - def handleShareFetchRequest(request: RequestChannel.Request): Unit = { + def handleShareFetchRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val shareFetchRequest = request.body[ShareFetchRequest] if (!isShareGroupProtocolEnabled) { requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) - return + return CompletableFuture.completedFuture[Unit](()) } val groupId = shareFetchRequest.data.groupId @@ -3121,7 +3121,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Share Fetch needs permission to perform the READ action on the named group resource (groupId) if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) - return + return CompletableFuture.completedFuture[Unit](()) } val memberId = shareFetchRequest.data.memberId @@ -3156,10 +3156,10 @@ class KafkaApis(val requestChannel: RequestChannel, } } ) - return + return CompletableFuture.completedFuture[Unit](()) case e: Exception => requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) - return + return CompletableFuture.completedFuture[Unit](()) } val erroneousAndValidPartitionData: ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions @@ -3429,13 +3429,13 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleShareAcknowledgeRequest(request: RequestChannel.Request): Unit = { + def handleShareAcknowledgeRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest] if (!isShareGroupProtocolEnabled) { requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) - return + return CompletableFuture.completedFuture[Unit](()) } val groupId = shareAcknowledgeRequest.data.groupId @@ -3444,7 +3444,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) - return + return CompletableFuture.completedFuture[Unit](()) } val memberId = shareAcknowledgeRequest.data.memberId @@ -3457,7 +3457,7 @@ class KafkaApis(val requestChannel: RequestChannel, } catch { case e: Exception => requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) - return + return CompletableFuture.completedFuture[Unit](()) } val topicIdPartitionSeq: mutable.Set[TopicIdPartition] = mutable.Set() @@ -3623,7 +3623,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { + def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest] val groups = describeShareGroupOffsetsRequest.groups() @@ -3736,27 +3736,27 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { + def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val alterShareGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest] if (!isShareGroupProtocolEnabled) { requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) - return + return CompletableFuture.completedFuture[Unit](()) } requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } - def handleDeleteShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { + def handleDeleteShareGroupOffsetsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val deleteShareGroupOffsetsRequest = request.body[DeleteShareGroupOffsetsRequest] val groupId = deleteShareGroupOffsetsRequest.data.groupId if (!isShareGroupProtocolEnabled) { requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) - return + return CompletableFuture.completedFuture[Unit](()) } else if (!authHelper.authorize(request.context, DELETE, GROUP, groupId)) { requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) - return + return CompletableFuture.completedFuture[Unit](()) } val deleteShareGroupOffsetsResponseTopics: util.List[DeleteShareGroupOffsetsResponseTopic] = new util.ArrayList[DeleteShareGroupOffsetsResponseTopic]() @@ -3783,7 +3783,7 @@ class KafkaApis(val requestChannel: RequestChannel, new DeleteShareGroupOffsetsResponse( new DeleteShareGroupOffsetsResponseData() .setResponses(deleteShareGroupOffsetsResponseTopics))) - return + return CompletableFuture.completedFuture[Unit](()) } groupCoordinator.deleteShareGroupOffsets(