From 0aa365add881428815c3d71fd5ce575fb8fb8089 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Urb=C3=A1n?= <48119872+urbandan@users.noreply.github.com> Date: Fri, 31 Mar 2023 16:50:11 +0200 Subject: [PATCH] KAFKA-14838: Add flow/connector/task/role information to MM2 Kafka client.id configs (#13458) Reviewers: Chris Egerton --- .../mirror/MirrorCheckpointConfig.java | 15 ++++++--- .../mirror/MirrorCheckpointConnector.java | 14 ++++---- .../connect/mirror/MirrorCheckpointTask.java | 6 ++-- .../mirror/MirrorCheckpointTaskConfig.java | 12 ++++++- .../connect/mirror/MirrorConnectorConfig.java | 32 +++++++++++++++---- .../mirror/MirrorHeartbeatConnector.java | 4 +-- .../kafka/connect/mirror/MirrorMaker.java | 3 ++ .../connect/mirror/MirrorMakerConfig.java | 1 + .../connect/mirror/MirrorSourceConfig.java | 15 ++++++--- .../connect/mirror/MirrorSourceConnector.java | 13 ++++---- .../connect/mirror/MirrorSourceTask.java | 2 +- .../mirror/MirrorSourceTaskConfig.java | 12 ++++++- .../kafka/connect/mirror/Scheduler.java | 4 +-- .../mirror/MirrorCheckpointConfigTest.java | 19 +++++++++-- .../mirror/MirrorConnectorConfigTest.java | 26 +++++++++------ .../connect/mirror/MirrorMakerConfigTest.java | 2 ++ .../mirror/MirrorSourceConfigTest.java | 31 +++++++++++++++--- .../mirror/MirrorSourceMetricsTest.java | 1 + .../kafka/connect/mirror/TestUtils.java | 10 ++++++ .../distributed/DistributedHerder.java | 9 ++++-- 20 files changed, 172 insertions(+), 59 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java index 122d8ad1e7f..bed71ff0db2 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java @@ -73,6 +73,10 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig { public static final String GROUP_FILTER_CLASS = "group.filter.class"; private static final String GROUP_FILTER_CLASS_DOC = "GroupFilter to use. Selects consumer groups to replicate."; public static final Class GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class; + public static final String OFFSET_SYNCS_SOURCE_CONSUMER_ROLE = "offset-syncs-source-consumer"; + public static final String OFFSET_SYNCS_TARGET_CONSUMER_ROLE = "offset-syncs-target-consumer"; + public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = "offset-syncs-source-admin"; + public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = "offset-syncs-target-admin"; public MirrorCheckpointConfig(Map props) { super(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{ @@ -123,9 +127,10 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig { } } - Map taskConfigForConsumerGroups(List groups) { + Map taskConfigForConsumerGroups(List groups, int taskIndex) { Map props = originalsStrings(); props.put(TASK_CONSUMER_GROUPS, String.join(",", groups)); + props.put(TASK_INDEX, String.valueOf(taskIndex)); return props; } @@ -146,14 +151,14 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig { Map offsetSyncsTopicConsumerConfig() { return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation()) - ? sourceConsumerConfig() - : targetConsumerConfig(); + ? sourceConsumerConfig(OFFSET_SYNCS_SOURCE_CONSUMER_ROLE) + : targetConsumerConfig(OFFSET_SYNCS_TARGET_CONSUMER_ROLE); } Map offsetSyncsTopicAdminConfig() { return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation()) - ? sourceAdminConfig() - : targetAdminConfig(); + ? sourceAdminConfig(OFFSET_SYNCS_SOURCE_ADMIN_ROLE) + : targetAdminConfig(OFFSET_SYNCS_TARGET_ADMIN_ROLE); } Duration consumerPollTimeout() { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java index 5780b288fc2..80154e8211f 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.stream.IntStream; /** Replicate consumer group state between clusters. Emits checkpoint records. * @@ -76,9 +77,9 @@ public class MirrorCheckpointConnector extends SourceConnector { sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias()); topicFilter = config.topicFilter(); groupFilter = config.groupFilter(); - sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig()); - targetAdminClient = config.forwardingAdmin(config.targetAdminConfig()); - scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout()); + sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("checkpoint-source-admin")); + targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")); + scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout()); scheduler.execute(this::createInternalTopics, "creating internal topics"); scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups"); scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, config.refreshGroupsInterval(), @@ -109,14 +110,15 @@ public class MirrorCheckpointConnector extends SourceConnector { public List> taskConfigs(int maxTasks) { // if the replication is disabled, known consumer group is empty, or checkpoint emission is // disabled by setting 'emit.checkpoints.enabled' to false, the interval of checkpoint emission - // will be negative and no 'MirrorHeartbeatTask' will be created + // will be negative and no 'MirrorCheckpointTask' will be created if (!config.enabled() || knownConsumerGroups.isEmpty() || config.emitCheckpointsInterval().isNegative()) { return Collections.emptyList(); } int numTasks = Math.min(maxTasks, knownConsumerGroups.size()); - return ConnectorUtils.groupPartitions(knownConsumerGroups, numTasks).stream() - .map(config::taskConfigForConsumerGroups) + List> groupsPartitioned = ConnectorUtils.groupPartitions(knownConsumerGroups, numTasks); + return IntStream.range(0, numTasks) + .mapToObj(i -> config.taskConfigForConsumerGroups(groupsPartitioned.get(i), i)) .collect(Collectors.toList()); } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index 4357f8ad5ba..5e3c6e5539a 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -96,12 +96,12 @@ public class MirrorCheckpointTask extends SourceTask { interval = config.emitCheckpointsInterval(); pollTimeout = config.consumerPollTimeout(); offsetSyncStore = new OffsetSyncStore(config); - sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig()); - targetAdminClient = config.forwardingAdmin(config.targetAdminConfig()); + sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("checkpoint-source-admin")); + targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")); metrics = config.metrics(); idleConsumerGroupsOffset = new HashMap<>(); checkpointsPerConsumerGroup = new HashMap<>(); - scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout()); + scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout()); scheduler.execute(() -> { offsetSyncStore.start(); scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(), diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java index 4dfd2d781ce..757651383c9 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java @@ -46,12 +46,22 @@ public class MirrorCheckpointTaskConfig extends MirrorCheckpointConfig { return metrics; } + @Override + String entityLabel() { + return super.entityLabel() + "-" + (getInt(TASK_INDEX) == null ? "?" : getInt(TASK_INDEX)); + } + protected static final ConfigDef TASK_CONFIG_DEF = new ConfigDef(CONNECTOR_CONFIG_DEF) .define( TASK_CONSUMER_GROUPS, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, - TASK_CONSUMER_GROUPS_DOC); + TASK_CONSUMER_GROUPS_DOC) + .define(TASK_INDEX, + ConfigDef.Type.INT, + null, + ConfigDef.Importance.LOW, + "The index of the task"); } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java index 0f158d3252d..3f520bf3aa1 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java @@ -107,6 +107,7 @@ public abstract class MirrorConnectorConfig extends AbstractConfig { public static final String OFFSET_SYNCS_TOPIC_LOCATION = "offset-syncs.topic.location"; public static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT; public static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location (source/target) of the offset-syncs topic."; + public static final String TASK_INDEX = "task.index"; private final ReplicationPolicy replicationPolicy; @@ -139,17 +140,20 @@ public abstract class MirrorConnectorConfig extends AbstractConfig { return replicationPolicy; } - Map sourceProducerConfig() { + Map sourceProducerConfig(String role) { Map props = new HashMap<>(); props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX)); props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); props.putAll(originalsWithPrefix(PRODUCER_CLIENT_PREFIX)); props.putAll(originalsWithPrefix(SOURCE_PREFIX + PRODUCER_CLIENT_PREFIX)); + addClientId(props, role); return props; } - Map sourceConsumerConfig() { - return sourceConsumerConfig(originals()); + Map sourceConsumerConfig(String role) { + Map result = sourceConsumerConfig(originals()); + addClientId(result, role); + return result; } static Map sourceConsumerConfig(Map props) { @@ -163,25 +167,27 @@ public abstract class MirrorConnectorConfig extends AbstractConfig { return result; } - Map targetAdminConfig() { + Map targetAdminConfig(String role) { Map props = new HashMap<>(); props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX)); props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); props.putAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX)); props.putAll(originalsWithPrefix(TARGET_PREFIX + ADMIN_CLIENT_PREFIX)); + addClientId(props, role); return props; } - Map targetProducerConfig() { + Map targetProducerConfig(String role) { Map props = new HashMap<>(); props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX)); props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); props.putAll(originalsWithPrefix(PRODUCER_CLIENT_PREFIX)); props.putAll(originalsWithPrefix(TARGET_PREFIX + PRODUCER_CLIENT_PREFIX)); + addClientId(props, role); return props; } - Map targetConsumerConfig() { + Map targetConsumerConfig(String role) { Map props = new HashMap<>(); props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX)); props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); @@ -189,15 +195,17 @@ public abstract class MirrorConnectorConfig extends AbstractConfig { props.putAll(originalsWithPrefix(TARGET_PREFIX + CONSUMER_CLIENT_PREFIX)); props.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest"); + addClientId(props, role); return props; } - Map sourceAdminConfig() { + Map sourceAdminConfig(String role) { Map props = new HashMap<>(); props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX)); props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); props.putAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX)); props.putAll(originalsWithPrefix(SOURCE_PREFIX + ADMIN_CLIENT_PREFIX)); + addClientId(props, role); return props; } @@ -223,6 +231,16 @@ public abstract class MirrorConnectorConfig extends AbstractConfig { } } + void addClientId(Map props, String role) { + String clientId = entityLabel() + (role == null ? "" : "|" + role); + props.compute(CommonClientConfigs.CLIENT_ID_CONFIG, + (k, userClientId) -> (userClientId == null ? "" : userClientId + "|") + clientId); + } + + String entityLabel() { + return sourceClusterAlias() + "->" + targetClusterAlias() + "|" + connectorName(); + } + @SuppressWarnings("deprecation") protected static final ConfigDef BASE_CONNECTOR_CONFIG_DEF = new ConfigDef(ConnectorConfig.configDef()) .define( diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java index 0464f6f3952..6410e8fc3f9 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java @@ -48,8 +48,8 @@ public class MirrorHeartbeatConnector extends SourceConnector { @Override public void start(Map props) { config = new MirrorHeartbeatConfig(props); - targetAdminClient = config.forwardingAdmin(config.targetAdminConfig()); - scheduler = new Scheduler(MirrorHeartbeatConnector.class, config.adminTimeout()); + targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("heartbeats-target-admin")); + scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout()); scheduler.execute(this::createInternalTopics, "creating internal topics"); } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index d33d6b4f686..4af27a66387 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -64,6 +64,8 @@ import java.util.Properties; import java.util.stream.Collectors; import java.io.File; +import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; + /** * Entry point for "MirrorMaker 2.0". *

@@ -267,6 +269,7 @@ public class MirrorMaker { String clientIdBase = ConnectUtils.clientIdBase(distributedConfig); // Create the admin client to be shared by all backing stores for this herder Map adminProps = new HashMap<>(distributedConfig.originals()); + adminProps.put(CLIENT_ID_CONFIG, clientIdBase + "shared-admin"); ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId); SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps); KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java index 731d0b04fd1..4f1efeb6bfe 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java @@ -199,6 +199,7 @@ public class MirrorMakerConfig extends AbstractConfig { props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG)); // fill in reasonable defaults + props.putIfAbsent(CommonClientConfigs.CLIENT_ID_CONFIG, sourceAndTarget.toString()); props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2"); props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets." + sourceAndTarget.source() + ".internal"); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java index 08aaec21130..3e582b50455 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java @@ -95,6 +95,10 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { + "Metrics have the target, topic and partition tags. When this setting is enabled, it adds the source tag. " + "This configuration will be removed in Kafka 4.0 and the default behavior will be to always have the source tag."; public static final boolean ADD_SOURCE_ALIAS_TO_METRICS_DEFAULT = false; + public static final String OFFSET_SYNCS_SOURCE_PRODUCER_ROLE = "offset-syncs-source-producer"; + public static final String OFFSET_SYNCS_TARGET_PRODUCER_ROLE = "offset-syncs-target-producer"; + public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = "offset-syncs-source-admin"; + public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = "offset-syncs-target-admin"; public MirrorSourceConfig(Map props) { super(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{ @@ -110,12 +114,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { return getString(ConnectorConfig.NAME_CONFIG); } - Map taskConfigForTopicPartitions(List topicPartitions) { + Map taskConfigForTopicPartitions(List topicPartitions, int taskIndex) { Map props = originalsStrings(); String topicPartitionsString = topicPartitions.stream() .map(MirrorUtils::encodeTopicPartition) .collect(Collectors.joining(",")); props.put(TASK_TOPIC_PARTITIONS, topicPartitionsString); + props.put(TASK_INDEX, String.valueOf(taskIndex)); return props; } @@ -132,14 +137,14 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { Map offsetSyncsTopicAdminConfig() { return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation()) - ? sourceAdminConfig() - : targetAdminConfig(); + ? sourceAdminConfig(OFFSET_SYNCS_SOURCE_ADMIN_ROLE) + : targetAdminConfig(OFFSET_SYNCS_TARGET_ADMIN_ROLE); } Map offsetSyncsTopicProducerConfig() { return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation()) - ? sourceProducerConfig() - : targetProducerConfig(); + ? sourceProducerConfig(OFFSET_SYNCS_SOURCE_PRODUCER_ROLE) + : targetProducerConfig(OFFSET_SYNCS_TARGET_PRODUCER_ROLE); } String checkpointsTopic() { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index 6c9d779fe11..4dd3c740700 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -62,6 +62,7 @@ import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.concurrent.ExecutionException; @@ -136,10 +137,10 @@ public class MirrorSourceConnector extends SourceConnector { configPropertyFilter = config.configPropertyFilter(); replicationPolicy = config.replicationPolicy(); replicationFactor = config.replicationFactor(); - sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig()); - targetAdminClient = config.forwardingAdmin(config.targetAdminConfig()); + sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("replication-source-admin")); + targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("replication-target-admin")); offsetSyncsAdminClient = config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()); - scheduler = new Scheduler(MirrorSourceConnector.class, config.adminTimeout()); + scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout()); scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic"); scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions"); scheduler.execute(this::computeAndCreateTopicPartitions, "creating downstream topic-partitions"); @@ -198,9 +199,9 @@ public class MirrorSourceConnector extends SourceConnector { roundRobinByTask.get(index).add(partition); count++; } - - return roundRobinByTask.stream().map(config::taskConfigForTopicPartitions) - .collect(Collectors.toList()); + return IntStream.range(0, numTasks) + .mapToObj(i -> config.taskConfigForTopicPartitions(roundRobinByTask.get(i), i)) + .collect(Collectors.toList()); } @Override diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index e088e68f31f..84e393edb36 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -100,7 +100,7 @@ public class MirrorSourceTask extends SourceTask { replicationPolicy = config.replicationPolicy(); partitionStates = new HashMap<>(); offsetSyncsTopic = config.offsetSyncsTopic(); - consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig()); + consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer")); offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig()); Set taskTopicPartitions = config.taskTopicPartitions(); Map topicPartitionOffsets = loadOffsets(taskTopicPartitions); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java index 488053efc13..3a2eacd1a9e 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java @@ -48,6 +48,11 @@ public class MirrorSourceTaskConfig extends MirrorSourceConfig { metricsReporters().forEach(metrics::addReporter); return metrics; } + + @Override + String entityLabel() { + return super.entityLabel() + "-" + (getInt(TASK_INDEX) == null ? "?" : getInt(TASK_INDEX)); + } protected static final ConfigDef TASK_CONFIG_DEF = new ConfigDef(CONNECTOR_CONFIG_DEF) .define( @@ -55,5 +60,10 @@ public class MirrorSourceTaskConfig extends MirrorSourceConfig { ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, - TASK_TOPIC_PARTITIONS_DOC); + TASK_TOPIC_PARTITIONS_DOC) + .define(TASK_INDEX, + ConfigDef.Type.INT, + null, + ConfigDef.Importance.LOW, + "The index of the task"); } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java index 0644d6a6c6c..642252a3e25 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java @@ -39,8 +39,8 @@ class Scheduler implements AutoCloseable { this.timeout = timeout; } - Scheduler(Class clazz, Duration timeout) { - this("Scheduler for " + clazz.getSimpleName(), timeout); + Scheduler(Class clazz, String role, Duration timeout) { + this("Scheduler for " + clazz.getSimpleName() + ": " + role, timeout); } void scheduleRepeating(Task task, Duration interval, String description) { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfigTest.java index d874991dc01..495fd2ebe15 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfigTest.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import static org.apache.kafka.connect.mirror.TestUtils.assertEqualsExceptClientId; import static org.apache.kafka.connect.mirror.TestUtils.makeProps; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -35,7 +36,7 @@ public class MirrorCheckpointConfigTest { public void testTaskConfigConsumerGroups() { List groups = Arrays.asList("consumer-1", "consumer-2", "consumer-3"); MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps()); - Map props = config.taskConfigForConsumerGroups(groups); + Map props = config.taskConfigForConsumerGroups(groups, 1); MirrorCheckpointTaskConfig taskConfig = new MirrorCheckpointTaskConfig(props); assertEquals(taskConfig.taskConsumerGroups(), new HashSet<>(groups), "Setting consumer groups property configuration failed"); @@ -76,9 +77,21 @@ public class MirrorCheckpointConfigTest { "fetch.min.bytes", "1" ); MirrorCheckpointConfig config = new MirrorCheckpointConfig(connectorProps); - assertEquals(config.sourceConsumerConfig(), config.offsetSyncsTopicConsumerConfig()); + Map sourceConsumerConfig = config.sourceConsumerConfig("test"); + Map offsetSyncsTopicSourceConsumerConfig = config.offsetSyncsTopicConsumerConfig(); + assertEqualsExceptClientId(sourceConsumerConfig, offsetSyncsTopicSourceConsumerConfig); + assertEquals("source1->target2|ConnectorName|test", sourceConsumerConfig.get("client.id")); + assertEquals( + "source1->target2|ConnectorName|" + MirrorCheckpointConfig.OFFSET_SYNCS_SOURCE_CONSUMER_ROLE, + offsetSyncsTopicSourceConsumerConfig.get("client.id")); connectorProps.put("offset-syncs.topic.location", "target"); config = new MirrorCheckpointConfig(connectorProps); - assertEquals(config.targetConsumerConfig(), config.offsetSyncsTopicConsumerConfig()); + Map targetConsumerConfig = config.targetConsumerConfig("test"); + Map offsetSyncsTopicTargetConsumerConfig = config.offsetSyncsTopicConsumerConfig(); + assertEqualsExceptClientId(targetConsumerConfig, offsetSyncsTopicTargetConsumerConfig); + assertEquals("source1->target2|ConnectorName|test", targetConsumerConfig.get("client.id")); + assertEquals( + "source1->target2|ConnectorName|" + MirrorCheckpointConfig.OFFSET_SYNCS_TARGET_CONSUMER_ROLE, + offsetSyncsTopicTargetConsumerConfig.get("client.id")); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java index 7b5e9ffe292..8754908e3b5 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java @@ -46,11 +46,12 @@ public class MirrorConnectorConfigTest { MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "max.poll.interval.ms", "120000" ); MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps); - Map connectorConsumerProps = config.sourceConsumerConfig(); + Map connectorConsumerProps = config.sourceConsumerConfig("test"); Map expectedConsumerProps = new HashMap<>(); expectedConsumerProps.put("enable.auto.commit", "false"); expectedConsumerProps.put("auto.offset.reset", "earliest"); expectedConsumerProps.put("max.poll.interval.ms", "120000"); + expectedConsumerProps.put("client.id", "source1->target2|ConnectorName|test"); assertEquals(expectedConsumerProps, connectorConsumerProps); // checking auto.offset.reset override works @@ -58,7 +59,7 @@ public class MirrorConnectorConfigTest { MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "auto.offset.reset", "latest" ); config = new TestMirrorConnectorConfig(connectorProps); - connectorConsumerProps = config.sourceConsumerConfig(); + connectorConsumerProps = config.sourceConsumerConfig("test"); expectedConsumerProps.put("auto.offset.reset", "latest"); expectedConsumerProps.remove("max.poll.interval.ms"); assertEquals(expectedConsumerProps, connectorConsumerProps, @@ -73,11 +74,12 @@ public class MirrorConnectorConfigTest { prefix + "max.poll.interval.ms", "100" ); MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps); - Map connectorConsumerProps = config.sourceConsumerConfig(); + Map connectorConsumerProps = config.sourceConsumerConfig("test"); Map expectedConsumerProps = new HashMap<>(); expectedConsumerProps.put("enable.auto.commit", "false"); expectedConsumerProps.put("auto.offset.reset", "latest"); expectedConsumerProps.put("max.poll.interval.ms", "100"); + expectedConsumerProps.put("client.id", "source1->target2|ConnectorName|test"); assertEquals(expectedConsumerProps, connectorConsumerProps, prefix + " source consumer config not matching"); } @@ -88,9 +90,10 @@ public class MirrorConnectorConfigTest { MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX + "acks", "1" ); MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps); - Map connectorProducerProps = config.sourceProducerConfig(); + Map connectorProducerProps = config.sourceProducerConfig("test"); Map expectedProducerProps = new HashMap<>(); expectedProducerProps.put("acks", "1"); + expectedProducerProps.put("client.id", "source1->target2|ConnectorName|test"); assertEquals(expectedProducerProps, connectorProducerProps, MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX + " source product config not matching"); } @@ -100,9 +103,10 @@ public class MirrorConnectorConfigTest { String prefix = MirrorConnectorConfig.SOURCE_PREFIX + MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX; Map connectorProps = makeProps(prefix + "acks", "1"); MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps); - Map connectorProducerProps = config.sourceProducerConfig(); + Map connectorProducerProps = config.sourceProducerConfig("test"); Map expectedProducerProps = new HashMap<>(); expectedProducerProps.put("acks", "1"); + expectedProducerProps.put("client.id", "source1->target2|ConnectorName|test"); assertEquals(expectedProducerProps, connectorProducerProps, prefix + " source producer config not matching"); } @@ -114,9 +118,10 @@ public class MirrorConnectorConfigTest { "connections.max.idle.ms", "10000" ); MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps); - Map connectorAdminProps = config.sourceAdminConfig(); + Map connectorAdminProps = config.sourceAdminConfig("test"); Map expectedAdminProps = new HashMap<>(); expectedAdminProps.put("connections.max.idle.ms", "10000"); + expectedAdminProps.put("client.id", "source1->target2|ConnectorName|test"); assertEquals(expectedAdminProps, connectorAdminProps, MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + " source connector admin props not matching"); } @@ -126,9 +131,10 @@ public class MirrorConnectorConfigTest { String prefix = MirrorConnectorConfig.SOURCE_PREFIX + MirrorConnectorConfig.ADMIN_CLIENT_PREFIX; Map connectorProps = makeProps(prefix + "connections.max.idle.ms", "10000"); MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps); - Map connectorAdminProps = config.sourceAdminConfig(); + Map connectorAdminProps = config.sourceAdminConfig("test"); Map expectedAdminProps = new HashMap<>(); expectedAdminProps.put("connections.max.idle.ms", "10000"); + expectedAdminProps.put("client.id", "source1->target2|ConnectorName|test"); assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching"); } @@ -139,9 +145,10 @@ public class MirrorConnectorConfigTest { "connections.max.idle.ms", "10000" ); MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps); - Map connectorAdminProps = config.targetAdminConfig(); + Map connectorAdminProps = config.targetAdminConfig("test"); Map expectedAdminProps = new HashMap<>(); expectedAdminProps.put("connections.max.idle.ms", "10000"); + expectedAdminProps.put("client.id", "source1->target2|ConnectorName|test"); assertEquals(expectedAdminProps, connectorAdminProps, MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + " target connector admin props not matching"); } @@ -151,9 +158,10 @@ public class MirrorConnectorConfigTest { String prefix = MirrorConnectorConfig.TARGET_PREFIX + MirrorConnectorConfig.ADMIN_CLIENT_PREFIX; Map connectorProps = makeProps(prefix + "connections.max.idle.ms", "10000"); MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps); - Map connectorAdminProps = config.targetAdminConfig(); + Map connectorAdminProps = config.targetAdminConfig("test"); Map expectedAdminProps = new HashMap<>(); expectedAdminProps.put("connections.max.idle.ms", "10000"); + expectedAdminProps.put("client.id", "source1->target2|ConnectorName|test"); assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching"); } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java index 9c017263a8c..085b56f6f76 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java @@ -248,9 +248,11 @@ public class MirrorMakerConfigTest { SourceAndTarget a = new SourceAndTarget("b", "a"); SourceAndTarget b = new SourceAndTarget("a", "b"); Map aProps = mirrorConfig.workerConfig(a); + assertEquals("b->a", aProps.get("client.id")); assertEquals("123", aProps.get("offset.storage.replication.factor")); assertEquals("__", aProps.get("replication.policy.separator")); Map bProps = mirrorConfig.workerConfig(b); + assertEquals("a->b", bProps.get("client.id")); assertEquals("456", bProps.get("status.storage.replication.factor")); assertEquals("client-one", bProps.get("producer.client.id"), "producer props should be passed through to worker producer config: " + bProps); diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConfigTest.java index a30f8e00cf2..b7037310737 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConfigTest.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import static org.apache.kafka.connect.mirror.TestUtils.assertEqualsExceptClientId; import static org.apache.kafka.connect.mirror.TestUtils.makeProps; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -39,7 +40,7 @@ public class MirrorSourceConfigTest { List topicPartitions = Arrays.asList(new TopicPartition("topic-1", 2), new TopicPartition("topic-3", 4), new TopicPartition("topic-5", 6)); MirrorSourceConfig config = new MirrorSourceConfig(makeProps()); - Map props = config.taskConfigForTopicPartitions(topicPartitions); + Map props = config.taskConfigForTopicPartitions(topicPartitions, 1); MirrorSourceTaskConfig taskConfig = new MirrorSourceTaskConfig(props); assertEquals(taskConfig.taskTopicPartitions(), new HashSet<>(topicPartitions), "Setting topic property configuration failed"); @@ -144,10 +145,20 @@ public class MirrorSourceConfigTest { "fetch.min.bytes", "1" ); MirrorSourceConfig config = new MirrorSourceConfig(connectorProps); - assertEquals(config.sourceProducerConfig(), config.offsetSyncsTopicProducerConfig()); + Map sourceProducerConfig = config.sourceProducerConfig("test"); + Map offsetSyncsTopicSourceProducerConfig = config.offsetSyncsTopicProducerConfig(); + assertEqualsExceptClientId(sourceProducerConfig, offsetSyncsTopicSourceProducerConfig); + assertEquals("source1->target2|ConnectorName|test", sourceProducerConfig.get("client.id")); + assertEquals("source1->target2|ConnectorName|" + MirrorSourceConfig.OFFSET_SYNCS_SOURCE_PRODUCER_ROLE, + offsetSyncsTopicSourceProducerConfig.get("client.id")); connectorProps.put("offset-syncs.topic.location", "target"); config = new MirrorSourceConfig(connectorProps); - assertEquals(config.targetProducerConfig(), config.offsetSyncsTopicProducerConfig()); + Map targetProducerConfig = config.targetProducerConfig("test"); + Map offsetSyncsTopicTargetProducerConfig = config.offsetSyncsTopicProducerConfig(); + assertEqualsExceptClientId(targetProducerConfig, offsetSyncsTopicTargetProducerConfig); + assertEquals("source1->target2|ConnectorName|test", targetProducerConfig.get("client.id")); + assertEquals("source1->target2|ConnectorName|" + MirrorSourceConfig.OFFSET_SYNCS_TARGET_PRODUCER_ROLE, + offsetSyncsTopicTargetProducerConfig.get("client.id")); } @Test @@ -159,9 +170,19 @@ public class MirrorSourceConfigTest { "retries", "123" ); MirrorSourceConfig config = new MirrorSourceConfig(connectorProps); - assertEquals(config.sourceAdminConfig(), config.offsetSyncsTopicAdminConfig()); + Map sourceAdminConfig = config.sourceAdminConfig("test"); + Map offsetSyncsTopicSourceAdminConfig = config.offsetSyncsTopicAdminConfig(); + assertEqualsExceptClientId(sourceAdminConfig, offsetSyncsTopicSourceAdminConfig); + assertEquals("source1->target2|ConnectorName|test", sourceAdminConfig.get("client.id")); + assertEquals("source1->target2|ConnectorName|" + MirrorSourceConfig.OFFSET_SYNCS_SOURCE_ADMIN_ROLE, + offsetSyncsTopicSourceAdminConfig.get("client.id")); connectorProps.put("offset-syncs.topic.location", "target"); config = new MirrorSourceConfig(connectorProps); - assertEquals(config.targetAdminConfig(), config.offsetSyncsTopicAdminConfig()); + Map targetAdminConfig = config.targetAdminConfig("test"); + Map offsetSyncsTopicTargetAdminConfig = config.offsetSyncsTopicAdminConfig(); + assertEqualsExceptClientId(targetAdminConfig, offsetSyncsTopicTargetAdminConfig); + assertEquals("source1->target2|ConnectorName|test", targetAdminConfig.get("client.id")); + assertEquals("source1->target2|ConnectorName|" + MirrorSourceConfig.OFFSET_SYNCS_TARGET_ADMIN_ROLE, + offsetSyncsTopicTargetAdminConfig.get("client.id")); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java index b2d0a77c3a5..ae43c72dcb0 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java @@ -47,6 +47,7 @@ public class MirrorSourceMetricsTest { configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, MirrorSourceConnector.class.getName()); configs.put(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS, SOURCE); configs.put(MirrorConnectorConfig.TARGET_CLUSTER_ALIAS, TARGET); + configs.put(MirrorConnectorConfig.TASK_INDEX, "0"); configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS, TP.toString()); reporter = new TestReporter(); } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java index 64f689a56d0..ebe77fddf92 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java @@ -19,6 +19,8 @@ package org.apache.kafka.connect.mirror; import java.util.HashMap; import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class TestUtils { static Map makeProps(String... keyValues) { @@ -43,4 +45,12 @@ public class TestUtils { } return records; } + + public static void assertEqualsExceptClientId(Map expected, Map actual) { + Map expectedWithoutClientId = new HashMap<>(expected); + expectedWithoutClientId.remove("client.id"); + Map actualWithoutClientId = new HashMap<>(actual); + actualWithoutClientId.remove("client.id"); + assertEquals(expectedWithoutClientId, actualWithoutClientId); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 046ccb8e256..23278f998f7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -291,6 +291,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig; + // Thread factory uses String.format and '%' is handled as a placeholder + // need to escape if the client.id contains an actual % character + String escapedClientIdForThreadNameFormat = clientId.replace("%", "%%"); LogContext logContext = new LogContext("[Worker clientId=" + clientId + ", groupId=" + this.workerGroupId + "] "); log = logContext.logger(DistributedHerder.class); @@ -303,17 +306,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable { TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(1), ThreadUtils.createThreadFactory( - this.getClass().getSimpleName() + "-" + clientId + "-%d", false)); + this.getClass().getSimpleName() + "-" + escapedClientIdForThreadNameFormat + "-%d", false)); this.forwardRequestExecutor = forwardRequestExecutor != null ? forwardRequestExecutor : Executors.newFixedThreadPool( 1, - ThreadUtils.createThreadFactory("ForwardRequestExecutor-" + clientId + "-%d", false) + ThreadUtils.createThreadFactory("ForwardRequestExecutor-" + escapedClientIdForThreadNameFormat + "-%d", false) ); this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE, ThreadUtils.createThreadFactory( - "StartAndStopExecutor-" + clientId + "-%d", false)); + "StartAndStopExecutor-" + escapedClientIdForThreadNameFormat + "-%d", false)); this.config = config; stopping = new AtomicBoolean(false);