From 11459ae7e99c12132a0821c953769dd9976619ab Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Thu, 9 Jan 2025 13:16:03 +0100 Subject: [PATCH] KAFKA-18453: Add StreamsTopology class to group coordinator (#18446) Adds a class that represent the topology of a Streams group sent by a Streams client in the Streams group heartbeat during initialization to the group coordinator. This topology representation is used together with the partition metadata on the broker to create a configured topology. Reviewer: Lucas Brutschy --- .../group/streams/StreamsTopology.java | 84 ++++++++++ .../group/streams/StreamsTopologyTest.java | 150 ++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java new file mode 100644 index 00000000000..49ce9f9b4fd --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Contains the topology sent by a Streams client in the Streams heartbeat during initialization.
+ * <p>
+ * This topology is used together with the partition metadata on the broker to create a
+ * {@link org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology configured topology}.
+ * This class allows to look-up subtopologies by subtopology ID in constant time by getting the subtopologies map.
+ * The information in this class is fully backed by records stored in the __consumer_offsets topic.
+ *
+ * @param topologyEpoch The epoch of the topology (must be non-negative).
+ * @param subtopologies The subtopologies of the topology containing information about source topics,
+ *                      repartition topics, changelog topics, co-partition groups etc. (must be non-null)
+ */
+public record StreamsTopology(int topologyEpoch,
+                              Map<String, Subtopology> subtopologies) {
+
+    public StreamsTopology {
+        if (topologyEpoch < 0) {
+            throw new IllegalArgumentException("Topology epoch must be non-negative.");
+        }
+        subtopologies = Collections.unmodifiableMap(Objects.requireNonNull(subtopologies, "Subtopologies cannot be null."));
+    }
+
+    /**
+     * Returns the set of topics that are required by the topology.
+     * <p>
+     * The required topics are used to determine the partition metadata on the brokers needed to configure the topology.
+     *
+     * @return set of topics required by the topology
+     */
+    public Set<String> requiredTopics() {
+        return subtopologies.values().stream()
+            .flatMap(x ->
+                Stream.concat(
+                    Stream.concat(
+                        x.sourceTopics().stream(),
+                        x.repartitionSourceTopics().stream().map(TopicInfo::name)
+                    ),
+                    x.stateChangelogTopics().stream().map(TopicInfo::name)
+                )
+            ).collect(Collectors.toSet());
+    }
+
+    /**
+     * Creates an instance of StreamsTopology from a StreamsGroupTopologyValue record.
+     *
+     * @param record The StreamsGroupTopologyValue record.
+     * @return The instance of StreamsTopology created from the record.
+     */
+    public static StreamsTopology fromRecord(StreamsGroupTopologyValue record) {
+        return new StreamsTopology(
+            record.epoch(),
+            record.subtopologies().stream().collect(Collectors.toMap(Subtopology::subtopologyId, x -> x))
+        );
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
new file mode 100644
index 00000000000..89c785d633e
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class StreamsTopologyTest {
+
+    private static final String SUBTOPOLOGY_ID_1 = "subtopology-1";
+    private static final String SUBTOPOLOGY_ID_2 = "subtopology-2";
+    private static final String SOURCE_TOPIC_1 = "source-topic-1";
+    private static final String SOURCE_TOPIC_2 = "source-topic-2";
+    private static final String SOURCE_TOPIC_3 = "source-topic-3";
+    private static final String REPARTITION_TOPIC_1 = "repartition-topic-1";
+    private static final String REPARTITION_TOPIC_2 = "repartition-topic-2";
+    private static final String REPARTITION_TOPIC_3 = "repartition-topic-3";
+    private static final String CHANGELOG_TOPIC_1 = "changelog-1";
+    private static final String CHANGELOG_TOPIC_2 = "changelog-2";
+    private static final String CHANGELOG_TOPIC_3 = "changelog-3";
+
+    @Test
+    public void subtopologiesMapShouldNotBeNull() {
+        final Exception exception = assertThrows(NullPointerException.class, () -> new StreamsTopology(1, null));
+        assertEquals("Subtopologies cannot be null.", exception.getMessage());
+    }
+
+    @Test
+    public void topologyEpochShouldNotBeNegative() {
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1())
+        );
+        final Exception exception = assertThrows(IllegalArgumentException.class, () -> new StreamsTopology(-1, subtopologies));
+        assertEquals("Topology epoch must be non-negative.", exception.getMessage());
+    }
+
+    @Test
+    public void subtopologiesMapShouldBeImmutable() {
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1())
+        );
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> new StreamsTopology(1, subtopologies).subtopologies().put("subtopology-2", mkSubtopology2())
+        );
+    }
+
+    @Test
+    public void requiredTopicsShouldBeCorrect() {
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1()),
+            mkEntry(SUBTOPOLOGY_ID_2, mkSubtopology2())
+        );
+        StreamsTopology topology = new StreamsTopology(1, subtopologies);
+        Set<String> expectedTopics = Set.of(
+            SOURCE_TOPIC_1, SOURCE_TOPIC_2, SOURCE_TOPIC_3,
+            REPARTITION_TOPIC_1, REPARTITION_TOPIC_2, REPARTITION_TOPIC_3,
+            CHANGELOG_TOPIC_1, CHANGELOG_TOPIC_2, CHANGELOG_TOPIC_3
+        );
+
+        assertEquals(expectedTopics, topology.requiredTopics());
+    }
+
+    @Test
+    public void fromRecordShouldCreateCorrectTopology() {
+        StreamsGroupTopologyValue record = new StreamsGroupTopologyValue()
+            .setEpoch(1)
+            .setSubtopologies(Arrays.asList(mkSubtopology1(), mkSubtopology2()));
+        StreamsTopology topology = StreamsTopology.fromRecord(record);
+        assertEquals(1, topology.topologyEpoch());
+        assertEquals(2, topology.subtopologies().size());
+        assertTrue(topology.subtopologies().containsKey(SUBTOPOLOGY_ID_1));
+        assertEquals(mkSubtopology1(), topology.subtopologies().get(SUBTOPOLOGY_ID_1));
+        assertTrue(topology.subtopologies().containsKey(SUBTOPOLOGY_ID_2));
+        assertEquals(mkSubtopology2(), topology.subtopologies().get(SUBTOPOLOGY_ID_2));
+    }
+
+    private Subtopology mkSubtopology1() {
+        return new Subtopology()
+            .setSubtopologyId(SUBTOPOLOGY_ID_1)
+            .setSourceTopics(List.of(
+                SOURCE_TOPIC_1,
+                SOURCE_TOPIC_2,
+                REPARTITION_TOPIC_1,
+                REPARTITION_TOPIC_2
+            ))
+            .setRepartitionSourceTopics(List.of(
+                new TopicInfo().setName(REPARTITION_TOPIC_1),
+                new TopicInfo().setName(REPARTITION_TOPIC_2)
+            ))
+            .setRepartitionSinkTopics(List.of(
+                REPARTITION_TOPIC_3
+            ))
+            .setStateChangelogTopics(List.of(
+                new TopicInfo().setName(CHANGELOG_TOPIC_1),
+                new TopicInfo().setName(CHANGELOG_TOPIC_2)
+            ))
+            .setCopartitionGroups(List.of(
+                new StreamsGroupTopologyValue.CopartitionGroup()
+                    .setRepartitionSourceTopics(List.of((short) 0))
+                    .setSourceTopics(List.of((short) 0)),
+                new StreamsGroupTopologyValue.CopartitionGroup()
+                    .setRepartitionSourceTopics(List.of((short) 1))
+                    .setSourceTopics(List.of((short) 1))
+            ));
+    }
+
+    private Subtopology mkSubtopology2() {
+        return new Subtopology()
+            .setSubtopologyId(SUBTOPOLOGY_ID_2)
+            .setSourceTopics(List.of(
+                SOURCE_TOPIC_3,
+                REPARTITION_TOPIC_3
+            ))
+            .setRepartitionSourceTopics(List.of(
+                new TopicInfo().setName(REPARTITION_TOPIC_3)
+            ))
+            .setStateChangelogTopics(List.of(
+                new TopicInfo().setName(CHANGELOG_TOPIC_3)
+            ));
+    }
+}