diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml
index 55d265343af..0619ea444d5 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -70,6 +70,7 @@
+
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index d869109ab73..63004b99d2a 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -164,6 +164,84 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
}
+ @ClusterTest
+ def testConsumerGroupHeartbeatWithRegularExpression(): Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+
+ // Creates the __consumer_offsets topics because it won't be created automatically
+ // in this test because it does not use FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = raftCluster.brokers.values().asScala.toSeq,
+ controllers = raftCluster.controllers().values().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid().toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicRegex("foo")
+ .setTopicPartitions(List.empty.asJava),
+ true
+ ).build()
+
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
+ }
+
+ @ClusterTest
+ def testConsumerGroupHeartbeatWithInvalidRegularExpression(): Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+
+ // Creates the __consumer_offsets topics because it won't be created automatically
+ // in this test because it does not use FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = raftCluster.brokers.values().asScala.toSeq,
+ controllers = raftCluster.controllers().values().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid().toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicRegex("[")
+ .setTopicPartitions(List.empty.asJava),
+ true
+ ).build()
+
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REGULAR_EXPRESSION.code
+ }, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code, consumerGroupHeartbeatResponse.data.errorCode)
+ }
+
@ClusterTest
def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9a5c517a956..3435f3b3d2f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
+import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -118,6 +119,9 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
+import com.google.re2j.Pattern;
+import com.google.re2j.PatternSyntaxException;
+
import org.slf4j.Logger;
import java.nio.ByteBuffer;
@@ -1312,7 +1316,6 @@ public class GroupMetadataManager {
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
- throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet.");
if (request.memberEpoch() == 0) {
if (request.rebalanceTimeoutMs() == -1) {
@@ -1321,8 +1324,10 @@ public class GroupMetadataManager {
if (request.topicPartitions() == null || !request.topicPartitions().isEmpty()) {
throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
}
- if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
- throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
+ boolean hasSubscribedTopicNames = request.subscribedTopicNames() != null && !request.subscribedTopicNames().isEmpty();
+ boolean hasSubscribedTopicRegex = request.subscribedTopicRegex() != null && !request.subscribedTopicRegex().isEmpty();
+ if (!hasSubscribedTopicNames && !hasSubscribedTopicRegex) {
+ throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex must be set in first request.");
}
} else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
throwIfNull(request.instanceId(), "InstanceId can't be null.");
@@ -1642,6 +1647,24 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Validates if the provided regular expression is valid.
+ *
+ * @param regex The regular expression to validate.
+ * @throws InvalidRegularExpression if the regular expression is invalid.
+ */
+ private static void throwIfRegularExpressionIsInvalid(
+ String regex
+ ) throws InvalidRegularExpression {
+ try {
+ Pattern.compile(regex);
+ } catch (PatternSyntaxException ex) {
+ throw new InvalidRegularExpression(
+ String.format("SubscribedTopicRegex `%s` is not a valid regular expression: %s.",
+ regex, ex.getDescription()));
+ }
+ }
+
/**
* Deserialize the subscription in JoinGroupRequestProtocolCollection.
* All the protocols have the same subscription, so the method picks a random one.
@@ -2390,13 +2413,14 @@ public class GroupMetadataManager {
* @param records The list to accumulate any new records.
* @return A boolean indicating whether the updatedMember has a different
* subscribedTopicNames/subscribedTopicRegex from the old member.
+ * @throws InvalidRegularExpression if the regular expression is invalid.
*/
private boolean hasMemberSubscriptionChanged(
String groupId,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
List records
- ) {
+ ) throws InvalidRegularExpression {
String memberId = updatedMember.memberId();
if (!updatedMember.equals(member)) {
records.add(newConsumerGroupMemberSubscriptionRecord(groupId, updatedMember));
@@ -2410,6 +2434,11 @@ public class GroupMetadataManager {
if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.",
groupId, memberId, updatedMember.subscribedTopicRegex());
+ // If the regular expression has changed, we compile it to ensure that
+ // its syntax is valid.
+ if (updatedMember.subscribedTopicRegex() != null) {
+ throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex());
+ }
return true;
}
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 5d1962da233..b912b4f18de 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
+import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -206,7 +207,7 @@ public class GroupMetadataManagerTest {
.setRebalanceTimeoutMs(5000)));
assertEquals("TopicPartitions must be empty when (re-)joining.", ex.getMessage());
- // SubscribedTopicNames must be present and empty in the first request (epoch == 0).
+ // SubscribedTopicNames or SubscribedTopicRegex must be present and non-empty in the first request (epoch == 0).
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(memberId)
@@ -214,7 +215,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setTopicPartitions(Collections.emptyList())));
- assertEquals("SubscribedTopicNames must be set in first request.", ex.getMessage());
+ assertEquals("SubscribedTopicNames or SubscribedTopicRegex must be set in first request.", ex.getMessage());
// InstanceId must be non-empty if provided in all requests.
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
@@ -257,20 +258,57 @@ public class GroupMetadataManagerTest {
@Test
public void testConsumerHeartbeatRegexValidation() {
+ String memberId = Uuid.randomUuid().toString();
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.build();
- Exception ex;
- // Regex not supported for now. This test will evolve to actually validate the regex when it's supported
- ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
+
+ // Subscribing with an invalid regular expression fails.
+ Exception ex = assertThrows(InvalidRegularExpression.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(Uuid.randomUuid().toString())
.setGroupId("foo")
+ .setMemberId(memberId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
- .setSubscribedTopicRegex("t*")));
- assertEquals("SubscribedTopicRegex is not supported yet.", ex.getMessage());
+ .setSubscribedTopicRegex("[")
+ .setTopicPartitions(Collections.emptyList())));
+ assertEquals("SubscribedTopicRegex `[` is not a valid regular expression: missing closing ].", ex.getMessage());
+
+ // Subscribing with a valid regular expression succeeds.
+ CoordinatorResult result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex(".*")
+ .setTopicPartitions(Collections.emptyList()));
+ assertEquals(1, result.response().memberEpoch());
+
+ // Updating the subscription to an invalid regular expression fails.
+ assertThrows(InvalidRegularExpression.class, () -> context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("[")
+ .setTopicPartitions(Collections.emptyList())));
+ assertEquals("SubscribedTopicRegex `[` is not a valid regular expression: missing closing ].", ex.getMessage());
+
+ // Updating the subscription to topic names succeeds (checking when the regex becomes null).
+ result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .setTopicPartitions(Collections.emptyList()));
+ assertEquals(2, result.response().memberEpoch());
}
@Test