KAFKA-18961: Time-based refresh for server-side RE2J regex (#19904)

Consumers can subscribe to an RE2J SubscriptionPattern that will be
resolved and maintained on the server-side (KIP-848). Currently, those
regexes are refreshed on the coordinator when a consumer subscribes to a
new regex, or if there is a new topic metadata image (to ensure regex
resolution stays up-to-date with existing topics)

But with
[KAFKA-18813](https://issues.apache.org/jira/browse/KAFKA-18813), the
topics matching a regex are filtered based on ACLs. This generates a new
situation, as regexes resolution do not stay up-to-date as topics become
visible (ACLs added/delete).

This patch introduces time-based refresh for the subscribed regex by
- Adding internal `group.consumer.regex.batch.refresh.max.interval.ms`
config
that controls the refresh interval.
- Schedule a regex refresh when updating regex subscription if the
latest refresh is older than the max interval.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Dongnuo Lyu 2025-06-12 07:54:39 -04:00 committed by GitHub
parent 2a7457f2dd
commit af012e1ec2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 289 additions and 12 deletions

View File

@ -111,6 +111,7 @@ class AbstractAuthorizerIntegrationTest extends BaseRequestTest {
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
properties.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, "10000")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")

View File

@ -3076,6 +3076,29 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
sendAndReceiveRegexHeartbeat(response, listenerName, None)
}
@Test
def testConsumerGroupHeartbeatWithRegexWithTopicDescribeAclAddedAndRemoved(): Unit = {
createTopicWithBrokerPrincipal(topic)
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
val memberId = Uuid.randomUuid.toString;
var response = sendAndReceiveFirstRegexHeartbeat(memberId, listenerName)
TestUtils.tryUntilNoAssertionError() {
response = sendAndReceiveRegexHeartbeat(response, listenerName, Some(0), true)
}
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
TestUtils.tryUntilNoAssertionError(waitTime = 25000) {
response = sendAndReceiveRegexHeartbeat(response, listenerName, Some(1))
}
removeAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
TestUtils.tryUntilNoAssertionError(waitTime = 25000) {
response = sendAndReceiveRegexHeartbeat(response, listenerName, Some(0))
}
}
@Test
def testConsumerGroupHeartbeatWithRegexWithDifferentMemberAcls(): Unit = {
createTopicWithBrokerPrincipal(topic, numPartitions = 2)
@ -3093,7 +3116,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
// member permissions while computing assignments.
var member2Response = sendAndReceiveFirstRegexHeartbeat("memberWithLimitedAccess", listenerName)
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(1))
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, None, fullRequest = true)
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(1), fullRequest = true)
member2Response = sendAndReceiveRegexHeartbeat(member2Response, listenerName, Some(1))
// Create another topic and send heartbeats on member1 to trigger regex refresh
@ -3624,6 +3647,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
data = data
.setTopicPartitions(partitions.asJava)
.setSubscribedTopicRegex("^top.*")
.setRebalanceTimeoutMs(5 * 60 * 1000)
}
val request = new ConsumerGroupHeartbeatRequest.Builder(data).build()
val resource = Set[ResourceType](GROUP, TOPIC)

View File

@ -207,6 +207,11 @@ public class GroupCoordinatorConfig {
ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " +
ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled.";
public static final String CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG = "group.consumer.regex.refresh.interval.ms";
public static final String CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC = "The interval at which the group coordinator will refresh " +
"the topics matching the group subscribed regexes. This is only applicable to consumer groups using the consumer group protocol. ";
public static final int CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT = 10 * 60 * 1000; // 10 minutes
///
/// Share group configs
///
@ -318,6 +323,8 @@ public class GroupCoordinatorConfig {
.define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC)
.define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
.define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
// Interval config used for testing purposes.
.defineInternal(CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT, atLeast(10 * 1000), MEDIUM, CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC)
// Share group configs
.define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
@ -370,6 +377,7 @@ public class GroupCoordinatorConfig {
private final int consumerGroupMaxSessionTimeoutMs;
private final int consumerGroupMinHeartbeatIntervalMs;
private final int consumerGroupMaxHeartbeatIntervalMs;
private final int consumerGroupRegexRefreshIntervalMs;
// Share group configurations
private final int shareGroupMaxSize;
private final int shareGroupSessionTimeoutMs;
@ -419,6 +427,7 @@ public class GroupCoordinatorConfig {
this.consumerGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.consumerGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
this.consumerGroupRegexRefreshIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG);
// Share group configurations
this.shareGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG);
this.shareGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
@ -810,6 +819,13 @@ public class GroupCoordinatorConfig {
return consumerGroupMaxHeartbeatIntervalMs;
}
/**
* The consumer group regex batch refresh max interval in milliseconds.
*/
public int consumerGroupRegexRefreshIntervalMs() {
return consumerGroupRegexRefreshIntervalMs;
}
/**
* The share group session timeout in milliseconds.
*/

View File

@ -402,8 +402,10 @@ public class GroupMetadataManager {
/**
* The minimum amount of time between two consecutive refreshes of
* the regular expressions within a single group.
*
* Package private for setting the lower limit of the refresh interval.
*/
private static final long REGEX_BATCH_REFRESH_INTERVAL_MS = 10_000L;
static final long REGEX_BATCH_REFRESH_MIN_INTERVAL_MS = 10_000L;
/**
* The log context.
@ -3076,6 +3078,7 @@ public class GroupMetadataManager {
ConsumerGroupMember updatedMember,
List<CoordinatorRecord> records
) {
final long currentTimeMs = time.milliseconds();
String groupId = group.groupId();
String memberId = updatedMember.memberId();
String oldSubscribedTopicRegex = member.subscribedTopicRegex();
@ -3116,8 +3119,9 @@ public class GroupMetadataManager {
// 0. The group is subscribed to regular expressions.
// 1. There is no ongoing refresh for the group.
// 2. The last refresh is older than 10s.
// 3. The group has unresolved regular expressions.
// 4. The metadata image has new topics.
// 3.1 The group has unresolved regular expressions.
// 3.2 Or the metadata image has new topics.
// 3.3 Or the last refresh is older than the batch refresh max interval.
// 0. The group is subscribed to regular expressions. We also take the one
// that the current may have just introduced.
@ -3134,11 +3138,11 @@ public class GroupMetadataManager {
// 2. The last refresh is older than 10s. If the group does not have any regular
// expressions but the current member just brought a new one, we should continue.
long lastRefreshTimeMs = group.lastResolvedRegularExpressionRefreshTimeMs();
if (time.milliseconds() <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_INTERVAL_MS) {
if (currentTimeMs <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {
return bumpGroupEpoch;
}
// 3. The group has unresolved regular expressions.
// 3.1 The group has unresolved regular expressions.
Map<String, Integer> subscribedRegularExpressions = new HashMap<>(group.subscribedRegularExpressions());
if (isNotEmpty(oldSubscribedTopicRegex)) {
subscribedRegularExpressions.compute(oldSubscribedTopicRegex, Utils::decValue);
@ -3149,9 +3153,12 @@ public class GroupMetadataManager {
requireRefresh |= subscribedRegularExpressions.size() != group.numResolvedRegularExpressions();
// 4. The metadata has new topics that we must consider.
// 3.2 The metadata has new topics that we must consider.
requireRefresh |= group.lastResolvedRegularExpressionVersion() < lastMetadataImageWithNewTopics;
// 3.3 The last refresh is older than the batch refresh max interval.
requireRefresh |= currentTimeMs > lastRefreshTimeMs + config.consumerGroupRegexRefreshIntervalMs();
if (requireRefresh && !subscribedRegularExpressions.isEmpty()) {
Set<String> regexes = Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
executor.schedule(

View File

@ -198,6 +198,7 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 666);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, 15 * 60 * 1000);
GroupCoordinatorConfig config = createConfig(configs);
@ -226,6 +227,7 @@ public class GroupCoordinatorConfigTest {
assertEquals(666, config.consumerGroupMaxSessionTimeoutMs());
assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
assertEquals(15 * 60 * 1000, config.consumerGroupRegexRefreshIntervalMs());
}
@Test

View File

@ -20804,7 +20804,7 @@ public class GroupMetadataManagerTest {
.withMetadataHash(computeGroupHash(Map.of(fooTopicName, fooTopicHash))))
.build();
// sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
// sleep for more than REGEX_BATCH_REFRESH_MIN_INTERVAL_MS
context.time.sleep(10001L);
Map<String, AuthorizationResult> acls = new HashMap<>();
@ -20887,7 +20887,7 @@ public class GroupMetadataManagerTest {
context.processTasks()
);
// sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
// sleep for more than REGEX_BATCH_REFRESH_MIN_INTERVAL_MS
context.time.sleep(10001L);
// Access to the bar topic is granted.
@ -20972,6 +20972,233 @@ public class GroupMetadataManagerTest {
);
}
@Test
public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
String barTopicName = "bar";
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build(12345L);
long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
long barTopicHash = computeTopicHash(barTopicName, metadataImage);
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
Authorizer authorizer = mock(Authorizer.class);
Plugin<Authorizer> authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, 60000)
.withMetadataImage(metadataImage)
.withAuthorizerPlugin(authorizerPlugin)
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(List.of("foo"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)))
.build())
.withMember(new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.build())
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)))
.withAssignment(memberId2, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.withResolvedRegularExpression("foo*", new ResolvedRegularExpression(
Set.of(fooTopicName), 0L, 0L))
.withAssignmentEpoch(10)
.withMetadataHash(computeGroupHash(Map.of(fooTopicName, fooTopicHash))))
.build();
// sleep for more than REGEX_BATCH_REFRESH_MIN_INTERVAL_MS
context.time.sleep(10001L);
Map<String, AuthorizationResult> acls = new HashMap<>();
acls.put(fooTopicName, AuthorizationResult.ALLOWED);
acls.put(barTopicName, AuthorizationResult.DENIED);
when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
List<Action> actions = invocation.getArgument(1);
return actions.stream()
.map(action -> acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
.collect(Collectors.toList());
});
// Member 2 heartbeats with a different regular expression.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(10)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignor("range")
.setTopicPartitions(List.of()),
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
);
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(3, 4, 5))))),
result1.response()
);
ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignorName("range")
.build();
assertRecordsEquals(
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")
),
result1.records()
);
// Execute pending tasks.
assertEquals(
List.of(
new MockCoordinatorExecutor.ExecutorResult<>(
groupId + "-regex",
new CoordinatorResult<>(List.of(
// The resolution of the new regex is persisted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
"foo*|bar*",
new ResolvedRegularExpression(
Set.of("foo"),
12345L,
context.time.milliseconds()
)
),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, fooTopicHash
)))
))
)
),
context.processTasks()
);
// sleep for more than REGEX_REFRESH_INTERVAL_MS
context.time.sleep(60001L);
// Access to the bar topic is granted.
acls.put(barTopicName, AuthorizationResult.ALLOWED);
assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)),
memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)
))
)));
// Member 2 heartbeats again with the same regex.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(10)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignor("range")
.setTopicPartitions(List.of()),
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
);
expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.build();
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(3, 4, 5))))),
result2.response()
);
assertRecordsEquals(
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)
),
result2.records()
);
// A regex refresh is triggered and the bar topic is included.
assertRecordsEquals(
List.of(
// The resolution of the new regex is persisted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
"foo*|bar*",
new ResolvedRegularExpression(
Set.of("foo", "bar"),
12345L,
context.time.milliseconds()
)
),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of(
fooTopicName, fooTopicHash,
barTopicName, barTopicHash
)))
),
context.processTasks().get(0).result.records()
);
}
@Test
public void testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
String groupId = "fooup";