MINOR: Allow Checkpoints for consumers using static partition assignments (#9545)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Samuel Cantero 2020-11-19 12:01:22 -03:00 committed by GitHub
parent 7ceb34bbec
commit 1625984149
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 6 deletions

View File

@ -145,16 +145,15 @@ public class MirrorCheckpointConnector extends SourceConnector {
knownConsumerGroups = findConsumerGroups();
}
private List<String> findConsumerGroups()
List<String> findConsumerGroups()
throws InterruptedException, ExecutionException {
return listConsumerGroups().stream()
.filter(x -> !x.isSimpleConsumerGroup())
.map(x -> x.groupId())
.filter(this::shouldReplicate)
.collect(Collectors.toList());
}
private Collection<ConsumerGroupListing> listConsumerGroups()
Collection<ConsumerGroupListing> listConsumerGroups()
throws InterruptedException, ExecutionException {
return sourceAdminClient.listConsumerGroups().valid().get();
}

View File

@ -16,12 +16,25 @@
*/
package org.apache.kafka.connect.mirror;
import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.Assert.assertEquals;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
public class MirrorCheckpointConnectorTest {
@ -64,4 +77,21 @@ public class MirrorCheckpointConnectorTest {
assertEquals(0, output.size());
}
@Test
public void testFindConsumerGroups() throws Exception {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps());
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptyList(), config);
connector = spy(connector);
Collection<ConsumerGroupListing> groups = Arrays.asList(
new ConsumerGroupListing("g1", true),
new ConsumerGroupListing("g2", false));
doReturn(groups).when(connector).listConsumerGroups();
doReturn(true).when(connector).shouldReplicate(anyString());
List<String> groupFound = connector.findConsumerGroups();
Set<String> expectedGroups = groups.stream().map(g -> g.groupId()).collect(Collectors.toSet());
assertEquals(expectedGroups, new HashSet<>(groupFound));
}
}