diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index cdc1d47fcfe..088330e5494 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -234,22 +234,24 @@ public class MirrorMaker {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = distributedConfig.kafkaClusterId();
+        String clientIdBase = ConnectUtils.clientIdBase(distributedConfig);
         // Create the admin client to be shared by all backing stores for this herder
         Map<String, Object> adminProps = new HashMap<>(distributedConfig.originals());
         ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId);
         SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);
         offsetBackingStore.configure(distributedConfig);
         Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin, clientIdBase);
         statusBackingStore.configure(distributedConfig);
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 distributedConfig,
                 configTransformer,
-                sharedAdmin);
+                sharedAdmin,
+                clientIdBase);
         // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
         // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
         // tracking the various shared admin objects in this class.
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 469f73add76..513e5e5a152 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -46,6 +46,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
+
 /**
  * <p>
 * Command line utility that runs Kafka Connect in distributed mode. In this mode, the process joins a group of other workers
@@ -103,12 +105,15 @@ public class ConnectDistributed {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
+        String clientIdBase = ConnectUtils.clientIdBase(config);
+
         // Create the admin client to be shared by all backing stores.
         Map<String, Object> adminProps = new HashMap<>(config.originals());
         ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
+        adminProps.put(CLIENT_ID_CONFIG, clientIdBase + "shared-admin");
         SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
@@ -119,14 +124,15 @@ public class ConnectDistributed {
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
 
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin, clientIdBase);
         statusBackingStore.configure(config);
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 config,
                 configTransformer,
-                sharedAdmin);
+                sharedAdmin,
+                clientIdBase);
 
         // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
         // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index edbab8ec040..bceb73e1715 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -65,6 +66,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
@@ -261,6 +263,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
     private final Supplier<TopicAdmin> topicAdminSupplier;
     private SharedTopicAdmin ownTopicAdmin;
+    private final String clientId;
 
     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
     // is in an inconsistent state and we cannot safely use them until they have been refreshed.
@@ -289,15 +292,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     @Deprecated
     public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) {
-        this(converter, config, configTransformer, null);
+        this(converter, config, configTransformer, null, "connect-distributed-");
     }
 
-    public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier) {
+    public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase) {
         this.lock = new Object();
         this.started = false;
         this.converter = converter;
         this.offset = -1;
         this.topicAdminSupplier = adminSupplier;
+        this.clientId = Objects.requireNonNull(clientIdBase) + "configs";
         this.baseProducerProps = baseProducerProps(config);
 
         // By default, Connect disables idempotent behavior for all producers, even though idempotence became
@@ -390,6 +394,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     Map<String, Object> fencableProducerProps(DistributedConfig workerConfig) {
         Map<String, Object> result = new HashMap<>(baseProducerProps(workerConfig));
 
+        result.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-leader");
         // Always require producer acks to all to ensure durable writes
         result.put(ProducerConfig.ACKS_CONFIG, "all");
         // We can set this to 5 instead of 1 without risking reordering because we are using an idempotent producer
@@ -663,14 +668,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
     // package private for testing
     KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
-        Map<String, Object> producerProps = new HashMap<>(baseProducerProps);
-
         String clusterId = config.kafkaClusterId();
         Map<String, Object> originals = config.originals();
 
+        Map<String, Object> producerProps = new HashMap<>(baseProducerProps);
+        producerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
+
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
         if (config.exactlyOnceSourceEnabled()) {
             ConnectUtils.ensureProperty(
@@ -682,6 +689,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
+        adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         Supplier<TopicAdmin> adminSupplier;
         if (topicAdminSupplier != null) {
             adminSupplier = topicAdminSupplier;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 28fd37709da..095e965a9dd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -86,7 +87,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
             Consumer<byte[], byte[]> consumer,
             TopicAdmin topicAdmin
     ) {
-        return new KafkaOffsetBackingStore(() -> topicAdmin) {
+        return new KafkaOffsetBackingStore(() -> topicAdmin, KafkaOffsetBackingStore::noClientId) {
             @Override
             public void configure(final WorkerConfig config) {
                 this.exactlyOnce = config.exactlyOnceSourceEnabled();
@@ -116,7 +117,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
             Consumer<byte[], byte[]> consumer,
             TopicAdmin topicAdmin
     ) {
-        return new KafkaOffsetBackingStore(() -> topicAdmin) {
+        return new KafkaOffsetBackingStore(() -> topicAdmin, KafkaOffsetBackingStore::noClientId) {
             @Override
             public void configure(final WorkerConfig config) {
                 this.exactlyOnce = config.exactlyOnceSourceEnabled();
@@ -133,9 +134,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         };
     }
 
+    private static String noClientId() {
+        throw new UnsupportedOperationException("This offset store should not instantiate any Kafka clients");
+    }
+
     protected KafkaBasedLog<byte[], byte[]> offsetLog;
     private final HashMap<ByteBuffer, ByteBuffer> data = new HashMap<>();
     private final Supplier<TopicAdmin> topicAdminSupplier;
+    private final Supplier<String> clientIdBase;
     private SharedTopicAdmin ownTopicAdmin;
     protected boolean exactlyOnce;
 
@@ -144,11 +150,12 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
      * store to instantiate and close its own {@link TopicAdmin} during {@link #configure(WorkerConfig)}
      * and {@link #stop()}, respectively.
      *
-     * @deprecated use {@link #KafkaOffsetBackingStore(Supplier)} instead
+     * @deprecated use {@link #KafkaOffsetBackingStore(Supplier, Supplier)} instead
      */
     @Deprecated
     public KafkaOffsetBackingStore() {
         this.topicAdminSupplier = null;
+        this.clientIdBase = () -> "connect-distributed-";
     }
 
     /**
@@ -158,9 +165,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
      * {@link TopicAdmin#close(Duration) closing} it when it is no longer needed.
      * @param topicAdmin a {@link Supplier} for the {@link TopicAdmin} to use for this backing store;
      *                   may not be null, and may not return null
+     * @param clientIdBase a {@link Supplier} that will be used to create a
+     *                     {@link CommonClientConfigs#CLIENT_ID_DOC client ID} for Kafka clients instantiated by this store;
+     *                     may not be null, and may not return null, but may throw {@link UnsupportedOperationException}
+     *                     if this offset store should not create its own Kafka clients
      */
-    public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin) {
+    public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin, Supplier<String> clientIdBase) {
         this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
+        this.clientIdBase = Objects.requireNonNull(clientIdBase);
     }
 
@@ -173,6 +185,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         this.exactlyOnce = config.exactlyOnceSourceEnabled();
 
         String clusterId = config.kafkaClusterId();
+        String clientId = Objects.requireNonNull(clientIdBase.get()) + "offsets";
         Map<String, Object> originals = config.originals();
 
         Map<String, Object> producerProps = new HashMap<>(originals);
@@ -185,11 +198,13 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         // These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
         // gets approved and scheduled for release.
         producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+        producerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
         if (config.exactlyOnceSourceEnabled()) {
             ConnectUtils.ensureProperty(
@@ -200,6 +215,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         }
 
         Map<String, Object> adminProps = new HashMap<>(originals);
+        adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
         Supplier<TopicAdmin> adminSupplier;
         if (topicAdminSupplier != null) {
@@ -276,8 +292,8 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
      * <p>
      * Note: if the now-deprecated {@link #KafkaOffsetBackingStore()} constructor was used to create
      * this store, the underlying admin client allocated for interacting with the offsets topic will be closed.
-     * On the other hand, if the recommended {@link #KafkaOffsetBackingStore(Supplier)} constructor was used to
-     * create this store, the admin client derived from the given {@link Supplier} will not be closed and it is the
+     * On the other hand, if the recommended {@link #KafkaOffsetBackingStore(Supplier, Supplier)} constructor was
+     * used to create this store, the admin client derived from the given {@link Supplier} will not be closed and it is the
      * caller's responsibility to manage its lifecycle accordingly.
      */
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index ebf4939bfdd..1049d9b71ff 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -131,6 +132,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
     protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
     protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> topics;
     private final Supplier<TopicAdmin> topicAdminSupplier;
+    private final String clientId;
 
     private String statusTopic;
     private KafkaBasedLog<String, byte[]> kafkaLog;
@@ -139,16 +141,17 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
 
     @Deprecated
     public KafkaStatusBackingStore(Time time, Converter converter) {
-        this(time, converter, null);
+        this(time, converter, null, "connect-distributed-");
     }
 
-    public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier) {
+    public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier, String clientIdBase) {
         this.time = time;
         this.converter = converter;
         this.tasks = new Table<>();
         this.connectors = new HashMap<>();
         this.topics = new ConcurrentHashMap<>();
         this.topicAdminSupplier = topicAdminSupplier;
+        this.clientId = Objects.requireNonNull(clientIdBase) + "statuses";
     }
 
     // visible for testing
@@ -176,14 +179,17 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         // These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
         // gets approved and scheduled for release.
         producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // disable idempotence since retries is forced to 0
+        producerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
 
         Map<String, Object> adminProps = new HashMap<>(originals);
+        adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
         Supplier<TopicAdmin> adminSupplier;
         if (topicAdminSupplier != null) {
@@ -208,7 +214,8 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier);
     }
 
-    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
+    // Visible for testing
+    protected KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps,
                                                               Callback<ConsumerRecord<String, byte[]>> consumedCallback,
                                                               final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index a043d0f4709..b9c7cdc66c8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -37,6 +37,8 @@ import java.util.function.Function;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
+
 public final class ConnectUtils {
     private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
 
@@ -171,4 +173,23 @@ public final class ConnectUtils {
         return new ConnectException(message, t);
     }
 
+    /**
+     * Create the base of a {@link CommonClientConfigs#CLIENT_ID_DOC client ID} that can be
+     * used for Kafka clients instantiated by this worker. Workers should append an extra identifier
+     * to the end of this base ID to include extra information on what they are using it for; for example,
+     * {@code clientIdBase(config) + "configs"} could be used as the client ID for a consumer, producer,
+     * or admin client used to interact with a worker's config topic.
+     * @param config the worker config; may not be null
+     * @return the base client ID for this worker; never null, never empty, and will always end in a
+     * hyphen ('-')
+     */
+    public static String clientIdBase(WorkerConfig config) {
+        String result = Optional.ofNullable(config.groupId())
+                .orElse("connect");
+        String userSpecifiedClientId = config.getString(CLIENT_ID_CONFIG);
+        if (userSpecifiedClientId != null) {
+            result += "-" + userSpecifiedClientId;
+        }
+        return result + "-";
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 557d6775257..6a5c002e5aa 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -65,6 +65,7 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
@@ -84,6 +85,7 @@ import static org.junit.Assert.assertTrue;
 @PrepareForTest({KafkaConfigBackingStore.class, WorkerConfig.class})
 @PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
 public class KafkaConfigBackingStoreTest {
+    private static final String CLIENT_ID_BASE = "test-client-id-";
     private static final String TOPIC = "connect-configs";
     private static final short TOPIC_REPLICATION_FACTOR = 5;
     private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
@@ -183,7 +185,7 @@ public class KafkaConfigBackingStoreTest {
         configStorage = PowerMock.createPartialMock(
                 KafkaConfigBackingStore.class,
                 new String[]{"createKafkaBasedLog", "createFencableProducer"},
-                converter, config, null);
+                converter, config, null, null, CLIENT_ID_BASE);
         Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
     }
@@ -1416,6 +1418,27 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testClientIds() throws Exception {
+        Map<String, String> workerProps = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS);
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+        DistributedConfig config = new DistributedConfig(workerProps);
+        createStore(config, storeLog);
+
+        expectConfigure();
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        Map<String, Object> fencableProducerProps = configStorage.fencableProducerProps(config);
+
+        final String expectedClientId = CLIENT_ID_BASE + "configs";
+        assertEquals(expectedClientId, capturedProducerProps.getValue().get(CLIENT_ID_CONFIG));
+        assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
+        assertEquals(expectedClientId + "-leader", fencableProducerProps.get(CLIENT_ID_CONFIG));
+
+        PowerMock.verifyAll();
+    }
+
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
                 EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index d019c983989..e8a4223e45d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -56,6 +57,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
 import static org.junit.Assert.assertEquals;
@@ -69,6 +71,7 @@ import static org.junit.Assert.fail;
 @PrepareForTest({KafkaOffsetBackingStore.class, WorkerConfig.class})
 @PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
 public class KafkaOffsetBackingStoreTest {
+    private static final String CLIENT_ID_BASE = "test-client-id-";
     private static final String TOPIC = "connect-offsets";
     private static final short TOPIC_PARTITIONS = 2;
     private static final short TOPIC_REPLICATION_FACTOR = 5;
@@ -116,7 +119,17 @@ public class KafkaOffsetBackingStoreTest {
 
     @Before
     public void setUp() throws Exception {
-        store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, "createKafkaBasedLog");
+        Supplier<SharedTopicAdmin> adminSupplier = () -> {
+            fail("Should not attempt to instantiate admin in these tests");
+            // Should never be reached; only add this thrown exception to satisfy the compiler
+            throw new AssertionError();
+        };
+        Supplier<String> clientIdBase = () -> CLIENT_ID_BASE;
+        store = PowerMock.createPartialMock(
+                KafkaOffsetBackingStore.class,
+                new String[] {"createKafkaBasedLog"},
+                adminSupplier, clientIdBase
+        );
     }
 
@@ -469,6 +482,24 @@ public class KafkaOffsetBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testClientIds() throws Exception {
+        Map<String, String> workerProps = new HashMap<>(DEFAULT_PROPS);
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+        expectConfigure();
+        expectClusterId();
+        PowerMock.replayAll();
+
+        store.configure(config);
+
+        final String expectedClientId = CLIENT_ID_BASE + "offsets";
+        assertEquals(expectedClientId, capturedProducerProps.getValue().get(CLIENT_ID_CONFIG));
+        assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
+
+        PowerMock.verifyAll();
+    }
+
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
                 EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
index e109c9f2f4d..0b4293f31a5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.config.ConfigException;
@@ -33,6 +34,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -42,7 +44,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Supplier;
 
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
@@ -354,6 +358,31 @@ public class KafkaStatusBackingStoreTest {
         assertTrue(store.getAll(CONNECTOR).isEmpty());
     }
 
+    @Test
+    public void testClientIds() {
+        String clientIdBase = "test-client-id-";
+        Supplier<TopicAdmin> topicAdminSupplier = () -> mock(TopicAdmin.class);
+
+        Map<String, Object> capturedProducerProps = new HashMap<>();
+        Map<String, Object> capturedConsumerProps = new HashMap<>();
+
+        store = new KafkaStatusBackingStore(new MockTime(), converter, topicAdminSupplier, clientIdBase) {
+            @Override
+            protected KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, org.apache.kafka.connect.util.Callback<ConsumerRecord<String, byte[]>> consumedCallback, NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
+                capturedProducerProps.putAll(producerProps);
+                capturedConsumerProps.putAll(consumerProps);
+                return kafkaBasedLog;
+            }
+        };
+
+        when(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).thenReturn("connect-statuses");
+        store.configure(workerConfig);
+
+        final String expectedClientId = clientIdBase + "statuses";
+        assertEquals(expectedClientId, capturedProducerProps.get(CLIENT_ID_CONFIG));
+        assertEquals(expectedClientId, capturedConsumerProps.get(CLIENT_ID_CONFIG));
+    }
+
     private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
         return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(), TimestampType.CREATE_TIME,
                 0, 0, key, value, new RecordHeaders(), Optional.empty());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
index d68add14eb3..04701f2f712 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -27,8 +27,11 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ConnectUtilsTest {
 
@@ -134,4 +137,30 @@ public class ConnectUtilsTest {
         assertEquals(Collections.singletonMap("\u1984", "big brother"), props);
     }
 
+    @Test
+    public void testClientIdBase() {
+        String groupId = "connect-cluster";
+        String userSpecifiedClientId = "worker-57";
+
+        String expectedClientIdBase = groupId + "-" + userSpecifiedClientId + "-";
+        assertClientIdBase(groupId, userSpecifiedClientId, expectedClientIdBase);
+
+        expectedClientIdBase = groupId + "-";
+        assertClientIdBase(groupId, null, expectedClientIdBase);
+
+        expectedClientIdBase = "connect-";
+        assertClientIdBase(null, null, expectedClientIdBase);
+
+        expectedClientIdBase = "connect-" + userSpecifiedClientId + "-";
+        assertClientIdBase(null, userSpecifiedClientId, expectedClientIdBase);
+    }
+
+    private void assertClientIdBase(String groupId, String userSpecifiedClientId, String expectedClientIdBase) {
+        WorkerConfig config = mock(WorkerConfig.class);
+        when(config.groupId()).thenReturn(groupId);
+        when(config.getString(CLIENT_ID_CONFIG)).thenReturn(userSpecifiedClientId);
+        String actualClientIdBase = ConnectUtils.clientIdBase(config);
+        assertEquals(expectedClientIdBase, actualClientIdBase);
+    }
+
 }