KAFKA-17028: FindCoordinator v6 initial implementation (#16440)

KIP-932 introduces FindCoordinator v6 for finding share coordinators. The initial implementation:

Checks that share coordinators are only requested with v6 or above.
Share coordinator requests are authorized as cluster actions (this is for inter-broker use only)
Responds with COORDINATOR_NOT_AVAILABLE because share coordinators are not yet available.
When the share coordinator code is delivered, the request handling will be gated by configurations which enable share groups and the share coordinator specifically. If these are not enabled, COORDINATOR_NOT_AVAILABLE is the response.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Andrew Schofield 2024-06-25 16:43:16 +01:00 committed by GitHub
parent 0353337f5f
commit 63304fb6e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 40 additions and 5 deletions

View File

@ -1498,7 +1498,10 @@ public class TransactionManager {
break;
case TRANSACTION:
transactionCoordinator = node;
break;
default:
log.error("Group coordinator lookup failed: Unexpected coordinator type in response");
fatalError(new IllegalStateException("Group coordinator lookup failed: Unexpected coordinator type in response"));
}
result.done();
log.info("Discovered {} coordinator {}", coordinatorType.toString().toLowerCase(Locale.ROOT), node);

View File

@ -28,6 +28,7 @@ public class Topic {
public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
public static final String SHARE_GROUP_STATE_TOPIC_NAME = "__share_group_state";
public static final String CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata";
public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new TopicPartition(
CLUSTER_METADATA_TOPIC_NAME,
@ -36,7 +37,7 @@ public class Topic {
public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";
private static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME));
Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME));
private static final int MAX_NAME_LENGTH = 249;

View File

@ -116,7 +116,7 @@ public class FindCoordinatorRequest extends AbstractRequest {
}
public enum CoordinatorType {
GROUP((byte) 0), TRANSACTION((byte) 1);
GROUP((byte) 0), TRANSACTION((byte) 1), SHARE((byte) 2);
final byte id;
@ -134,6 +134,8 @@ public class FindCoordinatorRequest extends AbstractRequest {
return GROUP;
case 1:
return TRANSACTION;
case 2:
return SHARE;
default:
throw new InvalidRequestException("Unknown coordinator type received: " + id);
}

View File

@ -1681,6 +1681,11 @@ class KafkaApis(val requestChannel: RequestChannel,
else if (keyType == CoordinatorType.TRANSACTION.id &&
!authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, key))
(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, Node.noNode)
else if (keyType == CoordinatorType.SHARE.id && request.context.apiVersion < 6)
(Errors.INVALID_REQUEST, Node.noNode)
else if (keyType == CoordinatorType.SHARE.id &&
!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
(Errors.CLUSTER_AUTHORIZATION_FAILED, Node.noNode)
else {
val (partition, internalTopicName) = CoordinatorType.forId(keyType) match {
case CoordinatorType.GROUP =>
@ -1688,6 +1693,10 @@ class KafkaApis(val requestChannel: RequestChannel,
case CoordinatorType.TRANSACTION =>
(txnCoordinator.partitionFor(key), TRANSACTION_STATE_TOPIC_NAME)
case CoordinatorType.SHARE =>
// When share coordinator support is implemented in KIP-932, a proper check will go here
return (Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
}
val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)

View File

@ -1170,8 +1170,19 @@ class KafkaApisTest extends Logging {
testFindCoordinatorWithTopicCreation(CoordinatorType.TRANSACTION, hasEnoughLiveBrokers = false, version = 3)
}
@Test
def testFindCoordinatorTooOldForShareStateTopic(): Unit = {
testFindCoordinatorWithTopicCreation(CoordinatorType.SHARE, checkAutoCreateTopic = false, version = 5)
}
@Test
def testFindCoordinatorNoShareCoordinatorForShareStateTopic(): Unit = {
testFindCoordinatorWithTopicCreation(CoordinatorType.SHARE, checkAutoCreateTopic = false)
}
private def testFindCoordinatorWithTopicCreation(coordinatorType: CoordinatorType,
hasEnoughLiveBrokers: Boolean = true,
checkAutoCreateTopic: Boolean = true,
version: Short = ApiKeys.FIND_COORDINATOR.latestVersion): Unit = {
val authorizer: Authorizer = mock(classOf[Authorizer])
@ -1202,6 +1213,10 @@ class KafkaApisTest extends Logging {
authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TRANSACTIONAL_ID,
groupId, AuthorizationResult.ALLOWED)
Topic.TRANSACTION_STATE_TOPIC_NAME
case CoordinatorType.SHARE =>
authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER,
Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
Topic.SHARE_GROUP_STATE_TOPIC_NAME
case _ =>
throw new IllegalStateException(s"Unknown coordinator type $coordinatorType")
}
@ -1227,13 +1242,18 @@ class KafkaApisTest extends Logging {
kafkaApis.handleFindCoordinatorRequest(request)
val response = verifyNoThrottling[FindCoordinatorResponse](request)
if (version >= 4) {
if (coordinatorType == CoordinatorType.SHARE && version < 6) {
assertEquals(Errors.INVALID_REQUEST.code, response.data.coordinators.get(0).errorCode)
} else if (version >= 4) {
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.coordinators.get(0).errorCode)
assertEquals(groupId, response.data.coordinators.get(0).key)
} else {
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code, response.data.errorCode)
assertTrue(capturedRequest.getValue.isEmpty)
}
if (checkAutoCreateTopic) {
assertTrue(capturedRequest.getValue.isEmpty)
}
assertTrue(capturedRequest.getValue.isEmpty)
}
@Test