KAFKA-14980: Fix MirrorSourceConnector source consumer configuration (#13723)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Josep Prat <josep.prat@aiven.io>
This commit is contained in:
Chris Egerton 2023-05-19 05:45:01 -07:00 committed by GitHub
parent c90a08c37e
commit a14e73a036
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 3 deletions

View File

@ -158,7 +158,7 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
static Map<String, Object> sourceConsumerConfig(Map<String, ?> props) {
Map<String, Object> result = new HashMap<>();
result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX));
result.putAll(Utils.entriesWithPrefix(props, SOURCE_CLUSTER_PREFIX));
result.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
result.putAll(Utils.entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX));
result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX));

View File

@ -43,7 +43,8 @@ public class MirrorConnectorConfigTest {
@Test
public void testSourceConsumerConfig() {
Map<String, String> connectorProps = makeProps(
MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "max.poll.interval.ms", "120000"
MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "max.poll.interval.ms", "120000",
MirrorConnectorConfig.SOURCE_CLUSTER_PREFIX + "bootstrap.servers", "localhost:2345"
);
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
Map<String, Object> connectorConsumerProps = config.sourceConsumerConfig("test");
@ -52,11 +53,13 @@ public class MirrorConnectorConfigTest {
expectedConsumerProps.put("auto.offset.reset", "earliest");
expectedConsumerProps.put("max.poll.interval.ms", "120000");
expectedConsumerProps.put("client.id", "source1->target2|ConnectorName|test");
expectedConsumerProps.put("bootstrap.servers", "localhost:2345");
assertEquals(expectedConsumerProps, connectorConsumerProps);
// checking auto.offset.reset override works
connectorProps = makeProps(
MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "auto.offset.reset", "latest"
MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "auto.offset.reset", "latest",
MirrorConnectorConfig.SOURCE_CLUSTER_PREFIX + "bootstrap.servers", "localhost:2345"
);
config = new TestMirrorConnectorConfig(connectorProps);
connectorConsumerProps = config.sourceConsumerConfig("test");