KAFKA-19162: Topology metadata contains non-deterministically ordered topic configs (#19491)

Topology description sent to broker in KIP-1071 contains
non-deterministically ordered topic configs.  Since the topology is
compared to the groups topology upon joining we may run into
`INVALID_REQUEST: Topology updates are not supported yet` failures if
the topology sent by the  application does not match the group topology
due to different topic config order.

This PR ensures that topic configs are ordered, to avoid an
`INVALID_REQUEST` error.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-04-17 06:12:17 +02:00 committed by GitHub
parent 144101a7c1
commit 5f80de3923
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 15 additions and 2 deletions

View File

@ -237,6 +237,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
repartitionTopicInfo.topicConfigs().add(new StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
);
repartitionTopicsInfo.add(repartitionTopicInfo);
repartitionTopicInfo.topicConfigs().sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key));
}
repartitionTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));
return repartitionTopicsInfo;
@ -251,6 +252,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
changelogTopic.getValue().topicConfigs().forEach((k, v) ->
changelogTopicInfo.topicConfigs().add(new StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
);
changelogTopicInfo.topicConfigs().sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key));
changelogTopicsInfo.add(changelogTopicInfo);
}
changelogTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));

View File

@ -108,7 +108,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
private static final String REPARTITION_SOURCE_TOPIC_1 = "repartitionSourceTopic1";
private static final String REPARTITION_SOURCE_TOPIC_2 = "repartitionSourceTopic2";
private static final Map<String, StreamsRebalanceData.TopicInfo> REPARTITION_SOURCE_TOPICS = Map.of(
REPARTITION_SOURCE_TOPIC_1, new StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short) 1), Map.of("config1", "value1")),
REPARTITION_SOURCE_TOPIC_1, new StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short) 1), Map.of("config3", "value3", "config1", "value1")),
REPARTITION_SOURCE_TOPIC_2, new StreamsRebalanceData.TopicInfo(Optional.of(3), Optional.of((short) 3), Collections.emptyMap())
);
private static final String CHANGELOG_TOPIC_1 = "changelogTopic1";
@ -117,7 +117,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
private static final Map<String, StreamsRebalanceData.TopicInfo> CHANGELOG_TOPICS = Map.of(
CHANGELOG_TOPIC_1, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 1), Map.of()),
CHANGELOG_TOPIC_2, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 2), Map.of()),
CHANGELOG_TOPIC_3, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 3), Map.of("config2", "value2"))
CHANGELOG_TOPIC_3, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 3), Map.of("config4", "value4", "config2", "value2"))
);
private static final Collection<Set<String>> COPARTITION_GROUP = Set.of(
Set.of(SOURCE_TOPIC_1, REPARTITION_SOURCE_TOPIC_2),
@ -664,6 +664,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions());
assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor());
assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
assertTrue(isSorted(topicInfo.topicConfigs(), Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key)));
});
assertEquals(CHANGELOG_TOPICS.size(), subtopology1.stateChangelogTopics().size());
subtopology1.stateChangelogTopics().forEach(topicInfo -> {
@ -672,6 +673,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name());
assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor());
assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
assertTrue(isSorted(topicInfo.topicConfigs(), Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key)));
});
assertEquals(2, subtopology1.copartitionGroups().size());
final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 =
@ -701,6 +703,15 @@ class StreamsGroupHeartbeatRequestManagerTest {
assertNull(nonJoiningRequestData.topology());
}
private <V> boolean isSorted(List<V> collection, Comparator<V> comparator) {
for (int i = 1; i < collection.size(); i++) {
if (comparator.compare(collection.get(i - 1), collection.get(i)) > 0) {
return false;
}
}
return true;
}
@ParameterizedTest
@MethodSource("provideNonJoiningStates")
public void testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState memberState) {