KAFKA-19730: StreamsGroupDescribe result is missing topology (#20574)

When toology not configured.

In the streams group heartbeat, we validate the topology set for the
group against the topic metadata, to generate the "configured topology"
which has a specific number of partitions for each topic.

In streams group describe, we use the configured topology to expose this
information to the user. However, we leave the topology information as
null in the streams group describe response, if the topology is not
configured. This triggers an IllegalStateException in the admin client
implementation.

Instead, we should expose the unconfigured topology when the configured
topology is not available, which will still expose useful information.

Reviewers: Matthias J. Sax <matthias@confluent.io>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Lucas Brutschy 2025-09-26 19:40:47 +02:00 committed by GitHub
parent ac495f9ef7
commit fb0518c34e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 346 additions and 4 deletions

View File

@ -4463,6 +4463,47 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
} }
} }
@Test
def testDescribeStreamsGroupsNotReady(): Unit = {
val streamsGroupId = "stream_group_id"
val testTopicName = "test_topic"
val config = createConfig
client = Admin.create(config)
val streams = createStreamsGroup(
inputTopic = testTopicName,
streamsGroupId = streamsGroupId
)
streams.poll(JDuration.ofMillis(500L))
try {
TestUtils.waitUntilTrue(() => {
val firstGroup = client.listGroups().all().get().stream()
.filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.NOT_READY && firstGroup.groupId() == streamsGroupId
}, "Stream group not NOT_READY yet")
// Verify the describe call works correctly
val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
val group = describedGroups.get(streamsGroupId)
assertNotNull(group)
assertEquals(streamsGroupId, group.groupId())
assertFalse(group.members().isEmpty)
assertNotNull(group.subtopologies())
assertFalse(group.subtopologies().isEmpty)
// Verify the topology contains the expected source and sink topics
val subtopologies = group.subtopologies().asScala
assertTrue(subtopologies.exists(subtopology =>
subtopology.sourceTopics().contains(testTopicName)))
} finally {
Utils.closeQuietly(streams, "streams")
Utils.closeQuietly(client, "adminClient")
}
}
@Test @Test
def testDeleteStreamsGroups(): Unit = { def testDeleteStreamsGroups(): Unit = {
val testTopicName = "test_topic" val testTopicName = "test_topic"

View File

@ -1039,7 +1039,16 @@ public class StreamsGroup implements Group {
.setGroupEpoch(groupEpoch.get(committedOffset)) .setGroupEpoch(groupEpoch.get(committedOffset))
.setGroupState(state.get(committedOffset).toString()) .setGroupState(state.get(committedOffset).toString())
.setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset)) .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
.setTopology(configuredTopology.get(committedOffset).map(ConfiguredTopology::asStreamsGroupDescribeTopology).orElse(null)); .setTopology(
configuredTopology.get(committedOffset)
.filter(ConfiguredTopology::isReady)
.map(ConfiguredTopology::asStreamsGroupDescribeTopology)
.orElse(
topology.get(committedOffset)
.map(StreamsTopology::asStreamsGroupDescribeTopology)
.orElseThrow(() -> new IllegalStateException("There should always be a topology for a streams group."))
)
);
members.entrySet(committedOffset).forEach( members.entrySet(committedOffset).forEach(
entry -> describedGroup.members().add( entry -> describedGroup.members().add(
entry.getValue().asStreamsGroupDescribeMember( entry.getValue().asStreamsGroupDescribeMember(

View File

@ -16,12 +16,14 @@
*/ */
package org.apache.kafka.coordinator.group.streams; package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
@ -95,4 +97,43 @@ public record StreamsTopology(int topologyEpoch,
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x)); .collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x));
return new StreamsTopology(topology.epoch(), subtopologyMap); return new StreamsTopology(topology.epoch(), subtopologyMap);
} }
public StreamsGroupDescribeResponseData.Topology asStreamsGroupDescribeTopology() {
return new StreamsGroupDescribeResponseData.Topology()
.setEpoch(topologyEpoch)
.setSubtopologies(
subtopologies.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(entry -> asStreamsGroupDescribeSubtopology(entry.getKey(), entry.getValue()))
.toList()
);
}
private StreamsGroupDescribeResponseData.Subtopology asStreamsGroupDescribeSubtopology(String subtopologyId, StreamsGroupTopologyValue.Subtopology subtopology) {
return new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId(subtopologyId)
.setSourceTopics(subtopology.sourceTopics().stream().sorted().toList())
.setRepartitionSinkTopics(subtopology.repartitionSinkTopics().stream().sorted().toList())
.setRepartitionSourceTopics(subtopology.repartitionSourceTopics().stream()
.map(this::asStreamsGroupDescribeTopicInfo)
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList())
.setStateChangelogTopics(subtopology.stateChangelogTopics().stream()
.map(this::asStreamsGroupDescribeTopicInfo)
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList());
}
private StreamsGroupDescribeResponseData.TopicInfo asStreamsGroupDescribeTopicInfo(StreamsGroupTopologyValue.TopicInfo topicInfo) {
return new StreamsGroupDescribeResponseData.TopicInfo()
.setName(topicInfo.name())
.setPartitions(topicInfo.partitions())
.setReplicationFactor(topicInfo.replicationFactor())
.setTopicConfigs(
topicInfo.topicConfigs().stream().map(
topicConfig -> new StreamsGroupDescribeResponseData.KeyValue()
.setKey(topicConfig.key())
.setValue(topicConfig.value())
).toList()
);
}
} }

View File

@ -9645,19 +9645,43 @@ public class GroupMetadataManagerTest {
.setProcessId("processId") .setProcessId("processId")
.setMemberEpoch(epoch) .setMemberEpoch(epoch)
.setPreviousMemberEpoch(epoch - 1); .setPreviousMemberEpoch(epoch - 1);
String subtopology1 = "subtopology1";
String fooTopicName = "foo";
StreamsTopology topology = new StreamsTopology(
0,
Map.of(subtopology1,
new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId(subtopology1)
.setSourceTopics(List.of(fooTopicName))
)
);
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch)) .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch)
.withTopology(topology)
)
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), epoch) .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), epoch)
.withMember(memberBuilder.build())) .withMember(memberBuilder.build())
.withTopology(topology)
)
.build(); .build();
StreamsGroupDescribeResponseData.Topology expectedTopology =
new StreamsGroupDescribeResponseData.Topology()
.setEpoch(0)
.setSubtopologies(List.of(
new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId(subtopology1)
.setSourceTopics(List.of(fooTopicName))
));
List<StreamsGroupDescribeResponseData.DescribedGroup> expected = Arrays.asList( List<StreamsGroupDescribeResponseData.DescribedGroup> expected = Arrays.asList(
new StreamsGroupDescribeResponseData.DescribedGroup() new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(epoch) .setGroupEpoch(epoch)
.setGroupId(streamsGroupIds.get(0)) .setGroupId(streamsGroupIds.get(0))
.setGroupState(StreamsGroupState.EMPTY.toString()) .setGroupState(StreamsGroupState.EMPTY.toString())
.setAssignmentEpoch(0), .setAssignmentEpoch(0)
.setTopology(expectedTopology),
new StreamsGroupDescribeResponseData.DescribedGroup() new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(epoch) .setGroupEpoch(epoch)
.setGroupId(streamsGroupIds.get(1)) .setGroupId(streamsGroupIds.get(1))
@ -9666,6 +9690,7 @@ public class GroupMetadataManagerTest {
TasksTuple.EMPTY TasksTuple.EMPTY
) )
)) ))
.setTopology(expectedTopology)
.setGroupState(StreamsGroupState.NOT_READY.toString()) .setGroupState(StreamsGroupState.NOT_READY.toString())
); );
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(streamsGroupIds); List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(streamsGroupIds);
@ -9695,6 +9720,16 @@ public class GroupMetadataManagerTest {
String memberId1 = "memberId1"; String memberId1 = "memberId1";
String memberId2 = "memberId2"; String memberId2 = "memberId2";
String subtopologyId = "subtopology1"; String subtopologyId = "subtopology1";
String fooTopicName = "foo";
StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue()
.setEpoch(0)
.setSubtopologies(
List.of(
new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId(subtopologyId)
.setSourceTopics(List.of(fooTopicName))
)
);
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build(); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
@ -9702,6 +9737,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build())); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build())); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId, topology));
TasksTuple assignment = new TasksTuple( TasksTuple assignment = new TasksTuple(
Map.of(subtopologyId, Set.of(0, 1)), Map.of(subtopologyId, Set.of(0, 1)),
@ -9733,6 +9769,17 @@ public class GroupMetadataManagerTest {
memberBuilder1.build().asStreamsGroupDescribeMember(TasksTuple.EMPTY), memberBuilder1.build().asStreamsGroupDescribeMember(TasksTuple.EMPTY),
memberBuilder2.build().asStreamsGroupDescribeMember(assignment) memberBuilder2.build().asStreamsGroupDescribeMember(assignment)
)) ))
.setTopology(
new StreamsGroupDescribeResponseData.Topology()
.setEpoch(0)
.setSubtopologies(
List.of(
new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId(subtopologyId)
.setSourceTopics(List.of(fooTopicName))
)
)
)
.setGroupState(StreamsGroup.StreamsGroupState.NOT_READY.toString()) .setGroupState(StreamsGroup.StreamsGroupState.NOT_READY.toString())
.setGroupEpoch(epoch + 2); .setGroupEpoch(epoch + 2);
assertEquals(1, actual.size()); assertEquals(1, actual.size());

View File

@ -1019,4 +1019,124 @@ public class StreamsGroupTest {
streamsGroup.removeMember(memberId2); streamsGroup.removeMember(memberId2);
assertEquals(Optional.empty(), streamsGroup.getShutdownRequestMemberId()); assertEquals(Optional.empty(), streamsGroup.getShutdownRequestMemberId());
} }
@Test
public void testAsDescribedGroupWithStreamsTopologyHavingSubtopologies() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-with-topology");
snapshotRegistry.idempotentCreateSnapshot(0);
// Create a topology with subtopologies
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
"sub-1", new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId("sub-1")
.setSourceTopics(List.of("input-topic"))
.setRepartitionSourceTopics(List.of(
new StreamsGroupTopologyValue.TopicInfo().setName("repartition-topic")
))
.setStateChangelogTopics(List.of(
new StreamsGroupTopologyValue.TopicInfo().setName("changelog-topic")
))
);
group.setGroupEpoch(2);
group.setTopology(new StreamsTopology(2, subtopologies));
group.setTargetAssignmentEpoch(2);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(2)
.setPreviousMemberEpoch(1)
.setState(MemberState.STABLE)
.setInstanceId("instance1")
.setRackId("rack1")
.setClientId("client1")
.setClientHost("host1")
.setRebalanceTimeoutMs(1000)
.setTopologyEpoch(2)
.setProcessId("process1")
.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host1").setPort(9092))
.setClientTags(Map.of("tag1", "value1"))
.setAssignedTasks(new TasksTuple(Map.of(), Map.of(), Map.of()))
.setTasksPendingRevocation(new TasksTuple(Map.of(), Map.of(), Map.of()))
.build());
snapshotRegistry.idempotentCreateSnapshot(1);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);
assertEquals("group-id-with-topology", describedGroup.groupId());
assertEquals(StreamsGroup.StreamsGroupState.NOT_READY.toString(), describedGroup.groupState());
assertEquals(2, describedGroup.groupEpoch());
assertEquals(2, describedGroup.assignmentEpoch());
// Verify topology is correctly described
assertNotNull(describedGroup.topology());
assertEquals(2, describedGroup.topology().epoch());
assertEquals(1, describedGroup.topology().subtopologies().size());
StreamsGroupDescribeResponseData.Subtopology subtopology = describedGroup.topology().subtopologies().get(0);
assertEquals("sub-1", subtopology.subtopologyId());
assertEquals(List.of("input-topic"), subtopology.sourceTopics());
assertEquals(1, subtopology.repartitionSourceTopics().size());
assertEquals("repartition-topic", subtopology.repartitionSourceTopics().get(0).name());
assertEquals(1, subtopology.stateChangelogTopics().size());
assertEquals("changelog-topic", subtopology.stateChangelogTopics().get(0).name());
assertEquals(1, describedGroup.members().size());
assertEquals("member1", describedGroup.members().get(0).memberId());
}
@Test
public void testAsDescribedGroupPrefersConfiguredTopologyOverStreamsTopology() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-configured");
snapshotRegistry.idempotentCreateSnapshot(0);
// Create both StreamsTopology and ConfiguredTopology
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
"sub-1", new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId("sub-1")
.setSourceTopics(List.of("streams-topic"))
);
group.setGroupEpoch(3);
group.setTopology(new StreamsTopology(2, subtopologies));
group.setConfiguredTopology(new ConfiguredTopology(3, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
group.setTargetAssignmentEpoch(3);
snapshotRegistry.idempotentCreateSnapshot(1);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);
// Should prefer ConfiguredTopology over StreamsTopology
assertNotNull(describedGroup.topology());
assertEquals(3, describedGroup.topology().epoch()); // ConfiguredTopology epoch
assertEquals(0, describedGroup.topology().subtopologies().size()); // Empty configured topology
}
@Test
public void testAsDescribedGroupFallbackToStreamsTopologyWhenConfiguredTopologyEmpty() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-fallback");
snapshotRegistry.idempotentCreateSnapshot(0);
// Create StreamsTopology with subtopologies
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
"sub-1", new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId("sub-1")
.setSourceTopics(List.of("fallback-topic"))
);
group.setGroupEpoch(4);
group.setTopology(new StreamsTopology(4, subtopologies));
// No ConfiguredTopology set, so should fallback to StreamsTopology
group.setTargetAssignmentEpoch(4);
snapshotRegistry.idempotentCreateSnapshot(1);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);
// Should use StreamsTopology when ConfiguredTopology is not available
assertNotNull(describedGroup.topology());
assertEquals(4, describedGroup.topology().epoch()); // StreamsTopology epoch
assertEquals(1, describedGroup.topology().subtopologies().size());
assertEquals("sub-1", describedGroup.topology().subtopologies().get(0).subtopologyId());
assertEquals(List.of("fallback-topic"), describedGroup.topology().subtopologies().get(0).sourceTopics());
}
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.coordinator.group.streams; package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
@ -24,6 +25,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.To
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -120,6 +122,88 @@ public class StreamsTopologyTest {
assertEquals(mkSubtopology2(), topology.subtopologies().get(SUBTOPOLOGY_ID_2)); assertEquals(mkSubtopology2(), topology.subtopologies().get(SUBTOPOLOGY_ID_2));
} }
@Test
public void asStreamsGroupDescribeTopologyShouldReturnCorrectStructure() {
Map<String, Subtopology> subtopologies = mkMap(
mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1()),
mkEntry(SUBTOPOLOGY_ID_2, mkSubtopology2())
);
StreamsTopology topology = new StreamsTopology(1, subtopologies);
StreamsGroupDescribeResponseData.Topology describeTopology = topology.asStreamsGroupDescribeTopology();
assertEquals(1, describeTopology.epoch());
assertEquals(2, describeTopology.subtopologies().size());
// Verify subtopologies are correctly converted and sorted
List<StreamsGroupDescribeResponseData.Subtopology> sortedSubtopologies =
describeTopology.subtopologies().stream()
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.Subtopology::subtopologyId))
.toList();
// Verify first subtopology
StreamsGroupDescribeResponseData.Subtopology sub1 = sortedSubtopologies.get(0);
assertEquals(SUBTOPOLOGY_ID_1, sub1.subtopologyId());
// Source topics are sorted alphabetically
assertEquals(List.of(REPARTITION_TOPIC_1, REPARTITION_TOPIC_2, SOURCE_TOPIC_1, SOURCE_TOPIC_2),
sub1.sourceTopics());
assertEquals(List.of(REPARTITION_TOPIC_3), sub1.repartitionSinkTopics());
assertEquals(2, sub1.repartitionSourceTopics().size());
assertEquals(2, sub1.stateChangelogTopics().size());
// Verify second subtopology
StreamsGroupDescribeResponseData.Subtopology sub2 = sortedSubtopologies.get(1);
assertEquals(SUBTOPOLOGY_ID_2, sub2.subtopologyId());
// Source topics are sorted alphabetically
assertEquals(List.of(REPARTITION_TOPIC_3, SOURCE_TOPIC_3), sub2.sourceTopics());
assertEquals(List.of(), sub2.repartitionSinkTopics());
assertEquals(1, sub2.repartitionSourceTopics().size());
assertEquals(1, sub2.stateChangelogTopics().size());
}
@Test
public void asStreamsGroupDescribeTopicInfoShouldConvertCorrectly() {
Map<String, Subtopology> subtopologies = mkMap(
mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1())
);
StreamsTopology topology = new StreamsTopology(1, subtopologies);
StreamsGroupDescribeResponseData.Topology describeTopology = topology.asStreamsGroupDescribeTopology();
StreamsGroupDescribeResponseData.Subtopology describedSub = describeTopology.subtopologies().get(0);
// Verify repartition source topics are correctly converted
List<StreamsGroupDescribeResponseData.TopicInfo> repartitionTopics = describedSub.repartitionSourceTopics();
assertEquals(2, repartitionTopics.size());
// Find the first repartition topic (they should be sorted by name)
StreamsGroupDescribeResponseData.TopicInfo firstTopic = repartitionTopics.stream()
.filter(topic -> topic.name().equals(REPARTITION_TOPIC_1))
.findFirst()
.orElseThrow();
assertEquals(REPARTITION_TOPIC_1, firstTopic.name());
// Verify changelog topics are correctly converted
List<StreamsGroupDescribeResponseData.TopicInfo> changelogTopics = describedSub.stateChangelogTopics();
assertEquals(2, changelogTopics.size());
// Find the first changelog topic (they should be sorted by name)
StreamsGroupDescribeResponseData.TopicInfo firstChangelog = changelogTopics.stream()
.filter(topic -> topic.name().equals(CHANGELOG_TOPIC_1))
.findFirst()
.orElseThrow();
assertEquals(CHANGELOG_TOPIC_1, firstChangelog.name());
}
@Test
public void asStreamsGroupDescribeTopologyWithEmptySubtopologies() {
StreamsTopology topology = new StreamsTopology(0, Map.of());
StreamsGroupDescribeResponseData.Topology describeTopology = topology.asStreamsGroupDescribeTopology();
assertEquals(0, describeTopology.epoch());
assertEquals(0, describeTopology.subtopologies().size());
}
private Subtopology mkSubtopology1() { private Subtopology mkSubtopology1() {
return new Subtopology() return new Subtopology()
.setSubtopologyId(SUBTOPOLOGY_ID_1) .setSubtopologyId(SUBTOPOLOGY_ID_1)