From fb0518c34e7b5c28dbeb234e504117a06ae20970 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Fri, 26 Sep 2025 19:40:47 +0200 Subject: [PATCH] KAFKA-19730: StreamsGroupDescribe result is missing topology (#20574) When toology not configured. In the streams group heartbeat, we validate the topology set for the group against the topic metadata, to generate the "configured topology" which has a specific number of partitions for each topic. In streams group describe, we use the configured topology to expose this information to the user. However, we leave the topology information as null in the streams group describe response, if the topology is not configured. This triggers an IllegalStateException in the admin client implementation. Instead, we should expose the unconfigured topology when the configured topology is not available, which will still expose useful information. Reviewers: Matthias J. Sax --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../api/PlaintextAdminIntegrationTest.scala | 41 ++++++ .../group/streams/StreamsGroup.java | 11 +- .../group/streams/StreamsTopology.java | 41 ++++++ .../group/GroupMetadataManagerTest.java | 53 +++++++- .../group/streams/StreamsGroupTest.java | 120 ++++++++++++++++++ .../group/streams/StreamsTopologyTest.java | 84 ++++++++++++ 6 files changed, 346 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 1c3a7ed42e3..4a686f3d4d8 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4463,6 +4463,47 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + @Test + def testDescribeStreamsGroupsNotReady(): Unit = { + val streamsGroupId = "stream_group_id" + val testTopicName = "test_topic" + + val config = createConfig + client = Admin.create(config) + + val streams = createStreamsGroup( + inputTopic = testTopicName, + streamsGroupId = streamsGroupId + ) + streams.poll(JDuration.ofMillis(500L)) + + try { + TestUtils.waitUntilTrue(() => { + val firstGroup = client.listGroups().all().get().stream() + .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null) + firstGroup.groupState().orElse(null) == GroupState.NOT_READY && firstGroup.groupId() == streamsGroupId + }, "Stream group not NOT_READY yet") + + // Verify the describe call works correctly + val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() + val group = describedGroups.get(streamsGroupId) + assertNotNull(group) + assertEquals(streamsGroupId, group.groupId()) + assertFalse(group.members().isEmpty) + assertNotNull(group.subtopologies()) + assertFalse(group.subtopologies().isEmpty) + + // Verify the topology contains the expected source and sink topics + val subtopologies = group.subtopologies().asScala + assertTrue(subtopologies.exists(subtopology => + subtopology.sourceTopics().contains(testTopicName))) + + } finally { + Utils.closeQuietly(streams, "streams") + Utils.closeQuietly(client, "adminClient") + } + } + @Test def testDeleteStreamsGroups(): Unit = { val testTopicName = "test_topic" diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 72d4386321e..61f61d101f1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -1039,7 +1039,16 @@ public class StreamsGroup implements Group { .setGroupEpoch(groupEpoch.get(committedOffset)) .setGroupState(state.get(committedOffset).toString()) .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset)) - .setTopology(configuredTopology.get(committedOffset).map(ConfiguredTopology::asStreamsGroupDescribeTopology).orElse(null)); + .setTopology( + configuredTopology.get(committedOffset) + .filter(ConfiguredTopology::isReady) + .map(ConfiguredTopology::asStreamsGroupDescribeTopology) + .orElse( + topology.get(committedOffset) + .map(StreamsTopology::asStreamsGroupDescribeTopology) + .orElseThrow(() -> new IllegalStateException("There should always be a topology for a streams group.")) + ) + ); members.entrySet(committedOffset).forEach( entry -> describedGroup.members().add( entry.getValue().asStreamsGroupDescribeMember( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java index 498ff00a595..25ea4376331 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.coordinator.group.streams; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; import java.util.Collections; +import java.util.Comparator; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -95,4 +97,43 @@ public record StreamsTopology(int topologyEpoch, .collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x)); return new StreamsTopology(topology.epoch(), subtopologyMap); } + + public StreamsGroupDescribeResponseData.Topology asStreamsGroupDescribeTopology() { + return new StreamsGroupDescribeResponseData.Topology() + .setEpoch(topologyEpoch) + .setSubtopologies( + subtopologies.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(entry -> asStreamsGroupDescribeSubtopology(entry.getKey(), entry.getValue())) + .toList() + ); + } + + private StreamsGroupDescribeResponseData.Subtopology asStreamsGroupDescribeSubtopology(String subtopologyId, StreamsGroupTopologyValue.Subtopology subtopology) { + return new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId(subtopologyId) + .setSourceTopics(subtopology.sourceTopics().stream().sorted().toList()) + .setRepartitionSinkTopics(subtopology.repartitionSinkTopics().stream().sorted().toList()) + .setRepartitionSourceTopics(subtopology.repartitionSourceTopics().stream() + .map(this::asStreamsGroupDescribeTopicInfo) + .sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList()) + .setStateChangelogTopics(subtopology.stateChangelogTopics().stream() + .map(this::asStreamsGroupDescribeTopicInfo) + .sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList()); + } + + private StreamsGroupDescribeResponseData.TopicInfo asStreamsGroupDescribeTopicInfo(StreamsGroupTopologyValue.TopicInfo topicInfo) { + return new StreamsGroupDescribeResponseData.TopicInfo() + .setName(topicInfo.name()) + .setPartitions(topicInfo.partitions()) + .setReplicationFactor(topicInfo.replicationFactor()) + .setTopicConfigs( + topicInfo.topicConfigs().stream().map( + topicConfig -> new StreamsGroupDescribeResponseData.KeyValue() + .setKey(topicConfig.key()) + .setValue(topicConfig.value()) + ).toList() + ); + } + } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 8d4ae4fbe07..957ae7e8147 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -9645,19 +9645,43 @@ public class GroupMetadataManagerTest { .setProcessId("processId") .setMemberEpoch(epoch) .setPreviousMemberEpoch(epoch - 1); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + StreamsTopology topology = new StreamsTopology( + 0, + Map.of(subtopology1, + new StreamsGroupTopologyValue.Subtopology() + .setSubtopologyId(subtopology1) + .setSourceTopics(List.of(fooTopicName)) + ) + ); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch)) + .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch) + .withTopology(topology) + ) .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), epoch) - .withMember(memberBuilder.build())) + .withMember(memberBuilder.build()) + .withTopology(topology) + ) .build(); + StreamsGroupDescribeResponseData.Topology expectedTopology = + new StreamsGroupDescribeResponseData.Topology() + .setEpoch(0) + .setSubtopologies(List.of( + new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId(subtopology1) + .setSourceTopics(List.of(fooTopicName)) + )); + List expected = Arrays.asList( new StreamsGroupDescribeResponseData.DescribedGroup() .setGroupEpoch(epoch) .setGroupId(streamsGroupIds.get(0)) .setGroupState(StreamsGroupState.EMPTY.toString()) - .setAssignmentEpoch(0), + .setAssignmentEpoch(0) + .setTopology(expectedTopology), new StreamsGroupDescribeResponseData.DescribedGroup() .setGroupEpoch(epoch) .setGroupId(streamsGroupIds.get(1)) @@ -9666,6 +9690,7 @@ public class GroupMetadataManagerTest { TasksTuple.EMPTY ) )) + .setTopology(expectedTopology) .setGroupState(StreamsGroupState.NOT_READY.toString()) ); List actual = context.sendStreamsGroupDescribe(streamsGroupIds); @@ -9695,6 +9720,16 @@ public class GroupMetadataManagerTest { String memberId1 = "memberId1"; String memberId2 = "memberId2"; String subtopologyId = "subtopology1"; + String fooTopicName = "foo"; + StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue() + .setEpoch(0) + .setSubtopologies( + List.of( + new StreamsGroupTopologyValue.Subtopology() + .setSubtopologyId(subtopologyId) + .setSourceTopics(List.of(fooTopicName)) + ) + ); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build(); @@ -9702,6 +9737,7 @@ public class GroupMetadataManagerTest { context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build())); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build())); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId, topology)); TasksTuple assignment = new TasksTuple( Map.of(subtopologyId, Set.of(0, 1)), @@ -9733,6 +9769,17 @@ public class GroupMetadataManagerTest { memberBuilder1.build().asStreamsGroupDescribeMember(TasksTuple.EMPTY), memberBuilder2.build().asStreamsGroupDescribeMember(assignment) )) + .setTopology( + new StreamsGroupDescribeResponseData.Topology() + .setEpoch(0) + .setSubtopologies( + List.of( + new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId(subtopologyId) + .setSourceTopics(List.of(fooTopicName)) + ) + ) + ) .setGroupState(StreamsGroup.StreamsGroupState.NOT_READY.toString()) .setGroupEpoch(epoch + 2); assertEquals(1, actual.size()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index 71feb2a1e90..ba24abd2b80 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -1019,4 +1019,124 @@ public class StreamsGroupTest { streamsGroup.removeMember(memberId2); assertEquals(Optional.empty(), streamsGroup.getShutdownRequestMemberId()); } + + @Test + public void testAsDescribedGroupWithStreamsTopologyHavingSubtopologies() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-with-topology"); + snapshotRegistry.idempotentCreateSnapshot(0); + + // Create a topology with subtopologies + Map subtopologies = Map.of( + "sub-1", new StreamsGroupTopologyValue.Subtopology() + .setSubtopologyId("sub-1") + .setSourceTopics(List.of("input-topic")) + .setRepartitionSourceTopics(List.of( + new StreamsGroupTopologyValue.TopicInfo().setName("repartition-topic") + )) + .setStateChangelogTopics(List.of( + new StreamsGroupTopologyValue.TopicInfo().setName("changelog-topic") + )) + ); + + group.setGroupEpoch(2); + group.setTopology(new StreamsTopology(2, subtopologies)); + group.setTargetAssignmentEpoch(2); + group.updateMember(new StreamsGroupMember.Builder("member1") + .setMemberEpoch(2) + .setPreviousMemberEpoch(1) + .setState(MemberState.STABLE) + .setInstanceId("instance1") + .setRackId("rack1") + .setClientId("client1") + .setClientHost("host1") + .setRebalanceTimeoutMs(1000) + .setTopologyEpoch(2) + .setProcessId("process1") + .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host1").setPort(9092)) + .setClientTags(Map.of("tag1", "value1")) + .setAssignedTasks(new TasksTuple(Map.of(), Map.of(), Map.of())) + .setTasksPendingRevocation(new TasksTuple(Map.of(), Map.of(), Map.of())) + .build()); + snapshotRegistry.idempotentCreateSnapshot(1); + + StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1); + + assertEquals("group-id-with-topology", describedGroup.groupId()); + assertEquals(StreamsGroup.StreamsGroupState.NOT_READY.toString(), describedGroup.groupState()); + assertEquals(2, describedGroup.groupEpoch()); + assertEquals(2, describedGroup.assignmentEpoch()); + + // Verify topology is correctly described + assertNotNull(describedGroup.topology()); + assertEquals(2, describedGroup.topology().epoch()); + assertEquals(1, describedGroup.topology().subtopologies().size()); + + StreamsGroupDescribeResponseData.Subtopology subtopology = describedGroup.topology().subtopologies().get(0); + assertEquals("sub-1", subtopology.subtopologyId()); + assertEquals(List.of("input-topic"), subtopology.sourceTopics()); + assertEquals(1, subtopology.repartitionSourceTopics().size()); + assertEquals("repartition-topic", subtopology.repartitionSourceTopics().get(0).name()); + assertEquals(1, subtopology.stateChangelogTopics().size()); + assertEquals("changelog-topic", subtopology.stateChangelogTopics().get(0).name()); + + assertEquals(1, describedGroup.members().size()); + assertEquals("member1", describedGroup.members().get(0).memberId()); + } + + @Test + public void testAsDescribedGroupPrefersConfiguredTopologyOverStreamsTopology() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-configured"); + snapshotRegistry.idempotentCreateSnapshot(0); + + // Create both StreamsTopology and ConfiguredTopology + Map subtopologies = Map.of( + "sub-1", new StreamsGroupTopologyValue.Subtopology() + .setSubtopologyId("sub-1") + .setSourceTopics(List.of("streams-topic")) + ); + + group.setGroupEpoch(3); + group.setTopology(new StreamsTopology(2, subtopologies)); + group.setConfiguredTopology(new ConfiguredTopology(3, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty())); + group.setTargetAssignmentEpoch(3); + snapshotRegistry.idempotentCreateSnapshot(1); + + StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1); + + // Should prefer ConfiguredTopology over StreamsTopology + assertNotNull(describedGroup.topology()); + assertEquals(3, describedGroup.topology().epoch()); // ConfiguredTopology epoch + assertEquals(0, describedGroup.topology().subtopologies().size()); // Empty configured topology + } + + @Test + public void testAsDescribedGroupFallbackToStreamsTopologyWhenConfiguredTopologyEmpty() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-fallback"); + snapshotRegistry.idempotentCreateSnapshot(0); + + // Create StreamsTopology with subtopologies + Map subtopologies = Map.of( + "sub-1", new StreamsGroupTopologyValue.Subtopology() + .setSubtopologyId("sub-1") + .setSourceTopics(List.of("fallback-topic")) + ); + + group.setGroupEpoch(4); + group.setTopology(new StreamsTopology(4, subtopologies)); + // No ConfiguredTopology set, so should fallback to StreamsTopology + group.setTargetAssignmentEpoch(4); + snapshotRegistry.idempotentCreateSnapshot(1); + + StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1); + + // Should use StreamsTopology when ConfiguredTopology is not available + assertNotNull(describedGroup.topology()); + assertEquals(4, describedGroup.topology().epoch()); // StreamsTopology epoch + assertEquals(1, describedGroup.topology().subtopologies().size()); + assertEquals("sub-1", describedGroup.topology().subtopologies().get(0).subtopologyId()); + assertEquals(List.of("fallback-topic"), describedGroup.topology().subtopologies().get(0).sourceTopics()); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java index a9d269f2120..83ea799cdc9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group.streams; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; @@ -24,6 +25,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.To import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -120,6 +122,88 @@ public class StreamsTopologyTest { assertEquals(mkSubtopology2(), topology.subtopologies().get(SUBTOPOLOGY_ID_2)); } + @Test + public void asStreamsGroupDescribeTopologyShouldReturnCorrectStructure() { + Map subtopologies = mkMap( + mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1()), + mkEntry(SUBTOPOLOGY_ID_2, mkSubtopology2()) + ); + StreamsTopology topology = new StreamsTopology(1, subtopologies); + + StreamsGroupDescribeResponseData.Topology describeTopology = topology.asStreamsGroupDescribeTopology(); + + assertEquals(1, describeTopology.epoch()); + assertEquals(2, describeTopology.subtopologies().size()); + + // Verify subtopologies are correctly converted and sorted + List sortedSubtopologies = + describeTopology.subtopologies().stream() + .sorted(Comparator.comparing(StreamsGroupDescribeResponseData.Subtopology::subtopologyId)) + .toList(); + + // Verify first subtopology + StreamsGroupDescribeResponseData.Subtopology sub1 = sortedSubtopologies.get(0); + assertEquals(SUBTOPOLOGY_ID_1, sub1.subtopologyId()); + // Source topics are sorted alphabetically + assertEquals(List.of(REPARTITION_TOPIC_1, REPARTITION_TOPIC_2, SOURCE_TOPIC_1, SOURCE_TOPIC_2), + sub1.sourceTopics()); + assertEquals(List.of(REPARTITION_TOPIC_3), sub1.repartitionSinkTopics()); + assertEquals(2, sub1.repartitionSourceTopics().size()); + assertEquals(2, sub1.stateChangelogTopics().size()); + + // Verify second subtopology + StreamsGroupDescribeResponseData.Subtopology sub2 = sortedSubtopologies.get(1); + assertEquals(SUBTOPOLOGY_ID_2, sub2.subtopologyId()); + // Source topics are sorted alphabetically + assertEquals(List.of(REPARTITION_TOPIC_3, SOURCE_TOPIC_3), sub2.sourceTopics()); + assertEquals(List.of(), sub2.repartitionSinkTopics()); + assertEquals(1, sub2.repartitionSourceTopics().size()); + assertEquals(1, sub2.stateChangelogTopics().size()); + } + + @Test + public void asStreamsGroupDescribeTopicInfoShouldConvertCorrectly() { + Map subtopologies = mkMap( + mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1()) + ); + StreamsTopology topology = new StreamsTopology(1, subtopologies); + + StreamsGroupDescribeResponseData.Topology describeTopology = topology.asStreamsGroupDescribeTopology(); + StreamsGroupDescribeResponseData.Subtopology describedSub = describeTopology.subtopologies().get(0); + + // Verify repartition source topics are correctly converted + List repartitionTopics = describedSub.repartitionSourceTopics(); + assertEquals(2, repartitionTopics.size()); + + // Find the first repartition topic (they should be sorted by name) + StreamsGroupDescribeResponseData.TopicInfo firstTopic = repartitionTopics.stream() + .filter(topic -> topic.name().equals(REPARTITION_TOPIC_1)) + .findFirst() + .orElseThrow(); + assertEquals(REPARTITION_TOPIC_1, firstTopic.name()); + + // Verify changelog topics are correctly converted + List changelogTopics = describedSub.stateChangelogTopics(); + assertEquals(2, changelogTopics.size()); + + // Find the first changelog topic (they should be sorted by name) + StreamsGroupDescribeResponseData.TopicInfo firstChangelog = changelogTopics.stream() + .filter(topic -> topic.name().equals(CHANGELOG_TOPIC_1)) + .findFirst() + .orElseThrow(); + assertEquals(CHANGELOG_TOPIC_1, firstChangelog.name()); + } + + @Test + public void asStreamsGroupDescribeTopologyWithEmptySubtopologies() { + StreamsTopology topology = new StreamsTopology(0, Map.of()); + + StreamsGroupDescribeResponseData.Topology describeTopology = topology.asStreamsGroupDescribeTopology(); + + assertEquals(0, describeTopology.epoch()); + assertEquals(0, describeTopology.subtopologies().size()); + } + private Subtopology mkSubtopology1() { return new Subtopology() .setSubtopologyId(SUBTOPOLOGY_ID_1)