KAFKA-16095: Update list group state type filter to include the states for the new consumer group type (#15211)

While using —list —state the current accepted values correspond to the classic group type states. This patch adds the new states introduced by KIP-848. It also make the matching on the server case insensitive.

Co-authored-by: d00791190 <dinglan6@huawei.com>

Reviewers: Ritika Reddy <rreddy@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
DL1231 2024-01-29 23:19:05 +08:00 committed by GitHub
parent 94ab8c16ba
commit 82920ffad0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 164 additions and 40 deletions

View File

@ -18,6 +18,7 @@
package org.apache.kafka.common;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -31,10 +32,12 @@ public enum ConsumerGroupState {
COMPLETING_REBALANCE("CompletingRebalance"),
STABLE("Stable"),
DEAD("Dead"),
EMPTY("Empty");
EMPTY("Empty"),
ASSIGNING("Assigning"),
RECONCILING("Reconciling");
private final static Map<String, ConsumerGroupState> NAME_TO_ENUM = Arrays.stream(values())
.collect(Collectors.toMap(state -> state.name, Function.identity()));
.collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity()));
private final String name;
@ -43,10 +46,10 @@ public enum ConsumerGroupState {
}
/**
* Parse a string into a consumer group state.
* Case-insensitive consumer group state lookup by string name.
*/
public static ConsumerGroupState parse(String name) {
ConsumerGroupState state = NAME_TO_ENUM.get(name);
ConsumerGroupState state = NAME_TO_ENUM.get(name.toUpperCase(Locale.ROOT));
return state == null ? UNKNOWN : state;
}

View File

@ -1114,8 +1114,10 @@ private[group] class GroupCoordinator(
// if states is empty, return all groups
val groups = if (states.isEmpty)
groupManager.currentGroups
else
groupManager.currentGroups.filter(g => states.contains(g.summary.state))
else {
val caseInsensitiveStates = states.map(_.toLowerCase)
groupManager.currentGroups.filter(g => g.isInStates(caseInsensitiveStates))
}
(errorCode, groups.map(_.overview).toList)
}
}

View File

@ -35,6 +35,7 @@ import scala.jdk.CollectionConverters._
private[group] sealed trait GroupState {
val validPreviousStates: Set[GroupState]
val toLowerCaseString: String = toString.toLowerCase
}
/**
@ -841,6 +842,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
.format(groupId, targetState.validPreviousStates.mkString(","), targetState, state))
}
def isInStates(states: collection.Set[String]): Boolean = {
states.contains(state.toLowerCaseString)
}
override def toString: String = {
"GroupMetadata(" +
s"groupId=$groupId, " +

View File

@ -95,9 +95,16 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
result = ConsumerGroupCommand.consumerGroupStatesFromString("Dead,CompletingRebalance,")
assertEquals(Set(ConsumerGroupState.DEAD, ConsumerGroupState.COMPLETING_REBALANCE), result)
assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString("bad, wrong"))
result = ConsumerGroupCommand.consumerGroupStatesFromString("stable")
assertEquals(Set(ConsumerGroupState.STABLE), result)
assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString("stable"))
result = ConsumerGroupCommand.consumerGroupStatesFromString("stable, assigning")
assertEquals(Set(ConsumerGroupState.STABLE, ConsumerGroupState.ASSIGNING), result)
result = ConsumerGroupCommand.consumerGroupStatesFromString("dead,reconciling,")
assertEquals(Set(ConsumerGroupState.DEAD, ConsumerGroupState.RECONCILING), result)
assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString("bad, wrong"))
assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString(" bad, Stable"))
@ -129,6 +136,12 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
out.contains("STATE") && out.contains(group) && out.contains("Stable")
}, s"Expected to find $group in state Stable and the header, but found $out")
cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "stable")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
out.contains("STATE") && out.contains(group) && out.contains("Stable")
}, s"Expected to find $group in state Stable and the header, but found $out")
}
}

View File

@ -146,7 +146,7 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
// We need v4 or newer to request groups by states.
if (version >= 4) {
assertEquals(
if (useNewProtocol) List(response4) else List.empty,
if (useNewProtocol) List(response4, response1) else List(response1),
listGroups(
statesFilter = List(ConsumerGroupState.STABLE.toString),
version = version.toShort
@ -165,7 +165,7 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
)
assertEquals(
if (useNewProtocol) List(response1, response3, response6).toSet else List(response1, response3).toSet,
if (useNewProtocol) List(response4, response1, response3, response6).toSet else List(response1, response3).toSet,
listGroups(
statesFilter = List(
ClassicGroupState.STABLE.toString,

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* Interface common for all groups.
@ -60,7 +61,7 @@ public interface Group {
/**
* @return the group formatted as a list group response based on the committed offset.
*/
public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset);
ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset);
/**
* @return The group id.
@ -133,4 +134,12 @@ public interface Group {
* @return The offset expiration condition for the group or Empty if no such condition exists.
*/
Optional<OffsetExpirationCondition> offsetExpirationCondition();
/**
* Returns true if the statesFilter contains the current state with given committedOffset.
*
* @param statesFilter The states to filter, which must be lowercase.
* @return true if the state includes, false otherwise.
*/
boolean isInStates(Set<String> statesFilter, long committedOffset);
}

View File

@ -462,7 +462,8 @@ public class GroupMetadataManager {
public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFilter, long committedOffset) {
Stream<Group> groupStream = groups.values(committedOffset).stream();
if (!statesFilter.isEmpty()) {
groupStream = groupStream.filter(group -> statesFilter.contains(group.stateAsString(committedOffset)));
Set<String> caseInsensitiveFilterSet = statesFilter.stream().map(String::toLowerCase).map(String::trim).collect(Collectors.toSet());
groupStream = groupStream.filter(group -> group.isInStates(caseInsensitiveFilterSet, committedOffset));
}
return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
}

View File

@ -956,6 +956,11 @@ public class ClassicGroup implements Group {
return Optional.empty();
}
@Override
public boolean isInStates(Set<String> statesFilter, long committedOffset) {
return statesFilter.contains(state.toLowerCaseString());
}
/**
* Verify the member id is up to date for static members. Return true if both conditions met:
* 1. given member is a known static member to group

View File

@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.classic;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
/**
@ -104,6 +105,7 @@ public enum ClassicGroupState {
DEAD("Dead");
private final String name;
private final String lowerCaseName;
private Set<ClassicGroupState> validPreviousStates;
static {
@ -116,6 +118,7 @@ public enum ClassicGroupState {
ClassicGroupState(String name) {
this.name = name;
this.lowerCaseName = name.toLowerCase(Locale.ROOT);
}
@Override
@ -123,6 +126,10 @@ public enum ClassicGroupState {
return name;
}
public String toLowerCaseString() {
return lowerCaseName;
}
private void addValidPreviousStates(ClassicGroupState... validPreviousStates) {
this.validPreviousStates = new HashSet<>(Arrays.asList(validPreviousStates));
}

View File

@ -42,6 +42,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -59,22 +60,29 @@ import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.Consumer
public class ConsumerGroup implements Group {
public enum ConsumerGroupState {
EMPTY("empty"),
ASSIGNING("assigning"),
RECONCILING("reconciling"),
STABLE("stable"),
DEAD("dead");
EMPTY("Empty"),
ASSIGNING("Assigning"),
RECONCILING("Reconciling"),
STABLE("Stable"),
DEAD("Dead");
private final String name;
private final String lowerCaseName;
ConsumerGroupState(String name) {
this.name = name;
this.lowerCaseName = name.toLowerCase(Locale.ROOT);
}
@Override
public String toString() {
return name;
}
public String toLowerCaseString() {
return lowerCaseName;
}
}
public static class DeadlineAndEpoch {
@ -739,6 +747,11 @@ public class ConsumerGroup implements Group {
return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs));
}
@Override
public boolean isInStates(Set<String> statesFilter, long committedOffset) {
return statesFilter.contains(state.get(committedOffset).toLowerCaseString());
}
/**
* Throws a StaleMemberEpochException if the received member epoch does not match
* the expected member epoch.

View File

@ -9633,6 +9633,13 @@ public class GroupMetadataManagerTest {
assertEquals(expectAllGroupMap, actualAllGroupMap);
// List group with case insensitive empty
actualAllGroupMap =
context.sendListGroups(Collections.singletonList("empty"))
.stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
assertEquals(expectAllGroupMap, actualAllGroupMap);
context.commit();
actualAllGroupMap = context.sendListGroups(Collections.emptyList()).stream()
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
@ -9660,6 +9667,21 @@ public class GroupMetadataManagerTest {
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
assertEquals(expectAllGroupMap, actualAllGroupMap);
actualAllGroupMap = context.sendListGroups(Arrays.asList("empty", "Assigning")).stream()
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
expectAllGroupMap = Stream.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId(classicGroup.groupId())
.setProtocolType(classicGroupType)
.setGroupState(EMPTY.toString()),
new ListGroupsResponseData.ListedGroup()
.setGroupId(consumerGroupId)
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
@Test
@ -9684,7 +9706,7 @@ public class GroupMetadataManagerTest {
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(epoch)
.setGroupId(consumerGroupIds.get(0))
.setGroupState("empty")
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
.setAssignorName("range"),
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(epoch)
@ -9695,7 +9717,7 @@ public class GroupMetadataManagerTest {
new MetadataImageBuilder().build().topics()
)
))
.setGroupState("assigning")
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
.setAssignorName("assignorName")
);
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(consumerGroupIds);
@ -9774,7 +9796,7 @@ public class GroupMetadataManagerTest {
memberBuilder1.build().asConsumerGroupDescribeMember(new Assignment(Collections.emptyMap()), metadataImage.topics()),
memberBuilder2.build().asConsumerGroupDescribeMember(new Assignment(assignmentMap), metadataImage.topics())
))
.setGroupState("assigning")
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
.setAssignorName("range")
.setGroupEpoch(epoch + 2);
expected = Collections.singletonList(

View File

@ -42,6 +42,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
@ -1274,6 +1275,27 @@ public class ClassicGroupTest {
verify(metrics, times(1)).onClassicGroupStateTransition(STABLE, DEAD);
}
@Test
public void testIsInStates() {
ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM, mock(GroupCoordinatorMetricsShard.class));
assertTrue(group.isInStates(Collections.singleton("empty"), 0));
group.transitionTo(PREPARING_REBALANCE);
assertTrue(group.isInStates(Collections.singleton("preparingrebalance"), 0));
assertFalse(group.isInStates(Collections.singleton("PreparingRebalance"), 0));
group.transitionTo(COMPLETING_REBALANCE);
assertTrue(group.isInStates(new HashSet<>(Collections.singletonList("completingrebalance")), 0));
group.transitionTo(STABLE);
assertTrue(group.isInStates(Collections.singleton("stable"), 0));
assertFalse(group.isInStates(Collections.singleton("empty"), 0));
group.transitionTo(DEAD);
assertTrue(group.isInStates(new HashSet<>(Arrays.asList("dead", " ")), 0));
}
private void assertState(ClassicGroup group, ClassicGroupState targetState) {
Set<ClassicGroupState> otherStates = new HashSet<>();
otherStates.add(STABLE);

View File

@ -1024,4 +1024,26 @@ public class ConsumerGroupTest {
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state());
verify(metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, ConsumerGroup.ConsumerGroupState.EMPTY);
}
@Test
public void testIsInStatesCaseInsensitive() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
snapshotRegistry,
Collections.emptyMap(),
new TopicPartition("__consumer_offsets", 0)
);
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
snapshotRegistry.getOrCreateSnapshot(0);
assertTrue(group.isInStates(Collections.singleton("empty"), 0));
assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build());
snapshotRegistry.getOrCreateSnapshot(1);
assertTrue(group.isInStates(Collections.singleton("empty"), 0));
assertTrue(group.isInStates(Collections.singleton("stable"), 1));
assertFalse(group.isInStates(Collections.singleton("empty"), 1));
}
}

View File

@ -227,14 +227,14 @@ public class GroupCoordinatorMetricsShardTest {
assertGaugeValue(metrics, metrics.metricName("group-count", "group-coordinator-metrics",
Collections.singletonMap("protocol", "consumer")), 4);
assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics",
Collections.singletonMap("state", "empty")), 0);
Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.EMPTY.toString())), 0);
assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics",
Collections.singletonMap("state", "assigning")), 1);
Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())), 1);
assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics",
Collections.singletonMap("state", "reconciling")), 1);
Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.RECONCILING.toString())), 1);
assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics",
Collections.singletonMap("state", "stable")), 2);
Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.STABLE.toString())), 2);
assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics",
Collections.singletonMap("state", "dead")), 0);
Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.DEAD.toString())), 0);
}
}

View File

@ -74,23 +74,23 @@ public class GroupCoordinatorMetricsTest {
metrics.metricName(
"consumer-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", "empty")),
Collections.singletonMap("state", ConsumerGroupState.EMPTY.toString())),
metrics.metricName(
"consumer-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", "assigning")),
Collections.singletonMap("state", ConsumerGroupState.ASSIGNING.toString())),
metrics.metricName(
"consumer-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", "reconciling")),
Collections.singletonMap("state", ConsumerGroupState.RECONCILING.toString())),
metrics.metricName(
"consumer-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", "stable")),
Collections.singletonMap("state", ConsumerGroupState.STABLE.toString())),
metrics.metricName(
"consumer-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", "dead"))
Collections.singletonMap("state", ConsumerGroupState.DEAD.toString()))
));
try {