diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java index 6896cf74157..0f158d3252d 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java @@ -108,8 +108,11 @@ public abstract class MirrorConnectorConfig extends AbstractConfig { public static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT; public static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location (source/target) of the offset-syncs topic."; + private final ReplicationPolicy replicationPolicy; + protected MirrorConnectorConfig(ConfigDef configDef, Map props) { super(configDef, props, true); + replicationPolicy = getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class); } String connectorName() { @@ -133,7 +136,7 @@ public abstract class MirrorConnectorConfig extends AbstractConfig { } ReplicationPolicy replicationPolicy() { - return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class); + return replicationPolicy; } Map sourceProducerConfig() { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java index 1bc2c8830cd..ff15357c248 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java @@ -171,10 +171,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { } } - ReplicationPolicy replicationPolicy() { - return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class); - } - int replicationFactor() { return getInt(REPLICATION_FACTOR); } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java index dbf255d8bb4..7b5e9ffe292 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java @@ -29,6 +29,7 @@ import static org.apache.kafka.connect.mirror.TestUtils.makeProps; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; public class MirrorConnectorConfigTest { @@ -183,4 +184,10 @@ public class MirrorConnectorConfigTest { assertEquals(2, config.metricsReporters().size()); } + @Test + public void testReplicationPolicy() { + MirrorConnectorConfig config = new TestMirrorConnectorConfig(makeProps()); + assertSame(config.replicationPolicy(), config.replicationPolicy()); + } + }