KAFKA-16743,KAFKA-16744: KafkaApis support for share group heartbeat and describe (#16574)

Added handling of share group heartbeat and describe in KafkaApis. The Implementation of heartbeat and describe is with group coordinator.

Reviewers:  Manikumar Reddy <manikumar.reddy@gmail.com>, Rahul <rahul.nirgude@mastercard.com>
This commit is contained in:
Apoorv Mittal 2024-07-15 14:36:20 +01:00 committed by GitHub
parent 8aee314a46
commit 3a442ffe32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 247 additions and 1 deletions

View File

@ -258,6 +258,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request)
case ApiKeys.ADD_RAFT_VOTER => forwardToControllerOrFail(request)
case ApiKeys.REMOVE_RAFT_VOTER => forwardToControllerOrFail(request)
case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError)
case ApiKeys.SHARE_GROUP_DESCRIBE => handleShareGroupDescribe(request).exceptionally(handleError)
case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request)
case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request)
case ApiKeys.INITIALIZE_SHARE_GROUP_STATE => handleInitializeShareGroupStateRequest(request)
@ -3955,6 +3957,72 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleShareGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
val shareGroupHeartbeatRequest = request.body[ShareGroupHeartbeatRequest]
if (!isShareGroupProtocolEnabled) {
requestHelper.sendMaybeThrottle(request, shareGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, READ, GROUP, shareGroupHeartbeatRequest.data.groupId)) {
requestHelper.sendMaybeThrottle(request, shareGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
groupCoordinator.shareGroupHeartbeat(
request.context,
shareGroupHeartbeatRequest.data,
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, shareGroupHeartbeatRequest.getErrorResponse(exception))
} else {
requestHelper.sendMaybeThrottle(request, new ShareGroupHeartbeatResponse(response))
}
}
}
}
def handleShareGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
val shareGroupDescribeRequest = request.body[ShareGroupDescribeRequest]
if (!isShareGroupProtocolEnabled) {
requestHelper.sendMaybeThrottle(request, shareGroupDescribeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
} else {
val response = new ShareGroupDescribeResponseData()
val authorizedGroups = new ArrayBuffer[String]()
shareGroupDescribeRequest.data.groupIds.forEach { groupId =>
if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
response.groups.add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
)
} else {
authorizedGroups += groupId
}
}
groupCoordinator.shareGroupDescribe(
request.context,
authorizedGroups.asJava
).handle[Unit] { (results, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, shareGroupDescribeRequest.getErrorResponse(exception))
} else {
if (response.groups.isEmpty) {
// If the response is empty, we can directly reuse the results.
response.setGroups(results)
} else {
// Otherwise, we have to copy the results into the existing ones.
response.groups.addAll(results)
}
requestHelper.sendMaybeThrottle(request, new ShareGroupDescribeResponse(response))
}
}
}
}
def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
val shareFetchRequest = request.body[ShareFetchRequest]
// TODO: Implement the ShareFetchRequest handling
@ -4004,6 +4072,10 @@ class KafkaApis(val requestChannel: RequestChannel,
CompletableFuture.completedFuture[Unit](())
}
private def isShareGroupProtocolEnabled: Boolean = {
config.isNewGroupCoordinatorEnabled && config.shareGroupConfig.isShareGroupEnabled
}
private def updateRecordConversionStats(request: RequestChannel.Request,
tp: TopicPartition,
conversionStats: RecordValidationStats): Unit = {

View File

@ -81,7 +81,7 @@ import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.util.{FutureUtils, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}
@ -7383,4 +7383,178 @@ class KafkaApisTest extends Logging {
val expectedResponse = new ListClientMetricsResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
assertEquals(expectedResponse, response.data)
}
@Test
def testShareGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
val shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequestData().setGroupId("group")
val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build())
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
kafkaApis = createKafkaApis(raftSupport = true)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
val expectedHeartbeatResponse = new ShareGroupHeartbeatResponseData()
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest)
assertEquals(expectedHeartbeatResponse, response.data)
}
@Test
def testShareGroupHeartbeatRequest(): Unit = {
val shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequestData().setGroupId("group")
val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build())
val future = new CompletableFuture[ShareGroupHeartbeatResponseData]()
when(groupCoordinator.shareGroupHeartbeat(
requestChannelRequest.context,
shareGroupHeartbeatRequest
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
val shareGroupHeartbeatResponse = new ShareGroupHeartbeatResponseData()
.setMemberId("member")
future.complete(shareGroupHeartbeatResponse)
val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest)
assertEquals(shareGroupHeartbeatResponse, response.data)
}
@Test
def testShareGroupHeartbeatRequestAuthorizationFailed(): Unit = {
val shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequestData().setGroupId("group")
val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Some(authorizer),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest)
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
}
@Test
def testShareGroupHeartbeatRequestFutureFailed(): Unit = {
val shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequestData().setGroupId("group")
val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build())
val future = new CompletableFuture[ShareGroupHeartbeatResponseData]()
when(groupCoordinator.shareGroupHeartbeat(
requestChannelRequest.context,
shareGroupHeartbeatRequest
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest)
assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode)
}
@Test
def testShareGroupDescribeSuccess(): Unit = {
val groupIds = List("share-group-id-0", "share-group-id-1").asJava
val describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup] = List(
new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1))
).asJava
getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
, true, null, describedGroups)
}
@Test
def testShareGroupDescribeReturnsUnsupportedVersion(): Unit = {
val groupIds = List("share-group-id-0", "share-group-id-1").asJava
val describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup] = List(
new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1))
).asJava
val response = getShareGroupDescribeResponse(groupIds, Map.empty, false, null, describedGroups)
assertNotNull(response.data)
assertEquals(2, response.data.groups.size)
response.data.groups.forEach(group => assertEquals(Errors.UNSUPPORTED_VERSION.code(), group.errorCode()))
}
@Test
def testShareGroupDescribeRequestAuthorizationFailed(): Unit = {
val groupIds = List("share-group-id-0", "share-group-id-1").asJava
val describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup] = List().asJava
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
val response = getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
, false, authorizer, describedGroups)
assertNotNull(response.data)
assertEquals(2, response.data.groups.size)
response.data.groups.forEach(group => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code(), group.errorCode()))
}
@Test
def testShareGroupDescribeRequestAuthorizationFailedForOneGroup(): Unit = {
val groupIds = List("share-group-id-fail-0", "share-group-id-1").asJava
val describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup] = List(
new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1))
).asJava
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava)
val response = getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
, false, authorizer, describedGroups)
assertNotNull(response.data)
assertEquals(2, response.data.groups.size)
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code(), response.data.groups.get(0).errorCode())
assertEquals(Errors.NONE.code(), response.data.groups.get(1).errorCode())
}
def getShareGroupDescribeResponse(groupIds: util.List[String], configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = {
val shareGroupDescribeRequestData = new ShareGroupDescribeRequestData()
shareGroupDescribeRequestData.groupIds.addAll(groupIds)
val requestChannelRequest = buildRequest(new ShareGroupDescribeRequest.Builder(shareGroupDescribeRequestData, true).build())
val future = new CompletableFuture[util.List[ShareGroupDescribeResponseData.DescribedGroup]]()
when(groupCoordinator.shareGroupDescribe(
any[RequestContext],
any[util.List[String]]
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
kafkaApis = createKafkaApis(
overrideProperties = configOverrides,
authorizer = Option(authorizer),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
future.complete(describedGroups)
val response = verifyNoThrottling[ShareGroupDescribeResponse](requestChannelRequest)
if (verifyNoErr) {
val expectedShareGroupDescribeResponseData = new ShareGroupDescribeResponseData()
.setGroups(describedGroups)
assertEquals(expectedShareGroupDescribeResponseData, response.data)
}
response
}
}