KAFKA-12254: Ensure MM2 creates topics with source topic configs (#10217)

MM2 creates new topics on the destination cluster with default configurations. It has an async periodic task to refresh topic configurations from the source to destination. However, this opens up a window where the destination cluster has data produced to it with default configurations. In the worst case, this could cause data loss if the destination topic is created without the right cleanup.policy. This commit fixes the above issue by ensuring that the right configurations are supplied to AdminClient#createTopics when MM2 creates topics on the destination cluster.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
Dhruvil Shah 2021-03-01 01:30:30 -08:00 committed by GitHub
parent 5813446730
commit cc088c5abe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 29 deletions

View File

@@ -49,6 +49,7 @@ import java.util.Set;
import java.util.HashSet;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.concurrent.ExecutionException;
@@ -306,40 +307,82 @@ public class MirrorSourceConnector extends SourceConnector {
MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), config.offsetSyncsTopicReplicationFactor(), config.sourceAdminConfig());
}
// visible for testing
void computeAndCreateTopicPartitions()
throws InterruptedException, ExecutionException {
Map<String, Long> partitionCounts = knownSourceTopicPartitions.stream()
.collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
.collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), Entry::getValue));
Set<String> knownTargetTopics = toTopics(knownTargetTopicPartitions);
List<NewTopic> newTopics = partitionCounts.entrySet().stream()
.filter(x -> !knownTargetTopics.contains(x.getKey()))
.map(x -> new NewTopic(x.getKey(), x.getValue().intValue(), (short) replicationFactor))
.collect(Collectors.toList());
Map<String, NewPartitions> newPartitions = partitionCounts.entrySet().stream()
.filter(x -> knownTargetTopics.contains(x.getKey()))
.collect(Collectors.toMap(Entry::getKey, x -> NewPartitions.increaseTo(x.getValue().intValue())));
createTopicPartitions(partitionCounts, newTopics, newPartitions);
// Reconciles topic metadata between the source and target clusters:
// (1) creates remote counterparts for source topics that do not yet exist on
// the target, and (2) grows the partition count of existing remote topics
// whose source has more partitions than the target.
// Package-private: visible for testing.
void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedException {
// get source and target topics with respective partition counts
Map<String, Long> sourceTopicToPartitionCounts = knownSourceTopicPartitions.stream()
.collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
Map<String, Long> targetTopicToPartitionCounts = knownTargetTopicPartitions.stream()
.collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
Set<String> knownSourceTopics = sourceTopicToPartitionCounts.keySet();
Set<String> knownTargetTopics = targetTopicToPartitionCounts.keySet();
// map each source topic name to its remote name on the target cluster
// (as produced by formatRemoteTopic)
Map<String, String> sourceToRemoteTopics = knownSourceTopics.stream()
.collect(Collectors.toMap(Function.identity(), sourceTopic -> formatRemoteTopic(sourceTopic)));
// compute existing and new source topics: true -> remote topic already
// exists on the target, false -> it must be created
Map<Boolean, Set<String>> partitionedSourceTopics = knownSourceTopics.stream()
.collect(Collectors.partitioningBy(sourceTopic -> knownTargetTopics.contains(sourceToRemoteTopics.get(sourceTopic)),
Collectors.toSet()));
Set<String> existingSourceTopics = partitionedSourceTopics.get(true);
Set<String> newSourceTopics = partitionedSourceTopics.get(false);
// create new topics
if (!newSourceTopics.isEmpty())
createNewTopics(newSourceTopics, sourceTopicToPartitionCounts);
// compute topics with new partitions: only those whose source partition
// count now exceeds the target's (partition counts never shrink here)
Map<String, Long> sourceTopicsWithNewPartitions = existingSourceTopics.stream()
.filter(sourceTopic -> {
String targetTopic = sourceToRemoteTopics.get(sourceTopic);
return sourceTopicToPartitionCounts.get(sourceTopic) > targetTopicToPartitionCounts.get(targetTopic);
})
.collect(Collectors.toMap(Function.identity(), sourceTopicToPartitionCounts::get));
// create new partitions, keyed by the remote topic name on the target
if (!sourceTopicsWithNewPartitions.isEmpty()) {
Map<String, NewPartitions> newTargetPartitions = sourceTopicsWithNewPartitions.entrySet().stream()
.collect(Collectors.toMap(sourceTopicAndPartitionCount -> sourceToRemoteTopics.get(sourceTopicAndPartitionCount.getKey()),
sourceTopicAndPartitionCount -> NewPartitions.increaseTo(sourceTopicAndPartitionCount.getValue().intValue())));
createNewPartitions(newTargetPartitions);
}
}
// Creates the remote counterparts of the given source topics on the target
// cluster. Each remote topic is created with the configuration described from
// its source topic, so newly mirrored data never lands under broker-default
// settings (e.g. a missing cleanup.policy).
private void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts)
        throws ExecutionException, InterruptedException {
    // fetch the live source-topic configs before creating anything
    Map<String, Config> sourceTopicToConfig = describeTopicConfigs(newSourceTopics);
    Map<String, NewTopic> newTopics = newSourceTopics.stream()
        .map(sourceTopic -> {
            Map<String, String> topicConfigs = configToMap(sourceTopicToConfig.get(sourceTopic));
            NewTopic remoteTopic = new NewTopic(
                formatRemoteTopic(sourceTopic),
                sourceTopicToPartitionCounts.get(sourceTopic).intValue(),
                (short) replicationFactor);
            return remoteTopic.configs(topicConfigs);
        })
        .collect(Collectors.toMap(topic -> topic.name(), topic -> topic));
    createNewTopics(newTopics);
}
// visible for testing
void createTopicPartitions(Map<String, Long> partitionCounts, List<NewTopic> newTopics,
Map<String, NewPartitions> newPartitions) {
targetAdminClient.createTopics(newTopics, new CreateTopicsOptions()).values().forEach((k, v) -> v.whenComplete((x, e) -> {
// Issues an async createTopics request to the target cluster for the given
// remote topics (keyed by remote topic name) and logs each per-topic outcome.
// Package-private: visible for testing.
void createNewTopics(Map<String, NewTopic> newTopics) {
targetAdminClient.createTopics(newTopics.values(), new CreateTopicsOptions()).values().forEach((k, v) -> v.whenComplete((x, e) -> {
if (e != null) {
// creation failed; logged as a warning and retried by the next refresh cycle
log.warn("Could not create topic {}.", k, e);
} else {
// NOTE(review): the next line appears to be a stale pre-change artifact of
// the diff rendering -- it references partitionCounts, which is not in scope
// in this method; the line after it is the current version.
log.info("Created remote topic {} with {} partitions.", k, partitionCounts.get(k));
log.info("Created remote topic {} with {} partitions.", k, newTopics.get(k).numPartitions());
}
}));
}
// Issues an async createPartitions request to the target cluster (keyed by
// remote topic name) and logs each per-topic outcome.
// Package-private: visible for testing.
void createNewPartitions(Map<String, NewPartitions> newPartitions) {
targetAdminClient.createPartitions(newPartitions).values().forEach((k, v) -> v.whenComplete((x, e) -> {
if (e instanceof InvalidPartitionsException) {
// swallow, this is normal
} else if (e != null) {
log.warn("Could not create topic-partitions for {}.", k, e);
} else {
// NOTE(review): the next line appears to be a stale pre-change artifact of
// the diff rendering -- it references partitionCounts, which is not in scope
// in this method; the line after it is the current version.
log.info("Increased size of {} to {} partitions.", k, partitionCounts.get(k));
log.info("Increased size of {} to {} partitions.", k, newPartitions.get(k).totalCount());
}
}));
}
@@ -359,6 +402,11 @@ public class MirrorSourceConnector extends SourceConnector {
return adminClient.describeTopics(topics).all().get().values();
}
// Flattens a Config into a plain name -> value map, the shape expected by
// NewTopic#configs when creating topics on the target cluster.
static Map<String, String> configToMap(Config config) {
    return config.entries()
        .stream()
        .collect(Collectors.toMap(entry -> entry.name(), entry -> entry.value()));
}
@SuppressWarnings("deprecation")
// use deprecated alterConfigs API for broker compatibility back to 0.11.0
private void updateTopicConfigs(Map<String, Config> topicConfigs)
@@ -390,7 +438,7 @@ public class MirrorSourceConnector extends SourceConnector {
.map(x -> new TopicPartition(topic, x.partition()));
}
private Map<String, Config> describeTopicConfigs(Set<String> topics)
Map<String, Config> describeTopicConfigs(Set<String> topics)
throws InterruptedException, ExecutionException {
Set<ConfigResource> resources = topics.stream()
.map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x))

View File

@@ -183,10 +183,16 @@ public class MirrorSourceConnectorTest {
connector.initialize(mock(ConnectorContext.class));
connector = spy(connector);
Config topicConfig = new Config(Arrays.asList(
new ConfigEntry("cleanup.policy", "compact"),
new ConfigEntry("segment.bytes", "100")));
Map<String, Config> configs = Collections.singletonMap("topic", topicConfig);
List<TopicPartition> sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0));
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
doReturn(Collections.emptyList()).when(connector).findTargetTopicPartitions();
doNothing().when(connector).createTopicPartitions(any(), any(), any());
doReturn(configs).when(connector).describeTopicConfigs(Collections.singleton("topic"));
doNothing().when(connector).createNewTopics(any());
connector.refreshTopicPartitions();
// if target topic is not created, refreshTopicPartitions() will call createTopicPartitions() again
@@ -194,13 +200,15 @@ public class MirrorSourceConnectorTest {
Map<String, Long> expectedPartitionCounts = new HashMap<>();
expectedPartitionCounts.put("source.topic", 1L);
List<NewTopic> expectedNewTopics = Arrays.asList(new NewTopic("source.topic", 1, (short) 0));
Map<String, String> configMap = MirrorSourceConnector.configToMap(topicConfig);
assertEquals(2, configMap.size());
Map<String, NewTopic> expectedNewTopics = new HashMap<>();
expectedNewTopics.put("source.topic", new NewTopic("source.topic", 1, (short) 0).configs(configMap));
verify(connector, times(2)).computeAndCreateTopicPartitions();
verify(connector, times(2)).createTopicPartitions(
eq(expectedPartitionCounts),
eq(expectedNewTopics),
eq(Collections.emptyMap()));
verify(connector, times(2)).createNewTopics(eq(expectedNewTopics));
verify(connector, times(0)).createNewPartitions(any());
List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
@@ -217,11 +225,19 @@ public class MirrorSourceConnectorTest {
connector.initialize(mock(ConnectorContext.class));
connector = spy(connector);
Config topicConfig = new Config(Arrays.asList(
new ConfigEntry("cleanup.policy", "compact"),
new ConfigEntry("segment.bytes", "100")));
Map<String, Config> configs = Collections.singletonMap("source.topic", topicConfig);
List<TopicPartition> sourceTopicPartitions = Collections.emptyList();
List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
doNothing().when(connector).createTopicPartitions(any(), any(), any());
doReturn(configs).when(connector).describeTopicConfigs(Collections.singleton("source.topic"));
doReturn(Collections.emptyMap()).when(connector).describeTopicConfigs(Collections.emptySet());
doNothing().when(connector).createNewTopics(any());
doNothing().when(connector).createNewPartitions(any());
// partitions appearing on the target cluster should not cause reconfiguration
connector.refreshTopicPartitions();
@@ -234,6 +250,5 @@ public class MirrorSourceConnectorTest {
// when partitions are added to the source cluster, reconfiguration is triggered
connector.refreshTopicPartitions();
verify(connector, times(1)).computeAndCreateTopicPartitions();
}
}