KAFKA-17592; Support for SubscribedTopicsRegex in ConsumerGroupHeartbeat RPC (#17257)

This patch includes:
- Bump ConsumerGroupHeartbeatRequest version to include subscribedTopicRegex field
- Introduce new error code for InvalidRegularExpression 
- Bump ConsumerGroupHeartbeatResponse version to support new regex error
- Wire the new field into the GroupMetadataManager when processing HB

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Lianet Magrans 2024-09-25 03:52:05 -04:00 committed by GitHub
parent bb97d63d41
commit ab0df20489
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 62 additions and 4 deletions

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* Thrown when a regular expression received in a request is not valid.
*/
public class InvalidRegularExpression extends ApiException {
public InvalidRegularExpression(String message) {
super(message);
}
}

View File

@ -67,6 +67,7 @@ import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRegistrationException;
import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
@ -409,7 +410,8 @@ public enum Errors {
FENCED_STATE_EPOCH(124, "The share coordinator rejected the request because the share-group state epoch did not match.", FencedStateEpochException::new),
INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving replica's key.", InvalidVoterKeyException::new),
DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", DuplicateVoterException::new),
VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", VoterNotFoundException::new);
VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", VoterNotFoundException::new),
INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", InvalidRegularExpression::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);

View File

@ -38,6 +38,7 @@ import java.util.Map;
* - {@link Errors#UNSUPPORTED_ASSIGNOR}
* - {@link Errors#UNRELEASED_INSTANCE_ID}
* - {@link Errors#GROUP_MAX_SIZE_REACHED}
* - {@link Errors#INVALID_REGULAR_EXPRESSION}
*/
public class ConsumerGroupHeartbeatResponse extends AbstractResponse {
private final ConsumerGroupHeartbeatResponseData data;

View File

@ -18,8 +18,10 @@
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ConsumerGroupHeartbeatRequest",
"validVersions": "0",
// Version 1 adds SubscribedTopicRegex (KIP-848).
"validVersions": "0-1",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
@ -35,6 +37,8 @@
"about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName",
"about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
{ "name": "SubscribedTopicRegex", "type": "string", "versions": "1+", "nullableVersions": "1+", "default": "null",
"about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
{ "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." },
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",

View File

@ -17,7 +17,7 @@
"apiKey": 68,
"type": "response",
"name": "ConsumerGroupHeartbeatResponse",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
@ -30,6 +30,7 @@
// - UNSUPPORTED_ASSIGNOR (version 0+)
// - UNRELEASED_INSTANCE_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
// - INVALID_SUBSCRIPTION_REGEX (version 1+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

View File

@ -1280,6 +1280,7 @@ public class GroupMetadataManager {
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet.");
if (request.memberEpoch() > 0 || request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
@ -1685,7 +1686,9 @@ public class GroupMetadataManager {
* @param clientId The client id.
* @param clientHost The client host.
* @param subscribedTopicNames The list of subscribed topic names from the request
* of null.
* or null.
* @param subscribedTopicRegex The regular expression based subscription from the request
* or null.
* @param assignorName The assignor name from the request or null.
* @param ownedTopicPartitions The list of owned partitions from the request or null.
*
@ -1702,6 +1705,7 @@ public class GroupMetadataManager {
String clientId,
String clientHost,
List<String> subscribedTopicNames,
String subscribedTopicRegex,
String assignorName,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
) throws ApiException {
@ -1749,6 +1753,7 @@ public class GroupMetadataManager {
.maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
.maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
.maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
.setClientId(clientId)
.setClientHost(clientHost)
.setClassicMemberMetadata(null)
@ -3159,6 +3164,7 @@ public class GroupMetadataManager {
context.clientId(),
context.clientAddress.toString(),
request.subscribedTopicNames(),
request.subscribedTopicRegex(),
request.serverAssignor(),
request.topicPartitions()
);

View File

@ -248,6 +248,23 @@ public class GroupMetadataManagerTest {
assertEquals("InstanceId can't be null.", ex.getMessage());
}
@Test
public void testConsumerHeartbeatRegexValidation() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.build();
Exception ex;
// Regex not supported for now. This test will evolve to actually validate the regex when it's supported
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("t*")));
assertEquals("SubscribedTopicRegex is not supported yet.", ex.getMessage());
}
@Test
public void testMemberIdGeneration() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");