diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRegularExpression.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRegularExpression.java new file mode 100644 index 00000000000..f1ea4d19abc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRegularExpression.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.errors; + +/** + * Thrown when a regular expression received in a request is not valid. + */ +public class InvalidRegularExpression extends ApiException { + public InvalidRegularExpression(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index b304d7e08ff..a80ec308ebb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -67,6 +67,7 @@ import org.apache.kafka.common.errors.InvalidPrincipalTypeException; import org.apache.kafka.common.errors.InvalidProducerEpochException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidRegistrationException; +import org.apache.kafka.common.errors.InvalidRegularExpression; import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; import org.apache.kafka.common.errors.InvalidReplicationFactorException; import org.apache.kafka.common.errors.InvalidRequestException; @@ -409,7 +410,8 @@ public enum Errors { FENCED_STATE_EPOCH(124, "The share coordinator rejected the request because the share-group state epoch did not match.", FencedStateEpochException::new), INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving replica's key.", InvalidVoterKeyException::new), DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", DuplicateVoterException::new), - VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", VoterNotFoundException::new); + VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", VoterNotFoundException::new), + INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", InvalidRegularExpression::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java index 6f881cc8efa..76f89ed4df8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java @@ -38,6 +38,7 @@ import java.util.Map; * - {@link Errors#UNSUPPORTED_ASSIGNOR} * - {@link Errors#UNRELEASED_INSTANCE_ID} * - {@link Errors#GROUP_MAX_SIZE_REACHED} + * - {@link Errors#INVALID_REGULAR_EXPRESSION} */ public class ConsumerGroupHeartbeatResponse extends AbstractResponse { private final ConsumerGroupHeartbeatResponseData data; diff --git a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json index 71c6e2e2502..669e2547758 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json @@ -18,8 +18,10 @@ "type": "request", "listeners": ["zkBroker", "broker"], "name": "ConsumerGroupHeartbeatRequest", - "validVersions": "0", + // Version 1 adds SubscribedTopicRegex (KIP-848). + "validVersions": "0-1", "flexibleVersions": "0+", + "latestVersionUnstable": true, "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, @@ -35,6 +37,8 @@ "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }, + { "name": "SubscribedTopicRegex", "type": "string", "versions": "1+", "nullableVersions": "1+", "default": "null", + "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null", diff --git a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json index fb55f80bd40..cda757cb32f 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json @@ -17,7 +17,7 @@ "apiKey": 68, "type": "response", "name": "ConsumerGroupHeartbeatResponse", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) @@ -30,6 +30,7 @@ // - UNSUPPORTED_ASSIGNOR (version 0+) // - UNRELEASED_INSTANCE_ID (version 0+) // - GROUP_MAX_SIZE_REACHED (version 0+) + // - INVALID_SUBSCRIPTION_REGEX (version 1+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 2a48c87c32d..ecf7efaad49 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1280,6 +1280,7 @@ public class GroupMetadataManager { throwIfEmptyString(request.groupId(), "GroupId can't be empty."); throwIfEmptyString(request.instanceId(), "InstanceId can't be empty."); throwIfEmptyString(request.rackId(), "RackId can't be empty."); + throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet."); if (request.memberEpoch() > 0 || request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) { throwIfEmptyString(request.memberId(), "MemberId can't be empty."); @@ -1685,7 +1686,9 @@ public class GroupMetadataManager { * @param clientId The client id. * @param clientHost The client host. * @param subscribedTopicNames The list of subscribed topic names from the request - * of null. + * or null. + * @param subscribedTopicRegex The regular expression based subscription from the request + * or null. * @param assignorName The assignor name from the request or null. * @param ownedTopicPartitions The list of owned partitions from the request or null. * @@ -1702,6 +1705,7 @@ public class GroupMetadataManager { String clientId, String clientHost, List subscribedTopicNames, + String subscribedTopicRegex, String assignorName, List ownedTopicPartitions ) throws ApiException { @@ -1749,6 +1753,7 @@ public class GroupMetadataManager { .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs)) .maybeUpdateServerAssignorName(Optional.ofNullable(assignorName)) .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames)) + .maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex)) .setClientId(clientId) .setClientHost(clientHost) .setClassicMemberMetadata(null) @@ -3159,6 +3164,7 @@ public class GroupMetadataManager { context.clientId(), context.clientAddress.toString(), request.subscribedTopicNames(), + request.subscribedTopicRegex(), request.serverAssignor(), request.topicPartitions() ); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 2a56cd912cd..653662b859b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -248,6 +248,23 @@ public class GroupMetadataManagerTest { assertEquals("InstanceId can't be null.", ex.getMessage()); } + @Test + public void testConsumerHeartbeatRegexValidation() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupAssignors(Collections.singletonList(assignor)) + .build(); + Exception ex; + // Regex not supported for now. This test will evolve to actually validate the regex when it's supported + ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("foo") + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("t*"))); + assertEquals("SubscribedTopicRegex is not supported yet.", ex.getMessage()); + } + @Test public void testMemberIdGeneration() { MockPartitionAssignor assignor = new MockPartitionAssignor("range");