mirror of https://github.com/apache/kafka.git
KAFKA-17593; [3/N] Track the number of subscribed members per regular expression in ConsumerGroup (#17653)
This patch adds a data structure to ConsumerGroup to track the number of members subscribed to each regular expressions in the group. This will be useful to know whether a regex is new in the group or whether a regex must be removed from the group. Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
parent
64f3ee4c33
commit
5cf91e4cbe
|
@ -130,6 +130,11 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
*/
|
||||
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
|
||||
|
||||
/**
|
||||
* The number of members subscribed to each regular expressions.
|
||||
*/
|
||||
private final TimelineHashMap<String, Integer> subscribedRegularExpressions;
|
||||
|
||||
public ConsumerGroup(
|
||||
SnapshotRegistry snapshotRegistry,
|
||||
String groupId,
|
||||
|
@ -143,6 +148,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
|
||||
this.classicProtocolMembersSupportedProtocols = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
this.subscribedRegularExpressions = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -294,6 +300,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, newMember);
|
||||
maybeUpdateServerAssignors(oldMember, newMember);
|
||||
maybeUpdatePartitionEpoch(oldMember, newMember);
|
||||
maybeUpdateSubscribedRegularExpression(oldMember, newMember);
|
||||
updateStaticMember(newMember);
|
||||
maybeUpdateGroupState();
|
||||
maybeUpdateNumClassicProtocolMembers(oldMember, newMember);
|
||||
|
@ -317,6 +324,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, null);
|
||||
maybeUpdateServerAssignors(oldMember, null);
|
||||
maybeRemovePartitionEpoch(oldMember);
|
||||
maybeUpdateSubscribedRegularExpression(oldMember, null);
|
||||
removeStaticMember(oldMember);
|
||||
maybeUpdateGroupState();
|
||||
maybeUpdateNumClassicProtocolMembers(oldMember, null);
|
||||
|
@ -334,6 +342,13 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The number of members subscribed to the provided regex.
|
||||
*/
|
||||
public int numSubscribedMembers(String regex) {
|
||||
return subscribedRegularExpressions.getOrDefault(regex, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The number of members that use the classic protocol.
|
||||
*/
|
||||
|
@ -365,7 +380,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
* @return The epoch or -1.
|
||||
*/
|
||||
public int currentPartitionEpoch(
|
||||
Uuid topicId, int partitionId
|
||||
Uuid topicId,
|
||||
int partitionId
|
||||
) {
|
||||
Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
|
||||
if (partitions == null) {
|
||||
|
@ -665,6 +681,26 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the number of the members that use the regular expression.
|
||||
*
|
||||
* @param oldMember The old member.
|
||||
* @param newMember The new member.
|
||||
*/
|
||||
private void maybeUpdateSubscribedRegularExpression(
|
||||
ConsumerGroupMember oldMember,
|
||||
ConsumerGroupMember newMember
|
||||
) {
|
||||
// Decrement the count of the old regex.
|
||||
if (oldMember != null && oldMember.subscribedTopicRegex() != null) {
|
||||
subscribedRegularExpressions.compute(oldMember.subscribedTopicRegex(), Utils::decValue);
|
||||
}
|
||||
// Increment the count of the new regex.
|
||||
if (newMember != null && newMember.subscribedTopicRegex() != null) {
|
||||
subscribedRegularExpressions.compute(newMember.subscribedTopicRegex(), Utils::incValue);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the number of the members that use the classic protocol.
|
||||
*
|
||||
|
|
|
@ -1599,4 +1599,69 @@ public class ConsumerGroupTest {
|
|||
assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor());
|
||||
assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubscribedRegularExpressionCount() {
|
||||
ConsumerGroup consumerGroup = createConsumerGroup("foo");
|
||||
|
||||
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1")
|
||||
.setSubscribedTopicRegex("regex1")
|
||||
.build();
|
||||
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member2")
|
||||
.setSubscribedTopicRegex("regex2")
|
||||
.build();
|
||||
ConsumerGroupMember member3 = new ConsumerGroupMember.Builder("member3")
|
||||
.setSubscribedTopicRegex("regex1")
|
||||
.build();
|
||||
|
||||
// Assert the initial state.
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
|
||||
|
||||
// Add member 1.
|
||||
consumerGroup.updateMember(member1);
|
||||
assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
|
||||
|
||||
// Add member 2.
|
||||
consumerGroup.updateMember(member2);
|
||||
assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
|
||||
assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
|
||||
|
||||
// Add member 3.
|
||||
consumerGroup.updateMember(member3);
|
||||
assertEquals(2, consumerGroup.numSubscribedMembers("regex1"));
|
||||
assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
|
||||
|
||||
// Update member 3.
|
||||
member3 = new ConsumerGroupMember.Builder(member3)
|
||||
.setSubscribedTopicRegex("regex2")
|
||||
.build();
|
||||
consumerGroup.updateMember(member3);
|
||||
assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
|
||||
assertEquals(2, consumerGroup.numSubscribedMembers("regex2"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
|
||||
|
||||
// Remove member 1.
|
||||
consumerGroup.removeMember(member1.memberId());
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
|
||||
assertEquals(2, consumerGroup.numSubscribedMembers("regex2"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
|
||||
|
||||
// Remove member 2.
|
||||
consumerGroup.removeMember(member2.memberId());
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
|
||||
assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
|
||||
|
||||
// Remove member 3.
|
||||
consumerGroup.removeMember(member3.memberId());
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
|
||||
assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue