KAFKA-14838: Add flow/connector/task/role information to MM2 Kafka client.id configs (#13458)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Dániel Urbán 2023-03-31 16:50:11 +02:00 committed by GitHub
parent 3c4472d701
commit 0aa365add8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 172 additions and 59 deletions

View File

@@ -73,6 +73,10 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
public static final String GROUP_FILTER_CLASS = "group.filter.class";
private static final String GROUP_FILTER_CLASS_DOC = "GroupFilter to use. Selects consumer groups to replicate.";
public static final Class<?> GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class;
public static final String OFFSET_SYNCS_SOURCE_CONSUMER_ROLE = "offset-syncs-source-consumer";
public static final String OFFSET_SYNCS_TARGET_CONSUMER_ROLE = "offset-syncs-target-consumer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = "offset-syncs-source-admin";
public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = "offset-syncs-target-admin";
public MirrorCheckpointConfig(Map<String, String> props) {
super(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
@@ -123,9 +127,10 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
}
}
Map<String, String> taskConfigForConsumerGroups(List<String> groups) {
Map<String, String> taskConfigForConsumerGroups(List<String> groups, int taskIndex) {
Map<String, String> props = originalsStrings();
props.put(TASK_CONSUMER_GROUPS, String.join(",", groups));
props.put(TASK_INDEX, String.valueOf(taskIndex));
return props;
}
@@ -146,14 +151,14 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
Map<String, Object> offsetSyncsTopicConsumerConfig() {
return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
? sourceConsumerConfig()
: targetConsumerConfig();
? sourceConsumerConfig(OFFSET_SYNCS_SOURCE_CONSUMER_ROLE)
: targetConsumerConfig(OFFSET_SYNCS_TARGET_CONSUMER_ROLE);
}
Map<String, Object> offsetSyncsTopicAdminConfig() {
return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
? sourceAdminConfig()
: targetAdminConfig();
? sourceAdminConfig(OFFSET_SYNCS_SOURCE_ADMIN_ROLE)
: targetAdminConfig(OFFSET_SYNCS_TARGET_ADMIN_ROLE);
}
Duration consumerPollTimeout() {

View File

@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/** Replicate consumer group state between clusters. Emits checkpoint records.
*
@@ -76,9 +77,9 @@ public class MirrorCheckpointConnector extends SourceConnector {
sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
topicFilter = config.topicFilter();
groupFilter = config.groupFilter();
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout());
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("checkpoint-source-admin"));
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler.execute(this::createInternalTopics, "creating internal topics");
scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, config.refreshGroupsInterval(),
@@ -109,14 +110,15 @@ public class MirrorCheckpointConnector extends SourceConnector {
public List<Map<String, String>> taskConfigs(int maxTasks) {
// if the replication is disabled, known consumer group is empty, or checkpoint emission is
// disabled by setting 'emit.checkpoints.enabled' to false, the interval of checkpoint emission
// will be negative and no 'MirrorHeartbeatTask' will be created
// will be negative and no 'MirrorCheckpointTask' will be created
if (!config.enabled() || knownConsumerGroups.isEmpty()
|| config.emitCheckpointsInterval().isNegative()) {
return Collections.emptyList();
}
int numTasks = Math.min(maxTasks, knownConsumerGroups.size());
return ConnectorUtils.groupPartitions(knownConsumerGroups, numTasks).stream()
.map(config::taskConfigForConsumerGroups)
List<List<String>> groupsPartitioned = ConnectorUtils.groupPartitions(knownConsumerGroups, numTasks);
return IntStream.range(0, numTasks)
.mapToObj(i -> config.taskConfigForConsumerGroups(groupsPartitioned.get(i), i))
.collect(Collectors.toList());
}

View File

@@ -96,12 +96,12 @@ public class MirrorCheckpointTask extends SourceTask {
interval = config.emitCheckpointsInterval();
pollTimeout = config.consumerPollTimeout();
offsetSyncStore = new OffsetSyncStore(config);
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("checkpoint-source-admin"));
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
metrics = config.metrics();
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler.execute(() -> {
offsetSyncStore.start();
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),

View File

@@ -46,12 +46,22 @@ public class MirrorCheckpointTaskConfig extends MirrorCheckpointConfig {
return metrics;
}
@Override
String entityLabel() {
return super.entityLabel() + "-" + (getInt(TASK_INDEX) == null ? "?" : getInt(TASK_INDEX));
}
protected static final ConfigDef TASK_CONFIG_DEF = new ConfigDef(CONNECTOR_CONFIG_DEF)
.define(
TASK_CONSUMER_GROUPS,
ConfigDef.Type.LIST,
null,
ConfigDef.Importance.LOW,
TASK_CONSUMER_GROUPS_DOC);
TASK_CONSUMER_GROUPS_DOC)
.define(TASK_INDEX,
ConfigDef.Type.INT,
null,
ConfigDef.Importance.LOW,
"The index of the task");
}

View File

@@ -107,6 +107,7 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
public static final String OFFSET_SYNCS_TOPIC_LOCATION = "offset-syncs.topic.location";
public static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT;
public static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location (source/target) of the offset-syncs topic.";
public static final String TASK_INDEX = "task.index";
private final ReplicationPolicy replicationPolicy;
@@ -139,17 +140,20 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
return replicationPolicy;
}
Map<String, Object> sourceProducerConfig() {
Map<String, Object> sourceProducerConfig(String role) {
Map<String, Object> props = new HashMap<>();
props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
props.putAll(originalsWithPrefix(PRODUCER_CLIENT_PREFIX));
props.putAll(originalsWithPrefix(SOURCE_PREFIX + PRODUCER_CLIENT_PREFIX));
addClientId(props, role);
return props;
}
Map<String, Object> sourceConsumerConfig() {
return sourceConsumerConfig(originals());
Map<String, Object> sourceConsumerConfig(String role) {
Map<String, Object> result = sourceConsumerConfig(originals());
addClientId(result, role);
return result;
}
static Map<String, Object> sourceConsumerConfig(Map<String, ?> props) {
@@ -163,25 +167,27 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
return result;
}
Map<String, Object> targetAdminConfig() {
Map<String, Object> targetAdminConfig(String role) {
Map<String, Object> props = new HashMap<>();
props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
props.putAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX));
props.putAll(originalsWithPrefix(TARGET_PREFIX + ADMIN_CLIENT_PREFIX));
addClientId(props, role);
return props;
}
Map<String, Object> targetProducerConfig() {
Map<String, Object> targetProducerConfig(String role) {
Map<String, Object> props = new HashMap<>();
props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
props.putAll(originalsWithPrefix(PRODUCER_CLIENT_PREFIX));
props.putAll(originalsWithPrefix(TARGET_PREFIX + PRODUCER_CLIENT_PREFIX));
addClientId(props, role);
return props;
}
Map<String, Object> targetConsumerConfig() {
Map<String, Object> targetConsumerConfig(String role) {
Map<String, Object> props = new HashMap<>();
props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
@@ -189,15 +195,17 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
props.putAll(originalsWithPrefix(TARGET_PREFIX + CONSUMER_CLIENT_PREFIX));
props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");
addClientId(props, role);
return props;
}
Map<String, Object> sourceAdminConfig() {
Map<String, Object> sourceAdminConfig(String role) {
Map<String, Object> props = new HashMap<>();
props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
props.putAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX));
props.putAll(originalsWithPrefix(SOURCE_PREFIX + ADMIN_CLIENT_PREFIX));
addClientId(props, role);
return props;
}
@@ -223,6 +231,16 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
}
}
void addClientId(Map<String, Object> props, String role) {
String clientId = entityLabel() + (role == null ? "" : "|" + role);
props.compute(CommonClientConfigs.CLIENT_ID_CONFIG,
(k, userClientId) -> (userClientId == null ? "" : userClientId + "|") + clientId);
}
String entityLabel() {
return sourceClusterAlias() + "->" + targetClusterAlias() + "|" + connectorName();
}
@SuppressWarnings("deprecation")
protected static final ConfigDef BASE_CONNECTOR_CONFIG_DEF = new ConfigDef(ConnectorConfig.configDef())
.define(

View File

@@ -48,8 +48,8 @@ public class MirrorHeartbeatConnector extends SourceConnector {
@Override
public void start(Map<String, String> props) {
config = new MirrorHeartbeatConfig(props);
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
scheduler = new Scheduler(MirrorHeartbeatConnector.class, config.adminTimeout());
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("heartbeats-target-admin"));
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler.execute(this::createInternalTopics, "creating internal topics");
}

View File

@@ -64,6 +64,8 @@ import java.util.Properties;
import java.util.stream.Collectors;
import java.io.File;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
/**
* Entry point for "MirrorMaker 2.0".
* <p>
@@ -267,6 +269,7 @@ public class MirrorMaker {
String clientIdBase = ConnectUtils.clientIdBase(distributedConfig);
// Create the admin client to be shared by all backing stores for this herder
Map<String, Object> adminProps = new HashMap<>(distributedConfig.originals());
adminProps.put(CLIENT_ID_CONFIG, clientIdBase + "shared-admin");
ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId);
SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);

View File

@@ -199,6 +199,7 @@ public class MirrorMakerConfig extends AbstractConfig {
props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG));
// fill in reasonable defaults
props.putIfAbsent(CommonClientConfigs.CLIENT_ID_CONFIG, sourceAndTarget.toString());
props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
+ sourceAndTarget.source() + ".internal");

View File

@@ -95,6 +95,10 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
+ "Metrics have the target, topic and partition tags. When this setting is enabled, it adds the source tag. "
+ "This configuration will be removed in Kafka 4.0 and the default behavior will be to always have the source tag.";
public static final boolean ADD_SOURCE_ALIAS_TO_METRICS_DEFAULT = false;
public static final String OFFSET_SYNCS_SOURCE_PRODUCER_ROLE = "offset-syncs-source-producer";
public static final String OFFSET_SYNCS_TARGET_PRODUCER_ROLE = "offset-syncs-target-producer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = "offset-syncs-source-admin";
public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = "offset-syncs-target-admin";
public MirrorSourceConfig(Map<String, String> props) {
super(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
@@ -110,12 +114,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
return getString(ConnectorConfig.NAME_CONFIG);
}
Map<String, String> taskConfigForTopicPartitions(List<TopicPartition> topicPartitions) {
Map<String, String> taskConfigForTopicPartitions(List<TopicPartition> topicPartitions, int taskIndex) {
Map<String, String> props = originalsStrings();
String topicPartitionsString = topicPartitions.stream()
.map(MirrorUtils::encodeTopicPartition)
.collect(Collectors.joining(","));
props.put(TASK_TOPIC_PARTITIONS, topicPartitionsString);
props.put(TASK_INDEX, String.valueOf(taskIndex));
return props;
}
@@ -132,14 +137,14 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
Map<String, Object> offsetSyncsTopicAdminConfig() {
return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
? sourceAdminConfig()
: targetAdminConfig();
? sourceAdminConfig(OFFSET_SYNCS_SOURCE_ADMIN_ROLE)
: targetAdminConfig(OFFSET_SYNCS_TARGET_ADMIN_ROLE);
}
Map<String, Object> offsetSyncsTopicProducerConfig() {
return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
? sourceProducerConfig()
: targetProducerConfig();
? sourceProducerConfig(OFFSET_SYNCS_SOURCE_PRODUCER_ROLE)
: targetProducerConfig(OFFSET_SYNCS_TARGET_PRODUCER_ROLE);
}
String checkpointsTopic() {

View File

@@ -62,6 +62,7 @@ import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.concurrent.ExecutionException;
@@ -136,10 +137,10 @@ public class MirrorSourceConnector extends SourceConnector {
configPropertyFilter = config.configPropertyFilter();
replicationPolicy = config.replicationPolicy();
replicationFactor = config.replicationFactor();
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("replication-source-admin"));
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("replication-target-admin"));
offsetSyncsAdminClient = config.forwardingAdmin(config.offsetSyncsTopicAdminConfig());
scheduler = new Scheduler(MirrorSourceConnector.class, config.adminTimeout());
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions");
scheduler.execute(this::computeAndCreateTopicPartitions, "creating downstream topic-partitions");
@@ -198,8 +199,8 @@ public class MirrorSourceConnector extends SourceConnector {
roundRobinByTask.get(index).add(partition);
count++;
}
return roundRobinByTask.stream().map(config::taskConfigForTopicPartitions)
return IntStream.range(0, numTasks)
.mapToObj(i -> config.taskConfigForTopicPartitions(roundRobinByTask.get(i), i))
.collect(Collectors.toList());
}

View File

@@ -100,7 +100,7 @@ public class MirrorSourceTask extends SourceTask {
replicationPolicy = config.replicationPolicy();
partitionStates = new HashMap<>();
offsetSyncsTopic = config.offsetSyncsTopic();
consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig());
consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer"));
offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig());
Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);

View File

@@ -49,11 +49,21 @@ public class MirrorSourceTaskConfig extends MirrorSourceConfig {
return metrics;
}
@Override
String entityLabel() {
return super.entityLabel() + "-" + (getInt(TASK_INDEX) == null ? "?" : getInt(TASK_INDEX));
}
protected static final ConfigDef TASK_CONFIG_DEF = new ConfigDef(CONNECTOR_CONFIG_DEF)
.define(
TASK_TOPIC_PARTITIONS,
ConfigDef.Type.LIST,
null,
ConfigDef.Importance.LOW,
TASK_TOPIC_PARTITIONS_DOC);
TASK_TOPIC_PARTITIONS_DOC)
.define(TASK_INDEX,
ConfigDef.Type.INT,
null,
ConfigDef.Importance.LOW,
"The index of the task");
}

View File

@@ -39,8 +39,8 @@ class Scheduler implements AutoCloseable {
this.timeout = timeout;
}
Scheduler(Class<?> clazz, Duration timeout) {
this("Scheduler for " + clazz.getSimpleName(), timeout);
Scheduler(Class<?> clazz, String role, Duration timeout) {
this("Scheduler for " + clazz.getSimpleName() + ": " + role, timeout);
}
void scheduleRepeating(Task task, Duration interval, String description) {

View File

@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.connect.mirror.TestUtils.assertEqualsExceptClientId;
import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -35,7 +36,7 @@ public class MirrorCheckpointConfigTest {
public void testTaskConfigConsumerGroups() {
List<String> groups = Arrays.asList("consumer-1", "consumer-2", "consumer-3");
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());
Map<String, String> props = config.taskConfigForConsumerGroups(groups);
Map<String, String> props = config.taskConfigForConsumerGroups(groups, 1);
MirrorCheckpointTaskConfig taskConfig = new MirrorCheckpointTaskConfig(props);
assertEquals(taskConfig.taskConsumerGroups(), new HashSet<>(groups),
"Setting consumer groups property configuration failed");
@@ -76,9 +77,21 @@ public class MirrorCheckpointConfigTest {
"fetch.min.bytes", "1"
);
MirrorCheckpointConfig config = new MirrorCheckpointConfig(connectorProps);
assertEquals(config.sourceConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
Map<String, Object> sourceConsumerConfig = config.sourceConsumerConfig("test");
Map<String, Object> offsetSyncsTopicSourceConsumerConfig = config.offsetSyncsTopicConsumerConfig();
assertEqualsExceptClientId(sourceConsumerConfig, offsetSyncsTopicSourceConsumerConfig);
assertEquals("source1->target2|ConnectorName|test", sourceConsumerConfig.get("client.id"));
assertEquals(
"source1->target2|ConnectorName|" + MirrorCheckpointConfig.OFFSET_SYNCS_SOURCE_CONSUMER_ROLE,
offsetSyncsTopicSourceConsumerConfig.get("client.id"));
connectorProps.put("offset-syncs.topic.location", "target");
config = new MirrorCheckpointConfig(connectorProps);
assertEquals(config.targetConsumerConfig(), config.offsetSyncsTopicConsumerConfig());
Map<String, Object> targetConsumerConfig = config.targetConsumerConfig("test");
Map<String, Object> offsetSyncsTopicTargetConsumerConfig = config.offsetSyncsTopicConsumerConfig();
assertEqualsExceptClientId(targetConsumerConfig, offsetSyncsTopicTargetConsumerConfig);
assertEquals("source1->target2|ConnectorName|test", targetConsumerConfig.get("client.id"));
assertEquals(
"source1->target2|ConnectorName|" + MirrorCheckpointConfig.OFFSET_SYNCS_TARGET_CONSUMER_ROLE,
offsetSyncsTopicTargetConsumerConfig.get("client.id"));
}
}

View File

@@ -46,11 +46,12 @@ public class MirrorConnectorConfigTest {
MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "max.poll.interval.ms", "120000"
);
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
Map<String, Object> connectorConsumerProps = config.sourceConsumerConfig();
Map<String, Object> connectorConsumerProps = config.sourceConsumerConfig("test");
Map<String, Object> expectedConsumerProps = new HashMap<>();
expectedConsumerProps.put("enable.auto.commit", "false");
expectedConsumerProps.put("auto.offset.reset", "earliest");
expectedConsumerProps.put("max.poll.interval.ms", "120000");
expectedConsumerProps.put("client.id", "source1->target2|ConnectorName|test");
assertEquals(expectedConsumerProps, connectorConsumerProps);
// checking auto.offset.reset override works
@@ -58,7 +59,7 @@ public class MirrorConnectorConfigTest {
MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "auto.offset.reset", "latest"
);
config = new TestMirrorConnectorConfig(connectorProps);
connectorConsumerProps = config.sourceConsumerConfig();
connectorConsumerProps = config.sourceConsumerConfig("test");
expectedConsumerProps.put("auto.offset.reset", "latest");
expectedConsumerProps.remove("max.poll.interval.ms");
assertEquals(expectedConsumerProps, connectorConsumerProps,
@@ -73,11 +74,12 @@ public class MirrorConnectorConfigTest {
prefix + "max.poll.interval.ms", "100"
);
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
Map<String, Object> connectorConsumerProps = config.sourceConsumerConfig();
Map<String, Object> connectorConsumerProps = config.sourceConsumerConfig("test");
Map<String, Object> expectedConsumerProps = new HashMap<>();
expectedConsumerProps.put("enable.auto.commit", "false");
expectedConsumerProps.put("auto.offset.reset", "latest");
expectedConsumerProps.put("max.poll.interval.ms", "100");
expectedConsumerProps.put("client.id", "source1->target2|ConnectorName|test");
assertEquals(expectedConsumerProps, connectorConsumerProps,
prefix + " source consumer config not matching");
}
@@ -88,9 +90,10 @@ public class MirrorConnectorConfigTest {
MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX + "acks", "1"
);
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
Map<String, Object> connectorProducerProps = config.sourceProducerConfig();
Map<String, Object> connectorProducerProps = config.sourceProducerConfig("test");
Map<String, Object> expectedProducerProps = new HashMap<>();
expectedProducerProps.put("acks", "1");
expectedProducerProps.put("client.id", "source1->target2|ConnectorName|test");
assertEquals(expectedProducerProps, connectorProducerProps,
MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX + " source product config not matching");
}
@@ -100,9 +103,10 @@ public class MirrorConnectorConfigTest {
String prefix = MirrorConnectorConfig.SOURCE_PREFIX + MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX;
Map<String, String> connectorProps = makeProps(prefix + "acks", "1");
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
Map<String, Object> connectorProducerProps = config.sourceProducerConfig();
Map<String, Object> connectorProducerProps = config.sourceProducerConfig("test");
Map<String, Object> expectedProducerProps = new HashMap<>();
expectedProducerProps.put("acks", "1");
expectedProducerProps.put("client.id", "source1->target2|ConnectorName|test");
assertEquals(expectedProducerProps, connectorProducerProps,
prefix + " source producer config not matching");
}
@@ -114,9 +118,10 @@ public class MirrorConnectorConfigTest {
"connections.max.idle.ms", "10000"
);
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
Map<String, Object> connectorAdminProps = config.sourceAdminConfig();
Map<String, Object> connectorAdminProps = config.sourceAdminConfig("test");
Map<String, Object> expectedAdminProps = new HashMap<>();
expectedAdminProps.put("connections.max.idle.ms", "10000");
expectedAdminProps.put("client.id", "source1->target2|ConnectorName|test");
assertEquals(expectedAdminProps, connectorAdminProps,
MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + " source connector admin props not matching");
}
@@ -126,9 +131,10 @@ public class MirrorConnectorConfigTest {
String prefix = MirrorConnectorConfig.SOURCE_PREFIX + MirrorConnectorConfig.ADMIN_CLIENT_PREFIX;
Map<String, String> connectorProps = makeProps(prefix + "connections.max.idle.ms", "10000");
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
Map<String, Object> connectorAdminProps = config.sourceAdminConfig();
Map<String, Object> connectorAdminProps = config.sourceAdminConfig("test");
Map<String, Object> expectedAdminProps = new HashMap<>();
expectedAdminProps.put("connections.max.idle.ms", "10000");
expectedAdminProps.put("client.id", "source1->target2|ConnectorName|test");
assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching");
}
@@ -139,9 +145,10 @@ public class MirrorConnectorConfigTest {
"connections.max.idle.ms", "10000"
);
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
Map<String, Object> connectorAdminProps = config.targetAdminConfig();
Map<String, Object> connectorAdminProps = config.targetAdminConfig("test");
Map<String, Object> expectedAdminProps = new HashMap<>();
expectedAdminProps.put("connections.max.idle.ms", "10000");
expectedAdminProps.put("client.id", "source1->target2|ConnectorName|test");
assertEquals(expectedAdminProps, connectorAdminProps,
MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + " target connector admin props not matching");
}
@@ -151,9 +158,10 @@ public class MirrorConnectorConfigTest {
String prefix = MirrorConnectorConfig.TARGET_PREFIX + MirrorConnectorConfig.ADMIN_CLIENT_PREFIX;
Map<String, String> connectorProps = makeProps(prefix + "connections.max.idle.ms", "10000");
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
Map<String, Object> connectorAdminProps = config.targetAdminConfig();
Map<String, Object> connectorAdminProps = config.targetAdminConfig("test");
Map<String, Object> expectedAdminProps = new HashMap<>();
expectedAdminProps.put("connections.max.idle.ms", "10000");
expectedAdminProps.put("client.id", "source1->target2|ConnectorName|test");
assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching");
}

View File

@@ -248,9 +248,11 @@ public class MirrorMakerConfigTest {
SourceAndTarget a = new SourceAndTarget("b", "a");
SourceAndTarget b = new SourceAndTarget("a", "b");
Map<String, String> aProps = mirrorConfig.workerConfig(a);
assertEquals("b->a", aProps.get("client.id"));
assertEquals("123", aProps.get("offset.storage.replication.factor"));
assertEquals("__", aProps.get("replication.policy.separator"));
Map<String, String> bProps = mirrorConfig.workerConfig(b);
assertEquals("a->b", bProps.get("client.id"));
assertEquals("456", bProps.get("status.storage.replication.factor"));
assertEquals("client-one", bProps.get("producer.client.id"),
"producer props should be passed through to worker producer config: " + bProps);

View File

@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.connect.mirror.TestUtils.assertEqualsExceptClientId;
import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -39,7 +40,7 @@ public class MirrorSourceConfigTest {
List<TopicPartition> topicPartitions = Arrays.asList(new TopicPartition("topic-1", 2),
new TopicPartition("topic-3", 4), new TopicPartition("topic-5", 6));
MirrorSourceConfig config = new MirrorSourceConfig(makeProps());
Map<String, String> props = config.taskConfigForTopicPartitions(topicPartitions);
Map<String, String> props = config.taskConfigForTopicPartitions(topicPartitions, 1);
MirrorSourceTaskConfig taskConfig = new MirrorSourceTaskConfig(props);
assertEquals(taskConfig.taskTopicPartitions(), new HashSet<>(topicPartitions),
"Setting topic property configuration failed");
@@ -144,10 +145,20 @@ public class MirrorSourceConfigTest {
"fetch.min.bytes", "1"
);
MirrorSourceConfig config = new MirrorSourceConfig(connectorProps);
assertEquals(config.sourceProducerConfig(), config.offsetSyncsTopicProducerConfig());
Map<String, Object> sourceProducerConfig = config.sourceProducerConfig("test");
Map<String, Object> offsetSyncsTopicSourceProducerConfig = config.offsetSyncsTopicProducerConfig();
assertEqualsExceptClientId(sourceProducerConfig, offsetSyncsTopicSourceProducerConfig);
assertEquals("source1->target2|ConnectorName|test", sourceProducerConfig.get("client.id"));
assertEquals("source1->target2|ConnectorName|" + MirrorSourceConfig.OFFSET_SYNCS_SOURCE_PRODUCER_ROLE,
offsetSyncsTopicSourceProducerConfig.get("client.id"));
connectorProps.put("offset-syncs.topic.location", "target");
config = new MirrorSourceConfig(connectorProps);
assertEquals(config.targetProducerConfig(), config.offsetSyncsTopicProducerConfig());
Map<String, Object> targetProducerConfig = config.targetProducerConfig("test");
Map<String, Object> offsetSyncsTopicTargetProducerConfig = config.offsetSyncsTopicProducerConfig();
assertEqualsExceptClientId(targetProducerConfig, offsetSyncsTopicTargetProducerConfig);
assertEquals("source1->target2|ConnectorName|test", targetProducerConfig.get("client.id"));
assertEquals("source1->target2|ConnectorName|" + MirrorSourceConfig.OFFSET_SYNCS_TARGET_PRODUCER_ROLE,
offsetSyncsTopicTargetProducerConfig.get("client.id"));
}
@Test
@@ -159,9 +170,19 @@ public class MirrorSourceConfigTest {
"retries", "123"
);
MirrorSourceConfig config = new MirrorSourceConfig(connectorProps);
assertEquals(config.sourceAdminConfig(), config.offsetSyncsTopicAdminConfig());
Map<String, Object> sourceAdminConfig = config.sourceAdminConfig("test");
Map<String, Object> offsetSyncsTopicSourceAdminConfig = config.offsetSyncsTopicAdminConfig();
assertEqualsExceptClientId(sourceAdminConfig, offsetSyncsTopicSourceAdminConfig);
assertEquals("source1->target2|ConnectorName|test", sourceAdminConfig.get("client.id"));
assertEquals("source1->target2|ConnectorName|" + MirrorSourceConfig.OFFSET_SYNCS_SOURCE_ADMIN_ROLE,
offsetSyncsTopicSourceAdminConfig.get("client.id"));
connectorProps.put("offset-syncs.topic.location", "target");
config = new MirrorSourceConfig(connectorProps);
assertEquals(config.targetAdminConfig(), config.offsetSyncsTopicAdminConfig());
Map<String, Object> targetAdminConfig = config.targetAdminConfig("test");
Map<String, Object> offsetSyncsTopicTargetAdminConfig = config.offsetSyncsTopicAdminConfig();
assertEqualsExceptClientId(targetAdminConfig, offsetSyncsTopicTargetAdminConfig);
assertEquals("source1->target2|ConnectorName|test", targetAdminConfig.get("client.id"));
assertEquals("source1->target2|ConnectorName|" + MirrorSourceConfig.OFFSET_SYNCS_TARGET_ADMIN_ROLE,
offsetSyncsTopicTargetAdminConfig.get("client.id"));
}
}

View File

@@ -47,6 +47,7 @@ public class MirrorSourceMetricsTest {
configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, MirrorSourceConnector.class.getName());
configs.put(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS, SOURCE);
configs.put(MirrorConnectorConfig.TARGET_CLUSTER_ALIAS, TARGET);
configs.put(MirrorConnectorConfig.TASK_INDEX, "0");
configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS, TP.toString());
reporter = new TestReporter();
}

View File

@@ -19,6 +19,8 @@ package org.apache.kafka.connect.mirror;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestUtils {
static Map<String, String> makeProps(String... keyValues) {
@@ -43,4 +45,12 @@ public class TestUtils {
}
return records;
}
public static void assertEqualsExceptClientId(Map<String, Object> expected, Map<String, Object> actual) {
Map<String, Object> expectedWithoutClientId = new HashMap<>(expected);
expectedWithoutClientId.remove("client.id");
Map<String, Object> actualWithoutClientId = new HashMap<>(actual);
actualWithoutClientId.remove("client.id");
assertEquals(expectedWithoutClientId, actualWithoutClientId);
}
}

View File

@@ -291,6 +291,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
// Thread factory uses String.format and '%' is handled as a placeholder
// need to escape if the client.id contains an actual % character
String escapedClientIdForThreadNameFormat = clientId.replace("%", "%%");
LogContext logContext = new LogContext("[Worker clientId=" + clientId + ", groupId=" + this.workerGroupId + "] ");
log = logContext.logger(DistributedHerder.class);
@@ -303,17 +306,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(1),
ThreadUtils.createThreadFactory(
this.getClass().getSimpleName() + "-" + clientId + "-%d", false));
this.getClass().getSimpleName() + "-" + escapedClientIdForThreadNameFormat + "-%d", false));
this.forwardRequestExecutor = forwardRequestExecutor != null
? forwardRequestExecutor
: Executors.newFixedThreadPool(
1,
ThreadUtils.createThreadFactory("ForwardRequestExecutor-" + clientId + "-%d", false)
ThreadUtils.createThreadFactory("ForwardRequestExecutor-" + escapedClientIdForThreadNameFormat + "-%d", false)
);
this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE,
ThreadUtils.createThreadFactory(
"StartAndStopExecutor-" + clientId + "-%d", false));
"StartAndStopExecutor-" + escapedClientIdForThreadNameFormat + "-%d", false));
this.config = config;
stopping = new AtomicBoolean(false);