MINOR: Refactor shared-group request handle methods to return CompletableFuture for consistent error handling (#19724)
CI / build (push) Waiting to run Details

This PR is based on the discussion here:
https://github.com/apache/kafka/pull/18929#discussion_r2083238645

Currently, the handle methods related to `shared‐group` requests  are
inconsistent: some return `Unit` while others return
`CompletableFuture[Unit]` without a clear rationale. This PR
standardizes all of them to return `CompletableFuture[Unit]` and ensures
consistent error handling by chaining `.exceptionally(handleError)` to
each call site.

Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
 <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Hong-Yi Chen 2025-05-18 17:45:50 +00:00 committed by GitHub
parent 6596ba3a78
commit ce4940f989
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 26 additions and 26 deletions

View File

@ -232,16 +232,16 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request) case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request)
case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError) case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError)
case ApiKeys.SHARE_GROUP_DESCRIBE => handleShareGroupDescribe(request).exceptionally(handleError) case ApiKeys.SHARE_GROUP_DESCRIBE => handleShareGroupDescribe(request).exceptionally(handleError)
case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request) case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request).exceptionally(handleError)
case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request) case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request).exceptionally(handleError)
case ApiKeys.INITIALIZE_SHARE_GROUP_STATE => handleInitializeShareGroupStateRequest(request) case ApiKeys.INITIALIZE_SHARE_GROUP_STATE => handleInitializeShareGroupStateRequest(request).exceptionally(handleError)
case ApiKeys.READ_SHARE_GROUP_STATE => handleReadShareGroupStateRequest(request) case ApiKeys.READ_SHARE_GROUP_STATE => handleReadShareGroupStateRequest(request).exceptionally(handleError)
case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request) case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request).exceptionally(handleError)
case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request) case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request).exceptionally(handleError)
case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request) case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request).exceptionally(handleError)
case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request) case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request).exceptionally(handleError)
case ApiKeys.ALTER_SHARE_GROUP_OFFSETS => handleAlterShareGroupOffsetsRequest(request) case ApiKeys.ALTER_SHARE_GROUP_OFFSETS => handleAlterShareGroupOffsetsRequest(request).exceptionally(handleError)
case ApiKeys.DELETE_SHARE_GROUP_OFFSETS => handleDeleteShareGroupOffsetsRequest(request) case ApiKeys.DELETE_SHARE_GROUP_OFFSETS => handleDeleteShareGroupOffsetsRequest(request).exceptionally(handleError)
case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError) case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError)
case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsGroupHeartbeat(request).exceptionally(handleError) case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsGroupHeartbeat(request).exceptionally(handleError)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
@ -3108,12 +3108,12 @@ class KafkaApis(val requestChannel: RequestChannel,
/** /**
* Handle a shareFetch request * Handle a shareFetch request
*/ */
def handleShareFetchRequest(request: RequestChannel.Request): Unit = { def handleShareFetchRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val shareFetchRequest = request.body[ShareFetchRequest] val shareFetchRequest = request.body[ShareFetchRequest]
if (!isShareGroupProtocolEnabled) { if (!isShareGroupProtocolEnabled) {
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
return return CompletableFuture.completedFuture[Unit](())
} }
val groupId = shareFetchRequest.data.groupId val groupId = shareFetchRequest.data.groupId
@ -3121,7 +3121,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// Share Fetch needs permission to perform the READ action on the named group resource (groupId) // Share Fetch needs permission to perform the READ action on the named group resource (groupId)
if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception))
return return CompletableFuture.completedFuture[Unit](())
} }
val memberId = shareFetchRequest.data.memberId val memberId = shareFetchRequest.data.memberId
@ -3156,10 +3156,10 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
} }
) )
return return CompletableFuture.completedFuture[Unit](())
case e: Exception => case e: Exception =>
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
return return CompletableFuture.completedFuture[Unit](())
} }
val erroneousAndValidPartitionData: ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions val erroneousAndValidPartitionData: ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions
@ -3429,13 +3429,13 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
} }
def handleShareAcknowledgeRequest(request: RequestChannel.Request): Unit = { def handleShareAcknowledgeRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest] val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest]
if (!isShareGroupProtocolEnabled) { if (!isShareGroupProtocolEnabled) {
requestHelper.sendMaybeThrottle(request, requestHelper.sendMaybeThrottle(request,
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
return return CompletableFuture.completedFuture[Unit](())
} }
val groupId = shareAcknowledgeRequest.data.groupId val groupId = shareAcknowledgeRequest.data.groupId
@ -3444,7 +3444,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
requestHelper.sendMaybeThrottle(request, requestHelper.sendMaybeThrottle(request,
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception))
return return CompletableFuture.completedFuture[Unit](())
} }
val memberId = shareAcknowledgeRequest.data.memberId val memberId = shareAcknowledgeRequest.data.memberId
@ -3457,7 +3457,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} catch { } catch {
case e: Exception => case e: Exception =>
requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
return return CompletableFuture.completedFuture[Unit](())
} }
val topicIdPartitionSeq: mutable.Set[TopicIdPartition] = mutable.Set() val topicIdPartitionSeq: mutable.Set[TopicIdPartition] = mutable.Set()
@ -3623,7 +3623,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
} }
def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest] val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest]
val groups = describeShareGroupOffsetsRequest.groups() val groups = describeShareGroupOffsetsRequest.groups()
@ -3736,27 +3736,27 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
} }
def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val alterShareGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest] val alterShareGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest]
if (!isShareGroupProtocolEnabled) { if (!isShareGroupProtocolEnabled) {
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
return return CompletableFuture.completedFuture[Unit](())
} }
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](()) CompletableFuture.completedFuture[Unit](())
} }
def handleDeleteShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { def handleDeleteShareGroupOffsetsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val deleteShareGroupOffsetsRequest = request.body[DeleteShareGroupOffsetsRequest] val deleteShareGroupOffsetsRequest = request.body[DeleteShareGroupOffsetsRequest]
val groupId = deleteShareGroupOffsetsRequest.data.groupId val groupId = deleteShareGroupOffsetsRequest.data.groupId
if (!isShareGroupProtocolEnabled) { if (!isShareGroupProtocolEnabled) {
requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
return return CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, DELETE, GROUP, groupId)) { } else if (!authHelper.authorize(request.context, DELETE, GROUP, groupId)) {
requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception))
return return CompletableFuture.completedFuture[Unit](())
} }
val deleteShareGroupOffsetsResponseTopics: util.List[DeleteShareGroupOffsetsResponseTopic] = new util.ArrayList[DeleteShareGroupOffsetsResponseTopic]() val deleteShareGroupOffsetsResponseTopics: util.List[DeleteShareGroupOffsetsResponseTopic] = new util.ArrayList[DeleteShareGroupOffsetsResponseTopic]()
@ -3783,7 +3783,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new DeleteShareGroupOffsetsResponse( new DeleteShareGroupOffsetsResponse(
new DeleteShareGroupOffsetsResponseData() new DeleteShareGroupOffsetsResponseData()
.setResponses(deleteShareGroupOffsetsResponseTopics))) .setResponses(deleteShareGroupOffsetsResponseTopics)))
return return CompletableFuture.completedFuture[Unit](())
} }
groupCoordinator.deleteShareGroupOffsets( groupCoordinator.deleteShareGroupOffsets(