From 64f3ee4c336339ee17163cc742442fef0db7b62c Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 4 Nov 2024 15:38:09 +0100 Subject: [PATCH] KAFKA-17593; [2/N] Update request validation & validate regex (#17651) This patch does two things: 1) Change the validation of the ConsumerGroupHeartbeat request to accept subscribed topic names and/or subscribed topic regex. At least one of them must be set in the first request with epoch 0. 2) Validate the provided regular expression by compiling it. Co-authored-by: Lianet Magrans Reviewers: Jeff Kim , Lianet Magrans --- .../import-control-group-coordinator.xml | 1 + .../ConsumerGroupHeartbeatRequestTest.scala | 78 +++++++++++++++++++ .../group/GroupMetadataManager.java | 37 ++++++++- .../group/GroupMetadataManagerTest.java | 52 +++++++++++-- 4 files changed, 157 insertions(+), 11 deletions(-) diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml index 55d265343af..0619ea444d5 100644 --- a/checkstyle/import-control-group-coordinator.xml +++ b/checkstyle/import-control-group-coordinator.xml @@ -70,6 +70,7 @@ + diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index d869109ab73..63004b99d2a 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -164,6 +164,84 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch) } + @ClusterTest + def testConsumerGroupHeartbeatWithRegularExpression(): Unit = { + val raftCluster = cluster.asInstanceOf[RaftClusterInstance] + val admin = cluster.createAdminClient() + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. 
+ TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = raftCluster.brokers.values().asScala.toSeq, + controllers = raftCluster.controllers().values().asScala.toSeq + ) + + // Heartbeat request to join the group. Note that the member subscribes + // to a nonexistent topic. + val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(Uuid.randomUuid().toString) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicRegex("foo") + .setTopicPartitions(List.empty.asJava), + true + ).build() + + // Send the request until receiving a successful response. There is a delay + // here because the group coordinator is loaded in the background. + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code + }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.") + + // Verify the response. + assertNotNull(consumerGroupHeartbeatResponse.data.memberId) + assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment) + } + + @ClusterTest + def testConsumerGroupHeartbeatWithInvalidRegularExpression(): Unit = { + val raftCluster = cluster.asInstanceOf[RaftClusterInstance] + val admin = cluster.createAdminClient() + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = raftCluster.brokers.values().asScala.toSeq, + controllers = raftCluster.controllers().values().asScala.toSeq + ) + + // Heartbeat request to join the group. 
Note that the member subscribes + // with an invalid regular expression. + val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(Uuid.randomUuid().toString) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicRegex("[") + .setTopicPartitions(List.empty.asJava), + true + ).build() + + // Send the request until receiving the expected error. There is a delay + // here because the group coordinator is loaded in the background. + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REGULAR_EXPRESSION.code + }, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.") + + // Verify the response. + assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code, consumerGroupHeartbeatResponse.data.errorCode) + } + @ClusterTest def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = { val raftCluster = cluster.asInstanceOf[RaftClusterInstance] diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 9a5c517a956..3435f3b3d2f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupMaxSizeReachedException; import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.InconsistentGroupProtocolException; +import 
org.apache.kafka.common.errors.InvalidRegularExpression; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.UnknownMemberIdException; @@ -118,6 +119,9 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashSet; +import com.google.re2j.Pattern; +import com.google.re2j.PatternSyntaxException; + import org.slf4j.Logger; import java.nio.ByteBuffer; @@ -1312,7 +1316,6 @@ public class GroupMetadataManager { throwIfEmptyString(request.groupId(), "GroupId can't be empty."); throwIfEmptyString(request.instanceId(), "InstanceId can't be empty."); throwIfEmptyString(request.rackId(), "RackId can't be empty."); - throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet."); if (request.memberEpoch() == 0) { if (request.rebalanceTimeoutMs() == -1) { @@ -1321,8 +1324,10 @@ public class GroupMetadataManager { if (request.topicPartitions() == null || !request.topicPartitions().isEmpty()) { throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining."); } - if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) { - throw new InvalidRequestException("SubscribedTopicNames must be set in first request."); + boolean hasSubscribedTopicNames = request.subscribedTopicNames() != null && !request.subscribedTopicNames().isEmpty(); + boolean hasSubscribedTopicRegex = request.subscribedTopicRegex() != null && !request.subscribedTopicRegex().isEmpty(); + if (!hasSubscribedTopicNames && !hasSubscribedTopicRegex) { + throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex must be set in first request."); } } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { throwIfNull(request.instanceId(), "InstanceId can't be null."); @@ -1642,6 +1647,24 @@ public class 
GroupMetadataManager { } } + /** + * Validates the provided regular expression by compiling it with RE2/J. + * + * @param regex The regular expression to validate. + * @throws InvalidRegularExpression if the regular expression fails to compile. + */ + private static void throwIfRegularExpressionIsInvalid( + String regex + ) throws InvalidRegularExpression { + try { + Pattern.compile(regex); + } catch (PatternSyntaxException ex) { + throw new InvalidRegularExpression( + String.format("SubscribedTopicRegex `%s` is not a valid regular expression: %s.", + regex, ex.getDescription())); + } + } + /** * Deserialize the subscription in JoinGroupRequestProtocolCollection. * All the protocols have the same subscription, so the method picks a random one. @@ -2390,13 +2413,14 @@ public class GroupMetadataManager { * @param records The list to accumulate any new records. * @return A boolean indicating whether the updatedMember has a different * subscribedTopicNames/subscribedTopicRegex from the old member. + * @throws InvalidRegularExpression if the regular expression is invalid. */ private boolean hasMemberSubscriptionChanged( String groupId, ConsumerGroupMember member, ConsumerGroupMember updatedMember, List records - ) { + ) throws InvalidRegularExpression { String memberId = updatedMember.memberId(); if (!updatedMember.equals(member)) { records.add(newConsumerGroupMemberSubscriptionRecord(groupId, updatedMember)); @@ -2410,6 +2434,11 @@ public class GroupMetadataManager { if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.", groupId, memberId, updatedMember.subscribedTopicRegex()); + // If the regular expression has changed, we compile it to ensure that + // its syntax is valid. 
+ if (updatedMember.subscribedTopicRegex() != null) { + throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex()); + } return true; } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 5d1962da233..b912b4f18de 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupMaxSizeReachedException; import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.InconsistentGroupProtocolException; +import org.apache.kafka.common.errors.InvalidRegularExpression; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.RebalanceInProgressException; @@ -206,7 +207,7 @@ public class GroupMetadataManagerTest { .setRebalanceTimeoutMs(5000))); assertEquals("TopicPartitions must be empty when (re-)joining.", ex.getMessage()); - // SubscribedTopicNames must be present and empty in the first request (epoch == 0). + // SubscribedTopicNames or SubscribedTopicRegex must be present and non-empty in the first request (epoch == 0). 
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat( new ConsumerGroupHeartbeatRequestData() .setMemberId(memberId) @@ -214,7 +215,7 @@ public class GroupMetadataManagerTest { .setMemberEpoch(0) .setRebalanceTimeoutMs(5000) .setTopicPartitions(Collections.emptyList()))); - assertEquals("SubscribedTopicNames must be set in first request.", ex.getMessage()); + assertEquals("SubscribedTopicNames or SubscribedTopicRegex must be set in first request.", ex.getMessage()); // InstanceId must be non-empty if provided in all requests. ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat( @@ -257,20 +258,57 @@ public class GroupMetadataManagerTest { @Test public void testConsumerHeartbeatRegexValidation() { + String memberId = Uuid.randomUuid().toString(); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConsumerGroupAssignors(Collections.singletonList(assignor)) .build(); - Exception ex; - // Regex not supported for now. This test will evolve to actually validate the regex when it's supported - ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat( + + // Subscribing with an invalid regular expression fails. 
+ Exception ex = assertThrows(InvalidRegularExpression.class, () -> context.consumerGroupHeartbeat( new ConsumerGroupHeartbeatRequestData() .setMemberId(Uuid.randomUuid().toString()) .setGroupId("foo") + .setMemberId(memberId) .setMemberEpoch(0) .setRebalanceTimeoutMs(5000) - .setSubscribedTopicRegex("t*"))); - assertEquals("SubscribedTopicRegex is not supported yet.", ex.getMessage()); + .setSubscribedTopicRegex("[") + .setTopicPartitions(Collections.emptyList()))); + assertEquals("SubscribedTopicRegex `[` is not a valid regular expression: missing closing ].", ex.getMessage()); + + // Subscribing with a valid regular expression succeeds. + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("foo") + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex(".*") + .setTopicPartitions(Collections.emptyList())); + assertEquals(1, result.response().memberEpoch()); + + // Updating the subscription to an invalid regular expression fails. + ex = assertThrows(InvalidRegularExpression.class, () -> context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("foo") + .setMemberId(memberId) + .setMemberEpoch(1) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("[") + .setTopicPartitions(Collections.emptyList()))); + assertEquals("SubscribedTopicRegex `[` is not a valid regular expression: missing closing ].", ex.getMessage()); + + // Updating the subscription to topic names succeeds (checking when the regex becomes null). + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("foo") + .setMemberId(memberId) + .setMemberEpoch(1) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .setTopicPartitions(Collections.emptyList())); + assertEquals(2, result.response().memberEpoch()); } @Test