KAFKA-14837/14842:Avoid the rebalance caused by the addition and deletion of irrelevant groups for MirrorCheckPointConnector (#13446)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
hudeqi 2023-03-28 21:19:52 +08:00 committed by GitHub
parent c14f56b484
commit f7ea9cfb50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 91 additions and 6 deletions

View File

@ -18,6 +18,8 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -46,6 +49,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
private Scheduler scheduler;
private MirrorCheckpointConfig config;
private TopicFilter topicFilter;
private GroupFilter groupFilter;
private Admin sourceAdminClient;
private Admin targetAdminClient;
@ -70,6 +74,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
}
String connectorName = config.connectorName();
sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
topicFilter = config.topicFilter();
groupFilter = config.groupFilter();
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
@ -88,6 +93,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
return;
}
Utils.closeQuietly(scheduler, "scheduler");
Utils.closeQuietly(topicFilter, "topic filter");
Utils.closeQuietly(groupFilter, "group filter");
Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client");
@ -150,10 +156,31 @@ public class MirrorCheckpointConnector extends SourceConnector {
List<String> findConsumerGroups()
throws InterruptedException, ExecutionException {
return listConsumerGroups().stream()
List<String> filteredGroups = listConsumerGroups().stream()
.map(ConsumerGroupListing::groupId)
.filter(this::shouldReplicate)
.filter(this::shouldReplicateByGroupFilter)
.collect(Collectors.toList());
List<String> checkpointGroups = new LinkedList<>();
List<String> irrelevantGroups = new LinkedList<>();
for (String group : filteredGroups) {
Set<String> consumedTopics = listConsumerGroupOffsets(group).keySet().stream()
.map(TopicPartition::topic)
.filter(this::shouldReplicateByTopicFilter)
.collect(Collectors.toSet());
// Only perform checkpoints for groups that have offsets for at least one topic that's accepted
// by the topic filter.
if (consumedTopics.size() > 0) {
checkpointGroups.add(group);
} else {
irrelevantGroups.add(group);
}
}
log.debug("Ignoring the following groups which do not have any offsets for topics that are accepted by " +
"the topic filter: {}", irrelevantGroups);
return checkpointGroups;
}
Collection<ConsumerGroupListing> listConsumerGroups()
@ -167,9 +194,18 @@ public class MirrorCheckpointConnector extends SourceConnector {
config.checkpointsTopicReplicationFactor(),
targetAdminClient
);
}
}
boolean shouldReplicate(String group) {
Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
throws InterruptedException, ExecutionException {
return sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
}
boolean shouldReplicateByGroupFilter(String group) {
return groupFilter.shouldReplicateGroup(group);
}
boolean shouldReplicateByTopicFilter(String topic) {
return topicFilter.shouldReplicateTopic(topic);
}
}

View File

@ -180,7 +180,7 @@ public class MirrorCheckpointTask extends SourceTask {
private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
return listConsumerGroupOffsets(group).entrySet().stream()
.filter(x -> shouldCheckpointTopic(x.getKey().topic()))
.filter(x -> shouldCheckpointTopic(x.getKey().topic())) // Only perform relevant checkpoints filtered by "topic filter"
.map(x -> checkpoint(group, x.getKey(), x.getValue()))
.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs
.filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately

View File

@ -17,12 +17,15 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -123,13 +126,59 @@ public class MirrorCheckpointConnectorTest {
Collection<ConsumerGroupListing> groups = Arrays.asList(
new ConsumerGroupListing("g1", true),
new ConsumerGroupListing("g2", false));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
doReturn(groups).when(connector).listConsumerGroups();
doReturn(true).when(connector).shouldReplicate(anyString());
doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString());
doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString());
doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString());
List<String> groupFound = connector.findConsumerGroups();
Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
assertEquals(expectedGroups, new HashSet<>(groupFound),
"Expected groups are not the same as findConsumerGroups");
doReturn(false).when(connector).shouldReplicateByTopicFilter(anyString());
List<String> topicFilterGroupFound = connector.findConsumerGroups();
assertEquals(Collections.emptyList(), topicFilterGroupFound);
}
@Test
public void testFindConsumerGroupsInCommonScenarios() throws Exception {
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptyList(), config);
connector = spy(connector);
Collection<ConsumerGroupListing> groups = Arrays.asList(
new ConsumerGroupListing("g1", true),
new ConsumerGroupListing("g2", false),
new ConsumerGroupListing("g3", false),
new ConsumerGroupListing("g4", false));
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup1 = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup2 = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup3 = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> offsetsForGroup4 = new HashMap<>();
offsetsForGroup1.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
offsetsForGroup1.put(new TopicPartition("t2", 0), new OffsetAndMetadata(0));
offsetsForGroup2.put(new TopicPartition("t2", 0), new OffsetAndMetadata(0));
offsetsForGroup2.put(new TopicPartition("t3", 0), new OffsetAndMetadata(0));
offsetsForGroup3.put(new TopicPartition("t3", 0), new OffsetAndMetadata(0));
offsetsForGroup4.put(new TopicPartition("t3", 0), new OffsetAndMetadata(0));
doReturn(groups).when(connector).listConsumerGroups();
doReturn(false).when(connector).shouldReplicateByTopicFilter("t1");
doReturn(true).when(connector).shouldReplicateByTopicFilter("t2");
doReturn(false).when(connector).shouldReplicateByTopicFilter("t3");
doReturn(true).when(connector).shouldReplicateByGroupFilter("g1");
doReturn(true).when(connector).shouldReplicateByGroupFilter("g2");
doReturn(true).when(connector).shouldReplicateByGroupFilter("g3");
doReturn(false).when(connector).shouldReplicateByGroupFilter("g4");
doReturn(offsetsForGroup1).when(connector).listConsumerGroupOffsets("g1");
doReturn(offsetsForGroup2).when(connector).listConsumerGroupOffsets("g2");
doReturn(offsetsForGroup3).when(connector).listConsumerGroupOffsets("g3");
doReturn(offsetsForGroup4).when(connector).listConsumerGroupOffsets("g4");
List<String> groupFound = connector.findConsumerGroups();
assertEquals(groupFound, Arrays.asList("g1", "g2"));
}
}