KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface (#12845)

This patch adds `joinGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

For the context, I will do the same for all the other interactions with the current group coordinator. In order to limit the changes, I have chosen to introduce the `GroupCoordinatorAdapter` that translates the new interface to the old one. It is basically a wrapper. This allows keeping the current group coordinator untouched for now and focus on the `KafkaApis` changes. Eventually, we can remove `GroupCoordinatorAdapter`.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Jacot 2022-11-29 20:39:12 +01:00 committed by GitHub
parent 471e029f0a
commit 98e19b3000
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 472 additions and 235 deletions

View File

@ -326,6 +326,13 @@
</subpackage>
</subpackage>
<subpackage name="coordinator">
<subpackage name="group">
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.requests" />
</subpackage>
</subpackage>
<subpackage name="server">
<allow pkg="org.apache.kafka.common" />

View File

@ -18,6 +18,7 @@
package kafka.server.builders;
import kafka.coordinator.group.GroupCoordinator;
import kafka.coordinator.group.GroupCoordinatorAdapter;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.server.ApiVersionManager;
@ -178,6 +179,7 @@ public class KafkaApisBuilder {
metadataSupport,
replicaManager,
groupCoordinator,
new GroupCoordinatorAdapter(groupCoordinator),
txnCoordinator,
autoTopicCreationManager,
brokerId,

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator.group
import kafka.server.RequestLocal
import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.utils.BufferSupplier
import java.util.concurrent.CompletableFuture
import scala.jdk.CollectionConverters._
/**
* GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
* that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
*/
class GroupCoordinatorAdapter(
val coordinator: GroupCoordinator
) extends org.apache.kafka.coordinator.group.GroupCoordinator {
override def joinGroup(
context: RequestContext,
request: JoinGroupRequestData,
bufferSupplier: BufferSupplier
): CompletableFuture[JoinGroupResponseData] = {
val future = new CompletableFuture[JoinGroupResponseData]()
def callback(joinResult: JoinGroupResult): Unit = {
future.complete(new JoinGroupResponseData()
.setErrorCode(joinResult.error.code)
.setGenerationId(joinResult.generationId)
.setProtocolType(joinResult.protocolType.orNull)
.setProtocolName(joinResult.protocolName.orNull)
.setLeader(joinResult.leaderId)
.setSkipAssignment(joinResult.skipAssignment)
.setMemberId(joinResult.memberId)
.setMembers(joinResult.members.asJava)
)
}
val groupInstanceId = Option(request.groupInstanceId)
// Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
// and groupInstanceId is configured to unknown.
val requireKnownMemberId = context.apiVersion >= 4 && groupInstanceId.isEmpty
val protocols = request.protocols.valuesList.asScala.map { protocol =>
(protocol.name, protocol.metadata)
}.toList
val supportSkippingAssignment = context.apiVersion >= 9
coordinator.handleJoinGroup(
request.groupId,
request.memberId,
groupInstanceId,
requireKnownMemberId,
supportSkippingAssignment,
context.clientId,
context.clientAddress.toString,
request.rebalanceTimeoutMs,
request.sessionTimeoutMs,
request.protocolType,
protocols,
callback,
Option(request.reason),
RequestLocal(bufferSupplier)
)
future
}
}

View File

@ -24,7 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
import kafka.cluster.Broker.ServerInfo
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter}
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
import kafka.log.LogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
@ -404,6 +404,7 @@ class BrokerServer(
metadataSupport = raftSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator),
txnCoordinator = transactionCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.nodeId,

View File

@ -72,16 +72,16 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
import java.lang.{Long => JLong}
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Collections, Optional}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
import scala.annotation.nowarn
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.jdk.CollectionConverters._
@ -94,6 +94,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val metadataSupport: MetadataSupport,
val replicaManager: ReplicaManager,
val groupCoordinator: GroupCoordinator,
// newGroupCoordinator is temporary here. It will be removed when
// the migration to the new interface is completed in this class.
val newGroupCoordinator: org.apache.kafka.coordinator.group.GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val autoTopicCreationManager: AutoTopicCreationManager,
val brokerId: Int,
@ -108,7 +111,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val clusterId: String,
time: Time,
val tokenManager: DelegationTokenManager,
val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {
val apiVersionManager: ApiVersionManager
) extends ApiRequestHandler with Logging {
type FetchResponseStats = Map[TopicPartition, RecordConversionStats]
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
@ -161,6 +165,12 @@ class KafkaApis(val requestChannel: RequestChannel,
* Top-level method that handles all requests and multiplexes to the right api
*/
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
def handleError(e: Throwable): Unit = {
error(s"Unexpected error handling request ${request.requestDesc(true)} " +
s"with context ${request.context}", e)
requestHelper.handleError(request, e)
}
try {
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
@ -183,7 +193,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal).exceptionally(handleError)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
@ -238,10 +248,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} catch {
case e: FatalExitError => throw e
case e: Throwable =>
error(s"Unexpected error handling request ${request.requestDesc(true)} " +
s"with context ${request.context}", e)
requestHelper.handleError(request, e)
case e: Throwable => handleError(e)
} finally {
// try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
// are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
@ -1647,69 +1654,42 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
def handleJoinGroupRequest(
request: RequestChannel.Request,
requestLocal: RequestLocal
): CompletableFuture[Unit] = {
val joinGroupRequest = request.body[JoinGroupRequest]
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody = new JoinGroupResponse(
new JoinGroupResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(joinResult.error.code)
.setGenerationId(joinResult.generationId)
.setProtocolType(joinResult.protocolType.orNull)
.setProtocolName(joinResult.protocolName.orNull)
.setLeader(joinResult.leaderId)
.setSkipAssignment(joinResult.skipAssignment)
.setMemberId(joinResult.memberId)
.setMembers(joinResult.members.asJava),
request.context.apiVersion
)
trace("Sending join group response %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
responseBody
}
requestHelper.sendResponseMaybeThrottle(request, createResponse)
def sendResponse(response: AbstractResponse): Unit = {
trace("Sending join group response %s for correlation id %d to client %s."
.format(response, request.header.correlationId, request.header.clientId))
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
response.maybeSetThrottleTimeMs(requestThrottleMs)
response
})
}
if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) {
sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED))
sendResponse(joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId)
// Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
// and groupInstanceId is configured to unknown.
val requireKnownMemberId = joinGroupRequest.version >= 4 && groupInstanceId.isEmpty
// let the coordinator handle join-group
val protocols = joinGroupRequest.data.protocols.valuesList.asScala.map { protocol =>
(protocol.name, protocol.metadata)
}.toList
val supportSkippingAssignment = joinGroupRequest.version >= 9
groupCoordinator.handleJoinGroup(
joinGroupRequest.data.groupId,
joinGroupRequest.data.memberId,
groupInstanceId,
requireKnownMemberId,
supportSkippingAssignment,
request.header.clientId,
request.context.clientAddress.toString,
joinGroupRequest.data.rebalanceTimeoutMs,
joinGroupRequest.data.sessionTimeoutMs,
joinGroupRequest.data.protocolType,
protocols,
sendResponseCallback,
Option(joinGroupRequest.data.reason),
requestLocal)
newGroupCoordinator.joinGroup(
request.context,
joinGroupRequest.data,
requestLocal.bufferSupplier
).handle[Unit] { (response, exception) =>
if (exception != null) {
sendResponse(joinGroupRequest.getErrorResponse(exception))
} else {
sendResponse(new JoinGroupResponse(response, request.context.apiVersion))
}
}
}
}

View File

@ -25,7 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import kafka.cluster.{Broker, EndPoint}
import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, InconsistentClusterIdException}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter}
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
import kafka.log.LogManager
import kafka.metrics.KafkaMetricsReporter
@ -408,6 +408,7 @@ class KafkaServer(
metadataSupport = zkSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator),
txnCoordinator = transactionCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.brokerId,

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator.group
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
import kafka.server.RequestLocal
import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.BufferSupplier
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify}
import java.net.InetAddress
import scala.jdk.CollectionConverters._
class GroupCoordinatorAdapterTest {
private def makeContext(
apiKey: ApiKeys,
apiVersion: Short
): RequestContext = {
new RequestContext(
new RequestHeader(apiKey, apiVersion, "client", 0),
"1",
InetAddress.getLocalHost,
KafkaPrincipal.ANONYMOUS,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY,
false
)
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testJoinGroup(version: Short): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
val adapter = new GroupCoordinatorAdapter(groupCoordinator)
val ctx = makeContext(ApiKeys.JOIN_GROUP, version)
val request = new JoinGroupRequestData()
.setGroupId("group")
.setMemberId("member")
.setProtocolType("consumer")
.setRebalanceTimeoutMs(1000)
.setSessionTimeoutMs(2000)
.setReason("reason")
.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(List(
new JoinGroupRequestProtocol()
.setName("first")
.setMetadata("first".getBytes()),
new JoinGroupRequestProtocol()
.setName("second")
.setMetadata("second".getBytes())).iterator.asJava))
val bufferSupplier = BufferSupplier.create()
val future = adapter.joinGroup(ctx, request, bufferSupplier)
assertFalse(future.isDone)
val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] =
ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
val capturedCallback: ArgumentCaptor[JoinGroupCallback] =
ArgumentCaptor.forClass(classOf[JoinGroupCallback])
verify(groupCoordinator).handleJoinGroup(
ArgumentMatchers.eq(request.groupId),
ArgumentMatchers.eq(request.memberId),
ArgumentMatchers.eq(None),
ArgumentMatchers.eq(if (version >= 4) true else false),
ArgumentMatchers.eq(if (version >= 9) true else false),
ArgumentMatchers.eq(ctx.clientId),
ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
ArgumentMatchers.eq(request.rebalanceTimeoutMs),
ArgumentMatchers.eq(request.sessionTimeoutMs),
ArgumentMatchers.eq(request.protocolType),
capturedProtocols.capture(),
capturedCallback.capture(),
ArgumentMatchers.eq(Some("reason")),
ArgumentMatchers.eq(RequestLocal(bufferSupplier))
)
assertEquals(List(
("first", "first"),
("second", "second")
), capturedProtocols.getValue.map { case (name, metadata) =>
(name, new String(metadata))
})
capturedCallback.getValue.apply(JoinGroupResult(
members = List(
new JoinGroupResponseMember()
.setMemberId("member")
.setMetadata("member".getBytes())
.setGroupInstanceId("instance")
),
memberId = "member",
generationId = 10,
protocolType = Some("consumer"),
protocolName = Some("range"),
leaderId = "leader",
skipAssignment = true,
error = Errors.UNKNOWN_MEMBER_ID
))
val expectedData = new JoinGroupResponseData()
.setMembers(List(new JoinGroupResponseMember()
.setMemberId("member")
.setMetadata("member".getBytes())
.setGroupInstanceId("instance")).asJava)
.setMemberId("member")
.setGenerationId(10)
.setProtocolType("consumer")
.setProtocolName("range")
.setLeader("leader")
.setSkipAssignment(true)
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code)
assertTrue(future.isDone)
assertEquals(expectedData, future.get())
}
}

View File

@ -21,12 +21,12 @@ import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.util
import java.util.Arrays.asList
import java.util.concurrent.TimeUnit
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.{Collections, Optional, Properties, Random}
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.controller.{ControllerContext, KafkaController}
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.SyncGroupCallback
import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.AppendOrigin
@ -56,7 +56,6 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{Alter
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfig => IAlterableConfig}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfigCollection => IAlterableConfigCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
@ -77,6 +76,7 @@ import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
@ -100,6 +100,8 @@ class KafkaApisTest {
private val requestChannelMetrics: RequestChannel.Metrics = mock(classOf[RequestChannel.Metrics])
private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
private val groupCoordinator: GroupCoordinator = mock(classOf[GroupCoordinator])
private val newGroupCoordinator: org.apache.kafka.coordinator.group.GroupCoordinator =
mock(classOf[org.apache.kafka.coordinator.group.GroupCoordinator])
private val adminManager: ZkAdminManager = mock(classOf[ZkAdminManager])
private val txnCoordinator: TransactionCoordinator = mock(classOf[TransactionCoordinator])
private val controller: KafkaController = mock(classOf[KafkaController])
@ -188,6 +190,7 @@ class KafkaApisTest {
requestChannel = requestChannel,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
newGroupCoordinator = newGroupCoordinator,
txnCoordinator = txnCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = brokerId,
@ -2524,196 +2527,186 @@ class KafkaApisTest {
assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
}
@Test
def testJoinGroupProtocolsOrder(): Unit = {
val protocols = List(
("first", "first".getBytes()),
("second", "second".getBytes())
)
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testHandleJoinGroupRequest(version: Short): Unit = {
val joinGroupRequest = new JoinGroupRequestData()
.setGroupId("group")
.setMemberId("member")
.setProtocolType("consumer")
.setRebalanceTimeoutMs(1000)
.setSessionTimeoutMs(2000)
val groupId = "group"
val memberId = "member1"
val protocolType = "consumer"
val rebalanceTimeoutMs = 10
val sessionTimeoutMs = 5
val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
val expectedJoinGroupRequest = new JoinGroupRequestData()
.setGroupId(joinGroupRequest.groupId)
.setMemberId(joinGroupRequest.memberId)
.setProtocolType(joinGroupRequest.protocolType)
.setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
.setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
val future = new CompletableFuture[JoinGroupResponseData]()
when(newGroupCoordinator.joinGroup(
requestChannelRequest.context,
expectedJoinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
)).thenReturn(future)
createKafkaApis().handleJoinGroupRequest(
buildRequest(
new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setProtocolType(protocolType)
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
.setSessionTimeoutMs(sessionTimeoutMs)
.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
protocols.map { case (name, protocol) => new JoinGroupRequestProtocol()
.setName(name).setMetadata(protocol)
}.iterator.asJava))
).build()
),
RequestLocal.withThreadConfinedCaching)
verify(groupCoordinator).handleJoinGroup(
ArgumentMatchers.eq(groupId),
ArgumentMatchers.eq(memberId),
ArgumentMatchers.eq(None),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(clientId),
ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
ArgumentMatchers.eq(rebalanceTimeoutMs),
ArgumentMatchers.eq(sessionTimeoutMs),
ArgumentMatchers.eq(protocolType),
capturedProtocols.capture(),
any(),
any(),
any()
requestChannelRequest,
RequestLocal.NoCaching
)
val capturedProtocolsList = capturedProtocols.getValue
assertEquals(protocols.size, capturedProtocolsList.size)
protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) =>
assertEquals(expectedName, name)
assertArrayEquals(expectedBytes, bytes)
}
val expectedJoinGroupResponse = new JoinGroupResponseData()
.setMemberId("member")
.setGenerationId(0)
.setLeader("leader")
.setProtocolType("consumer")
.setProtocolName("range")
future.complete(expectedJoinGroupResponse)
val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
assertEquals(expectedJoinGroupResponse, response.data)
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Unit = {
val joinGroupRequest = new JoinGroupRequestData()
.setGroupId("group")
.setMemberId("member")
.setProtocolType("consumer")
.setRebalanceTimeoutMs(1000)
.setSessionTimeoutMs(2000)
val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version))
val expectedJoinGroupRequest = new JoinGroupRequestData()
.setGroupId(joinGroupRequest.groupId)
.setMemberId(joinGroupRequest.memberId)
.setProtocolType(joinGroupRequest.protocolType)
.setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
.setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
val future = new CompletableFuture[JoinGroupResponseData]()
when(newGroupCoordinator.joinGroup(
requestChannelRequest.context,
expectedJoinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
)).thenReturn(future)
createKafkaApis().handleJoinGroupRequest(
requestChannelRequest,
RequestLocal.NoCaching
)
val joinGroupResponse = new JoinGroupResponseData()
.setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
.setMemberId("member")
.setProtocolName(null)
val expectedJoinGroupResponse = new JoinGroupResponseData()
.setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
.setMemberId("member")
.setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
future.complete(joinGroupResponse)
val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
assertEquals(expectedJoinGroupResponse, response.data)
}
@Test
def testJoinGroupWhenAnErrorOccurs(): Unit = {
for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
}
}
def testHandleJoinGroupRequestFutureFailed(): Unit = {
val joinGroupRequest = new JoinGroupRequestData()
.setGroupId("group")
.setMemberId("member")
.setProtocolType("consumer")
.setRebalanceTimeoutMs(1000)
.setSessionTimeoutMs(2000)
def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
val groupId = "group"
val memberId = "member1"
val protocolType = "consumer"
val rebalanceTimeoutMs = 10
val sessionTimeoutMs = 5
val future = new CompletableFuture[JoinGroupResponseData]()
when(newGroupCoordinator.joinGroup(
requestChannelRequest.context,
joinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
)).thenReturn(future)
val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
val joinGroupRequest = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setProtocolType(protocolType)
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
.setSessionTimeoutMs(sessionTimeoutMs)
).build(version)
val requestChannelRequest = buildRequest(joinGroupRequest)
createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
verify(groupCoordinator).handleJoinGroup(
ArgumentMatchers.eq(groupId),
ArgumentMatchers.eq(memberId),
ArgumentMatchers.eq(None),
ArgumentMatchers.eq(if (version >= 4) true else false),
ArgumentMatchers.eq(if (version >= 9) true else false),
ArgumentMatchers.eq(clientId),
ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
ArgumentMatchers.eq(sessionTimeoutMs),
ArgumentMatchers.eq(protocolType),
ArgumentMatchers.eq(List.empty),
capturedCallback.capture(),
any(),
any()
createKafkaApis().handleJoinGroupRequest(
requestChannelRequest,
RequestLocal.NoCaching
)
capturedCallback.getValue.apply(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
future.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception)
val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, response.error)
assertEquals(0, response.data.members.size)
assertEquals(memberId, response.data.memberId)
assertEquals(GroupCoordinator.NoGeneration, response.data.generationId)
assertEquals(GroupCoordinator.NoLeader, response.data.leader)
assertNull(response.data.protocolType)
if (version >= 7) {
assertNull(response.data.protocolName)
} else {
assertEquals(GroupCoordinator.NoProtocol, response.data.protocolName)
}
assertEquals(Errors.REQUEST_TIMED_OUT, response.error)
}
@Test
def testJoinGroupProtocolType(): Unit = {
for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) {
testJoinGroupProtocolType(version.asInstanceOf[Short])
}
}
def testHandleJoinGroupRequestAuthorizationFailed(): Unit = {
val joinGroupRequest = new JoinGroupRequestData()
.setGroupId("group")
.setMemberId("member")
.setProtocolType("consumer")
.setRebalanceTimeoutMs(1000)
.setSessionTimeoutMs(2000)
def testJoinGroupProtocolType(version: Short): Unit = {
reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager)
val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
val groupId = "group"
val memberId = "member1"
val protocolType = "consumer"
val protocolName = "range"
val rebalanceTimeoutMs = 10
val sessionTimeoutMs = 5
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
val capturedCallback: ArgumentCaptor[JoinGroupCallback] = ArgumentCaptor.forClass(classOf[JoinGroupCallback])
val joinGroupRequest = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setProtocolType(protocolType)
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
.setSessionTimeoutMs(sessionTimeoutMs)
).build(version)
val requestChannelRequest = buildRequest(joinGroupRequest)
createKafkaApis().handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
verify(groupCoordinator).handleJoinGroup(
ArgumentMatchers.eq(groupId),
ArgumentMatchers.eq(memberId),
ArgumentMatchers.eq(None),
ArgumentMatchers.eq(if (version >= 4) true else false),
ArgumentMatchers.eq(if (version >= 9) true else false),
ArgumentMatchers.eq(clientId),
ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
ArgumentMatchers.eq(if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs),
ArgumentMatchers.eq(sessionTimeoutMs),
ArgumentMatchers.eq(protocolType),
ArgumentMatchers.eq(List.empty),
capturedCallback.capture(),
any(),
any()
createKafkaApis(authorizer = Some(authorizer)).handleJoinGroupRequest(
requestChannelRequest,
RequestLocal.NoCaching
)
capturedCallback.getValue.apply(JoinGroupResult(
members = List.empty,
memberId = memberId,
generationId = 0,
protocolType = Some(protocolType),
protocolName = Some(protocolName),
leaderId = memberId,
skipAssignment = true,
error = Errors.NONE
))
val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
}
assertEquals(Errors.NONE, response.error)
assertEquals(0, response.data.members.size)
assertEquals(memberId, response.data.memberId)
assertEquals(0, response.data.generationId)
assertEquals(memberId, response.data.leader)
assertEquals(protocolName, response.data.protocolName)
assertEquals(protocolType, response.data.protocolType)
assertTrue(response.data.skipAssignment)
@Test
def testHandleJoinGroupRequestUnexpectedException(): Unit = {
val joinGroupRequest = new JoinGroupRequestData()
.setGroupId("group")
.setMemberId("member")
.setProtocolType("consumer")
.setRebalanceTimeoutMs(1000)
.setSessionTimeoutMs(2000)
val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
val future = new CompletableFuture[JoinGroupResponseData]()
when(newGroupCoordinator.joinGroup(
requestChannelRequest.context,
joinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
)).thenReturn(future)
var response: JoinGroupResponse = null
when(requestChannel.sendResponse(any(), any(), any())).thenAnswer { _ =>
throw new Exception("Something went wrong")
}.thenAnswer { invocation =>
response = invocation.getArgument(1, classOf[JoinGroupResponse])
}
createKafkaApis().handle(
requestChannelRequest,
RequestLocal.NoCaching
)
future.completeExceptionally(Errors.NOT_COORDINATOR.exception)
// The exception expected here is the one thrown by `sendResponse`. As
// `Exception` is not a Kafka errors, `UNKNOWN_SERVER_ERROR` is returned.
assertEquals(Errors.UNKNOWN_SERVER_ERROR, response.error)
}
@Test

View File

@ -14,8 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.BufferSupplier;
import java.util.concurrent.CompletableFuture;
public interface GroupCoordinator {
/**
* Join a Generic Group.
*
* @param context The request context.
* @param request The JoinGroupRequest data.
* @param bufferSupplier The buffer supplier tight to the request thread.
*
* @return A future yielding the response or an exception.
*/
CompletableFuture<JoinGroupResponseData> joinGroup(
RequestContext context,
JoinGroupRequestData request,
BufferSupplier bufferSupplier
);
}