mirror of https://github.com/apache/kafka.git
KAFKA-17593; [2/N] Update request validation & validate regex (#17651)
This patch does two things: 1) Change the validation of the ConsumerGroupHeartbeat request to accept subscribed topic names and/or subscribed topic regex. At least of them must be set in the first request with epoch 0. 2) Validate the provided regular expression by compiling it. Co-authored-by: Lianet Magrans <lmagrans@confluent.io> Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
parent
8eb8301294
commit
64f3ee4c33
|
@ -70,6 +70,7 @@
|
|||
<allow pkg="org.apache.kafka.timeline" />
|
||||
<allow pkg="org.apache.kafka.coordinator.common" />
|
||||
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
|
||||
<allow pkg="com.google.re2j" />
|
||||
<subpackage name="metrics">
|
||||
<allow pkg="com.yammer.metrics"/>
|
||||
<allow pkg="org.HdrHistogram" />
|
||||
|
|
|
@ -164,6 +164,84 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
|
|||
assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
def testConsumerGroupHeartbeatWithRegularExpression(): Unit = {
|
||||
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
|
||||
val admin = cluster.createAdminClient()
|
||||
|
||||
// Creates the __consumer_offsets topics because it won't be created automatically
|
||||
// in this test because it does not use FindCoordinator API.
|
||||
TestUtils.createOffsetsTopicWithAdmin(
|
||||
admin = admin,
|
||||
brokers = raftCluster.brokers.values().asScala.toSeq,
|
||||
controllers = raftCluster.controllers().values().asScala.toSeq
|
||||
)
|
||||
|
||||
// Heartbeat request to join the group. Note that the member subscribes
|
||||
// to an nonexistent topic.
|
||||
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
|
||||
new ConsumerGroupHeartbeatRequestData()
|
||||
.setGroupId("grp")
|
||||
.setMemberId(Uuid.randomUuid().toString)
|
||||
.setMemberEpoch(0)
|
||||
.setRebalanceTimeoutMs(5 * 60 * 1000)
|
||||
.setSubscribedTopicRegex("foo")
|
||||
.setTopicPartitions(List.empty.asJava),
|
||||
true
|
||||
).build()
|
||||
|
||||
// Send the request until receiving a successful response. There is a delay
|
||||
// here because the group coordinator is loaded in the background.
|
||||
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
|
||||
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
|
||||
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
|
||||
|
||||
// Verify the response.
|
||||
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
|
||||
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
|
||||
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
def testConsumerGroupHeartbeatWithInvalidRegularExpression(): Unit = {
|
||||
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
|
||||
val admin = cluster.createAdminClient()
|
||||
|
||||
// Creates the __consumer_offsets topics because it won't be created automatically
|
||||
// in this test because it does not use FindCoordinator API.
|
||||
TestUtils.createOffsetsTopicWithAdmin(
|
||||
admin = admin,
|
||||
brokers = raftCluster.brokers.values().asScala.toSeq,
|
||||
controllers = raftCluster.controllers().values().asScala.toSeq
|
||||
)
|
||||
|
||||
// Heartbeat request to join the group. Note that the member subscribes
|
||||
// to an nonexistent topic.
|
||||
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
|
||||
new ConsumerGroupHeartbeatRequestData()
|
||||
.setGroupId("grp")
|
||||
.setMemberId(Uuid.randomUuid().toString)
|
||||
.setMemberEpoch(0)
|
||||
.setRebalanceTimeoutMs(5 * 60 * 1000)
|
||||
.setSubscribedTopicRegex("[")
|
||||
.setTopicPartitions(List.empty.asJava),
|
||||
true
|
||||
).build()
|
||||
|
||||
// Send the request until receiving a successful response. There is a delay
|
||||
// here because the group coordinator is loaded in the background.
|
||||
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
|
||||
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REGULAR_EXPRESSION.code
|
||||
}, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.")
|
||||
|
||||
// Verify the response.
|
||||
assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code, consumerGroupHeartbeatResponse.data.errorCode)
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = {
|
||||
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
|||
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
|
||||
import org.apache.kafka.common.errors.IllegalGenerationException;
|
||||
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
|
||||
import org.apache.kafka.common.errors.InvalidRegularExpression;
|
||||
import org.apache.kafka.common.errors.InvalidRequestException;
|
||||
import org.apache.kafka.common.errors.RebalanceInProgressException;
|
||||
import org.apache.kafka.common.errors.UnknownMemberIdException;
|
||||
|
@ -118,6 +119,9 @@ import org.apache.kafka.timeline.SnapshotRegistry;
|
|||
import org.apache.kafka.timeline.TimelineHashMap;
|
||||
import org.apache.kafka.timeline.TimelineHashSet;
|
||||
|
||||
import com.google.re2j.Pattern;
|
||||
import com.google.re2j.PatternSyntaxException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -1312,7 +1316,6 @@ public class GroupMetadataManager {
|
|||
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
|
||||
throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
|
||||
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
|
||||
throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet.");
|
||||
|
||||
if (request.memberEpoch() == 0) {
|
||||
if (request.rebalanceTimeoutMs() == -1) {
|
||||
|
@ -1321,8 +1324,10 @@ public class GroupMetadataManager {
|
|||
if (request.topicPartitions() == null || !request.topicPartitions().isEmpty()) {
|
||||
throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
|
||||
}
|
||||
if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
|
||||
throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
|
||||
boolean hasSubscribedTopicNames = request.subscribedTopicNames() != null && !request.subscribedTopicNames().isEmpty();
|
||||
boolean hasSubscribedTopicRegex = request.subscribedTopicRegex() != null && !request.subscribedTopicRegex().isEmpty();
|
||||
if (!hasSubscribedTopicNames && !hasSubscribedTopicRegex) {
|
||||
throw new InvalidRequestException("SubscribedTopicNames or SubscribedTopicRegex must be set in first request.");
|
||||
}
|
||||
} else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
|
||||
throwIfNull(request.instanceId(), "InstanceId can't be null.");
|
||||
|
@ -1642,6 +1647,24 @@ public class GroupMetadataManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates if the provided regular expression is valid.
|
||||
*
|
||||
* @param regex The regular expression to validate.
|
||||
* @throws InvalidRegularExpression if the regular expression is invalid.
|
||||
*/
|
||||
private static void throwIfRegularExpressionIsInvalid(
|
||||
String regex
|
||||
) throws InvalidRegularExpression {
|
||||
try {
|
||||
Pattern.compile(regex);
|
||||
} catch (PatternSyntaxException ex) {
|
||||
throw new InvalidRegularExpression(
|
||||
String.format("SubscribedTopicRegex `%s` is not a valid regular expression: %s.",
|
||||
regex, ex.getDescription()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize the subscription in JoinGroupRequestProtocolCollection.
|
||||
* All the protocols have the same subscription, so the method picks a random one.
|
||||
|
@ -2390,13 +2413,14 @@ public class GroupMetadataManager {
|
|||
* @param records The list to accumulate any new records.
|
||||
* @return A boolean indicating whether the updatedMember has a different
|
||||
* subscribedTopicNames/subscribedTopicRegex from the old member.
|
||||
* @throws InvalidRegularExpression if the regular expression is invalid.
|
||||
*/
|
||||
private boolean hasMemberSubscriptionChanged(
|
||||
String groupId,
|
||||
ConsumerGroupMember member,
|
||||
ConsumerGroupMember updatedMember,
|
||||
List<CoordinatorRecord> records
|
||||
) {
|
||||
) throws InvalidRegularExpression {
|
||||
String memberId = updatedMember.memberId();
|
||||
if (!updatedMember.equals(member)) {
|
||||
records.add(newConsumerGroupMemberSubscriptionRecord(groupId, updatedMember));
|
||||
|
@ -2410,6 +2434,11 @@ public class GroupMetadataManager {
|
|||
if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
|
||||
log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.",
|
||||
groupId, memberId, updatedMember.subscribedTopicRegex());
|
||||
// If the regular expression has changed, we compile it to ensure that
|
||||
// its syntax is valid.
|
||||
if (updatedMember.subscribedTopicRegex() != null) {
|
||||
throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
|||
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
|
||||
import org.apache.kafka.common.errors.IllegalGenerationException;
|
||||
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
|
||||
import org.apache.kafka.common.errors.InvalidRegularExpression;
|
||||
import org.apache.kafka.common.errors.InvalidRequestException;
|
||||
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
|
||||
import org.apache.kafka.common.errors.RebalanceInProgressException;
|
||||
|
@ -206,7 +207,7 @@ public class GroupMetadataManagerTest {
|
|||
.setRebalanceTimeoutMs(5000)));
|
||||
assertEquals("TopicPartitions must be empty when (re-)joining.", ex.getMessage());
|
||||
|
||||
// SubscribedTopicNames must be present and empty in the first request (epoch == 0).
|
||||
// SubscribedTopicNames or SubscribedTopicRegex must be present and non-empty in the first request (epoch == 0).
|
||||
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
|
||||
new ConsumerGroupHeartbeatRequestData()
|
||||
.setMemberId(memberId)
|
||||
|
@ -214,7 +215,7 @@ public class GroupMetadataManagerTest {
|
|||
.setMemberEpoch(0)
|
||||
.setRebalanceTimeoutMs(5000)
|
||||
.setTopicPartitions(Collections.emptyList())));
|
||||
assertEquals("SubscribedTopicNames must be set in first request.", ex.getMessage());
|
||||
assertEquals("SubscribedTopicNames or SubscribedTopicRegex must be set in first request.", ex.getMessage());
|
||||
|
||||
// InstanceId must be non-empty if provided in all requests.
|
||||
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
|
||||
|
@ -257,20 +258,57 @@ public class GroupMetadataManagerTest {
|
|||
|
||||
@Test
|
||||
public void testConsumerHeartbeatRegexValidation() {
|
||||
String memberId = Uuid.randomUuid().toString();
|
||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||
assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
|
||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||
.withConsumerGroupAssignors(Collections.singletonList(assignor))
|
||||
.build();
|
||||
Exception ex;
|
||||
// Regex not supported for now. This test will evolve to actually validate the regex when it's supported
|
||||
ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
|
||||
|
||||
// Subscribing with an invalid regular expression fails.
|
||||
Exception ex = assertThrows(InvalidRegularExpression.class, () -> context.consumerGroupHeartbeat(
|
||||
new ConsumerGroupHeartbeatRequestData()
|
||||
.setMemberId(Uuid.randomUuid().toString())
|
||||
.setGroupId("foo")
|
||||
.setMemberId(memberId)
|
||||
.setMemberEpoch(0)
|
||||
.setRebalanceTimeoutMs(5000)
|
||||
.setSubscribedTopicRegex("t*")));
|
||||
assertEquals("SubscribedTopicRegex is not supported yet.", ex.getMessage());
|
||||
.setSubscribedTopicRegex("[")
|
||||
.setTopicPartitions(Collections.emptyList())));
|
||||
assertEquals("SubscribedTopicRegex `[` is not a valid regular expression: missing closing ].", ex.getMessage());
|
||||
|
||||
// Subscribing with a valid regular expression succeeds.
|
||||
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
|
||||
new ConsumerGroupHeartbeatRequestData()
|
||||
.setGroupId("foo")
|
||||
.setMemberId(memberId)
|
||||
.setMemberEpoch(0)
|
||||
.setRebalanceTimeoutMs(5000)
|
||||
.setSubscribedTopicRegex(".*")
|
||||
.setTopicPartitions(Collections.emptyList()));
|
||||
assertEquals(1, result.response().memberEpoch());
|
||||
|
||||
// Updating the subscription to an invalid regular expression fails.
|
||||
assertThrows(InvalidRegularExpression.class, () -> context.consumerGroupHeartbeat(
|
||||
new ConsumerGroupHeartbeatRequestData()
|
||||
.setGroupId("foo")
|
||||
.setMemberId(memberId)
|
||||
.setMemberEpoch(1)
|
||||
.setRebalanceTimeoutMs(5000)
|
||||
.setSubscribedTopicRegex("[")
|
||||
.setTopicPartitions(Collections.emptyList())));
|
||||
assertEquals("SubscribedTopicRegex `[` is not a valid regular expression: missing closing ].", ex.getMessage());
|
||||
|
||||
// Updating the subscription to topic names succeeds (checking when the regex becomes null).
|
||||
result = context.consumerGroupHeartbeat(
|
||||
new ConsumerGroupHeartbeatRequestData()
|
||||
.setGroupId("foo")
|
||||
.setMemberId(memberId)
|
||||
.setMemberEpoch(1)
|
||||
.setRebalanceTimeoutMs(5000)
|
||||
.setSubscribedTopicNames(Collections.singletonList("foo"))
|
||||
.setTopicPartitions(Collections.emptyList()));
|
||||
assertEquals(2, result.response().memberEpoch());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue