mirror of https://github.com/apache/kafka.git
MINOR: add repartitionSourceTopics to Streams group description (#19561)
This is a follow-up of this #19433 This PR aims at adding the `repartition source topics` to the output of `--describe` for streams groups. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
416f8941ca
commit
443c01ca80
|
@ -368,7 +368,9 @@ public class StreamsGroupCommand {
|
||||||
private static Set<TopicPartition> getTopicPartitions(List<StreamsGroupMemberAssignment.TaskIds> taskIds, StreamsGroupDescription description) {
|
private static Set<TopicPartition> getTopicPartitions(List<StreamsGroupMemberAssignment.TaskIds> taskIds, StreamsGroupDescription description) {
|
||||||
Map<String, List<String>> allSourceTopics = new HashMap<>();
|
Map<String, List<String>> allSourceTopics = new HashMap<>();
|
||||||
for (StreamsGroupSubtopologyDescription subtopologyDescription : description.subtopologies()) {
|
for (StreamsGroupSubtopologyDescription subtopologyDescription : description.subtopologies()) {
|
||||||
allSourceTopics.put(subtopologyDescription.subtopologyId(), subtopologyDescription.sourceTopics());
|
List<String> topics = new ArrayList<>(subtopologyDescription.sourceTopics());
|
||||||
|
topics.addAll(subtopologyDescription.repartitionSourceTopics().keySet());
|
||||||
|
allSourceTopics.put(subtopologyDescription.subtopologyId(), topics);
|
||||||
}
|
}
|
||||||
Set<TopicPartition> topicPartitions = new HashSet<>();
|
Set<TopicPartition> topicPartitions = new HashSet<>();
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,9 @@ public class DescribeStreamsGroupTest {
|
||||||
final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "OFFSET-LAG");
|
final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "OFFSET-LAG");
|
||||||
final Set<List<String>> expectedRows = Set.of(
|
final Set<List<String>> expectedRows = Set.of(
|
||||||
List.of(APP_ID, INPUT_TOPIC, "0", "0"),
|
List.of(APP_ID, INPUT_TOPIC, "0", "0"),
|
||||||
List.of(APP_ID, INPUT_TOPIC, "1", "0"));
|
List.of(APP_ID, INPUT_TOPIC, "1", "0"),
|
||||||
|
List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "0", "0"),
|
||||||
|
List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "0"));
|
||||||
|
|
||||||
validateDescribeOutput(
|
validateDescribeOutput(
|
||||||
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe"), expectedHeader, expectedRows, List.of());
|
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe"), expectedHeader, expectedRows, List.of());
|
||||||
|
@ -109,7 +111,9 @@ public class DescribeStreamsGroupTest {
|
||||||
final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
|
final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
|
||||||
final Set<List<String>> expectedRows = Set.of(
|
final Set<List<String>> expectedRows = Set.of(
|
||||||
List.of(APP_ID, INPUT_TOPIC, "0", "-", "-", "0", "0"),
|
List.of(APP_ID, INPUT_TOPIC, "0", "-", "-", "0", "0"),
|
||||||
List.of(APP_ID, INPUT_TOPIC, "1", "-", "-", "0", "0"));
|
List.of(APP_ID, INPUT_TOPIC, "1", "-", "-", "0", "0"),
|
||||||
|
List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "0", "-", "-", "0", "0"),
|
||||||
|
List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "-", "-", "0", "0"));
|
||||||
|
|
||||||
validateDescribeOutput(
|
validateDescribeOutput(
|
||||||
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose"), expectedHeader, expectedRows, List.of());
|
Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose"), expectedHeader, expectedRows, List.of());
|
||||||
|
|
Loading…
Reference in New Issue