KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface (#12848)

This patch adds `heartbeat` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Jacot 2022-12-01 19:59:33 +01:00 committed by GitHub
parent 836db84014
commit f5305fb38d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 167 additions and 28 deletions

View File

@ -17,7 +17,7 @@
package kafka.coordinator.group
import kafka.server.RequestLocal
import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
import org.apache.kafka.common.message.{HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData}
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.utils.BufferSupplier
@ -83,4 +83,22 @@ class GroupCoordinatorAdapter(
future
}
override def heartbeat(
context: RequestContext,
request: HeartbeatRequestData
): CompletableFuture[HeartbeatResponseData] = {
val future = new CompletableFuture[HeartbeatResponseData]()
coordinator.handleHeartbeat(
request.groupId,
request.memberId,
Option(request.groupInstanceId),
request.generationId,
error => future.complete(new HeartbeatResponseData()
.setErrorCode(error.code))
)
future
}
}

View File

@ -194,7 +194,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal).exceptionally(handleError)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request).exceptionally(handleError)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
@ -1764,42 +1764,36 @@ class KafkaApis(val requestChannel: RequestChannel,
})
}
def handleHeartbeatRequest(request: RequestChannel.Request): Unit = {
def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val heartbeatRequest = request.body[HeartbeatRequest]
// the callback for sending a heartbeat response
def sendResponseCallback(error: Errors): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val response = new HeartbeatResponse(
new HeartbeatResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(error.code))
trace("Sending heartbeat response %s for correlation id %d to client %s."
.format(response, request.header.correlationId, request.header.clientId))
def sendResponse(response: AbstractResponse): Unit = {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
response.maybeSetThrottleTimeMs(requestThrottleMs)
response
}
requestHelper.sendResponseMaybeThrottle(request, createResponse)
})
}
if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
sendResponseCallback(Errors.UNSUPPORTED_VERSION)
sendResponse(heartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, READ, GROUP, heartbeatRequest.data.groupId)) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new HeartbeatResponse(
new HeartbeatResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)))
sendResponse(heartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
// let the coordinator to handle heartbeat
groupCoordinator.handleHeartbeat(
heartbeatRequest.data.groupId,
heartbeatRequest.data.memberId,
Option(heartbeatRequest.data.groupInstanceId),
heartbeatRequest.data.generationId,
sendResponseCallback)
newGroupCoordinator.heartbeat(
request.context,
heartbeatRequest.data
).handle[Unit] { (response, exception) =>
if (exception != null) {
sendResponse(heartbeatRequest.getErrorResponse(exception))
} else {
sendResponse(new HeartbeatResponse(response))
}
}
}
}

View File

@ -18,7 +18,7 @@ package kafka.coordinator.group
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
import kafka.server.RequestLocal
import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
import org.apache.kafka.common.message.{HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
@ -28,6 +28,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.BufferSupplier
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify}
@ -141,4 +142,36 @@ class GroupCoordinatorAdapterTest {
assertEquals(expectedData, future.get())
}
@Test
def testHeartbeat(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
val adapter = new GroupCoordinatorAdapter(groupCoordinator)
val ctx = makeContext(ApiKeys.HEARTBEAT, ApiKeys.HEARTBEAT.latestVersion)
val data = new HeartbeatRequestData()
.setGroupId("group")
.setMemberId("member1")
.setGenerationId(0)
val future = adapter.heartbeat(ctx, data)
val capturedCallback: ArgumentCaptor[Errors => Unit] =
ArgumentCaptor.forClass(classOf[Errors => Unit])
verify(groupCoordinator).handleHeartbeat(
ArgumentMatchers.eq(data.groupId),
ArgumentMatchers.eq(data.memberId),
ArgumentMatchers.eq(None),
ArgumentMatchers.eq(data.generationId),
capturedCallback.capture(),
)
assertFalse(future.isDone)
capturedCallback.getValue.apply(Errors.NONE)
assertTrue(future.isDone)
assertEquals(new HeartbeatResponseData(), future.get())
}
}

View File

@ -2825,6 +2825,86 @@ class KafkaApisTest {
}
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.HEARTBEAT)
def testHandleHeartbeatRequest(version: Short): Unit = {
val heartbeatRequest = new HeartbeatRequestData()
.setGroupId("group")
.setMemberId("member")
.setGenerationId(0)
val requestChannelRequest = buildRequest(new HeartbeatRequest.Builder(heartbeatRequest).build(version))
val expectedHeartbeatRequest = new HeartbeatRequestData()
.setGroupId("group")
.setMemberId("member")
.setGenerationId(0)
val future = new CompletableFuture[HeartbeatResponseData]()
when(newGroupCoordinator.heartbeat(
requestChannelRequest.context,
expectedHeartbeatRequest
)).thenReturn(future)
createKafkaApis().handleHeartbeatRequest(requestChannelRequest)
val expectedHeartbeatResponse = new HeartbeatResponseData()
future.complete(expectedHeartbeatResponse)
val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[HeartbeatResponse]
assertEquals(expectedHeartbeatResponse, response.data)
}
@Test
def testHandleHeartbeatRequestFutureFailed(): Unit = {
val heartbeatRequest = new HeartbeatRequestData()
.setGroupId("group")
.setMemberId("member")
.setGenerationId(0)
val requestChannelRequest = buildRequest(new HeartbeatRequest.Builder(heartbeatRequest).build())
val expectedHeartbeatRequest = new HeartbeatRequestData()
.setGroupId("group")
.setMemberId("member")
.setGenerationId(0)
val future = new CompletableFuture[HeartbeatResponseData]()
when(newGroupCoordinator.heartbeat(
requestChannelRequest.context,
expectedHeartbeatRequest
)).thenReturn(future)
createKafkaApis().handleHeartbeatRequest(requestChannelRequest)
future.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception)
val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[HeartbeatResponse]
assertEquals(Errors.UNKNOWN_SERVER_ERROR, response.error)
}
@Test
def testHandleHeartbeatRequestAuthenticationFailed(): Unit = {
val heartbeatRequest = new HeartbeatRequestData()
.setGroupId("group")
.setMemberId("member")
.setGenerationId(0)
val requestChannelRequest = buildRequest(new HeartbeatRequest.Builder(heartbeatRequest).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
createKafkaApis(authorizer = Some(authorizer)).handleHeartbeatRequest(
requestChannelRequest
)
val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[HeartbeatResponse]
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
}
@Test
def rejectJoinGroupRequestWhenStaticMembershipNotSupported(): Unit = {
val joinGroupRequest = new JoinGroupRequest.Builder(

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.requests.RequestContext;
@ -40,5 +42,17 @@ public interface GroupCoordinator {
BufferSupplier bufferSupplier
);
/**
* Heartbeat to a Generic Group.
*
* @param context The coordinator request context.
* @param request The HeartbeatRequest data.
*
* @return A future yielding the response or an exception.
*/
CompletableFuture<HeartbeatResponseData> heartbeat(
RequestContext context,
HeartbeatRequestData request
);
}