mirror of https://github.com/apache/kafka.git
KAFKA-14745: Cache the ReplicationPolicy instance in MirrorConnectorConfig (#13328)
Reviewers: Chris Egerton <fearthecellos@gmail.com>
This commit is contained in:
parent
517b5d2b09
commit
71fa008b45
|
@ -108,8 +108,11 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
|
||||||
public static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT;
|
public static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT;
|
||||||
public static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location (source/target) of the offset-syncs topic.";
|
public static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location (source/target) of the offset-syncs topic.";
|
||||||
|
|
||||||
|
private final ReplicationPolicy replicationPolicy;
|
||||||
|
|
||||||
protected MirrorConnectorConfig(ConfigDef configDef, Map<String, String> props) {
|
protected MirrorConnectorConfig(ConfigDef configDef, Map<String, String> props) {
|
||||||
super(configDef, props, true);
|
super(configDef, props, true);
|
||||||
|
replicationPolicy = getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
String connectorName() {
|
String connectorName() {
|
||||||
|
@ -133,7 +136,7 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
ReplicationPolicy replicationPolicy() {
|
ReplicationPolicy replicationPolicy() {
|
||||||
return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);
|
return replicationPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, Object> sourceProducerConfig() {
|
Map<String, Object> sourceProducerConfig() {
|
||||||
|
|
|
@ -171,10 +171,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ReplicationPolicy replicationPolicy() {
|
|
||||||
return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
int replicationFactor() {
|
int replicationFactor() {
|
||||||
return getInt(REPLICATION_FACTOR);
|
return getInt(REPLICATION_FACTOR);
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||||
|
|
||||||
public class MirrorConnectorConfigTest {
|
public class MirrorConnectorConfigTest {
|
||||||
|
|
||||||
|
@ -183,4 +184,10 @@ public class MirrorConnectorConfigTest {
|
||||||
assertEquals(2, config.metricsReporters().size());
|
assertEquals(2, config.metricsReporters().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicationPolicy() {
|
||||||
|
MirrorConnectorConfig config = new TestMirrorConnectorConfig(makeProps());
|
||||||
|
assertSame(config.replicationPolicy(), config.replicationPolicy());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue