KAFKA-14098: Add meaningful client IDs for Connect workers (#12544)

Reviewers: Greg Harris <greg.harris@aiven.io>, Mickael Maison <mickael.maison@gmail.com>
Author: Chris Egerton, 2022-11-08 10:22:30 -05:00, committed by GitHub
parent 4560978ed7
commit bb84476215
10 changed files with 193 additions and 21 deletions

View File: MirrorMaker.java

@ -234,22 +234,24 @@ public class MirrorMaker {
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig distributedConfig = new DistributedConfig(workerProps);
String kafkaClusterId = distributedConfig.kafkaClusterId();
+String clientIdBase = ConnectUtils.clientIdBase(distributedConfig);
// Create the admin client to be shared by all backing stores for this herder
Map<String, Object> adminProps = new HashMap<>(distributedConfig.originals());
ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId);
SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
-KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
+KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);
offsetBackingStore.configure(distributedConfig);
Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter();
-StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
+StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin, clientIdBase);
statusBackingStore.configure(distributedConfig);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
internalValueConverter,
distributedConfig,
configTransformer,
-sharedAdmin);
+sharedAdmin,
+clientIdBase);
// Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
// herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
// tracking the various shared admin objects in this class.
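
One detail of this hunk worth noting: the offset store receives its client ID base as a Supplier<String> (`() -> clientIdBase`) rather than a plain string. A minimal sketch of the two supplier shapes this enables follows; the class and method names are invented for illustration and are not part of the patch.

```java
import java.util.function.Supplier;

public class ClientIdSuppliers {
    // Worker-owned store: the base ID is known up front, so a constant supplier suffices.
    static Supplier<String> forWorker(String clientIdBase) {
        return () -> clientIdBase;
    }

    // A store that must never create its own Kafka clients: resolving a client ID at all
    // would indicate a bug, so the supplier fails fast instead of returning a value
    // (compare KafkaOffsetBackingStore.noClientId() later in this change).
    static Supplier<String> noClients() {
        return () -> {
            throw new UnsupportedOperationException("This store should not instantiate any Kafka clients");
        };
    }
}
```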

View File: ConnectDistributed.java

@ -46,6 +46,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
/**
* <p>
* Command line utility that runs Kafka Connect in distributed mode. In this mode, the process joins a group of other workers
@ -103,12 +105,15 @@ public class ConnectDistributed {
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+String clientIdBase = ConnectUtils.clientIdBase(config);
// Create the admin client to be shared by all backing stores.
Map<String, Object> adminProps = new HashMap<>(config.originals());
ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
+adminProps.put(CLIENT_ID_CONFIG, clientIdBase + "shared-admin");
SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
-KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
+KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);
offsetBackingStore.configure(config);
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
@ -119,14 +124,15 @@ public class ConnectDistributed {
WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter();
-StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
+StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin, clientIdBase);
statusBackingStore.configure(config);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
internalValueConverter,
config,
configTransformer,
-sharedAdmin);
+sharedAdmin,
+clientIdBase);
// Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
// herder is stopped. This is easier than having to track and own the lifecycle ourselves.
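
To make the resulting names concrete, here is a minimal sketch of the IDs a worker derives from this wiring; the group.id and client.id values are hypothetical.

```java
public class ClientIdExample {
    public static void main(String[] args) {
        // Suppose the worker config has group.id = "connect-cluster" and client.id = "worker-1";
        // ConnectUtils.clientIdBase(config) then yields:
        String clientIdBase = "connect-cluster-worker-1-";

        System.out.println(clientIdBase + "shared-admin"); // admin shared by all backing stores
        System.out.println(clientIdBase + "offsets");      // offset store producer/consumer/admin
        System.out.println(clientIdBase + "configs");      // config store producer/consumer/admin
        System.out.println(clientIdBase + "statuses");     // status store producer/consumer/admin
    }
}
```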

View File: KafkaConfigBackingStore.java

@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -65,6 +66,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
@ -261,6 +263,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
private final Supplier<TopicAdmin> topicAdminSupplier;
private SharedTopicAdmin ownTopicAdmin;
+private final String clientId;
// Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
// is in an inconsistent state and we cannot safely use it until it has been refreshed.
@ -289,15 +292,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
@Deprecated
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer) {
-this(converter, config, configTransformer, null);
+this(converter, config, configTransformer, null, "connect-distributed-");
}
-public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier) {
+public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase) {
this.lock = new Object();
this.started = false;
this.converter = converter;
this.offset = -1;
this.topicAdminSupplier = adminSupplier;
+this.clientId = Objects.requireNonNull(clientIdBase) + "configs";
this.baseProducerProps = baseProducerProps(config);
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
@ -390,6 +394,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
Map<String, Object> fencableProducerProps(DistributedConfig workerConfig) {
Map<String, Object> result = new HashMap<>(baseProducerProps(workerConfig));
+result.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-leader");
// Always require producer acks to all to ensure durable writes
result.put(ProducerConfig.ACKS_CONFIG, "all");
// We can set this to 5 instead of 1 without risking reordering because we are using an idempotent producer
@ -663,14 +668,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
// package private for testing
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
-Map<String, Object> producerProps = new HashMap<>(baseProducerProps);
String clusterId = config.kafkaClusterId();
Map<String, Object> originals = config.originals();
+Map<String, Object> producerProps = new HashMap<>(baseProducerProps);
+producerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
if (config.exactlyOnceSourceEnabled()) {
ConnectUtils.ensureProperty(
@ -682,6 +689,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
+adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
Supplier<TopicAdmin> adminSupplier;
if (topicAdminSupplier != null) {
adminSupplier = topicAdminSupplier;
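
The config store is the only one that derives a second ID: under exactly-once source support, the leader's transactional ("fencable") producer gets a -leader suffix on top of the store's regular client ID. A sketch of the resulting names, assuming a hypothetical base of "my-cluster-":

```java
public class ConfigStoreClientIds {
    public static void main(String[] args) {
        String clientIdBase = "my-cluster-";        // hypothetical ConnectUtils.clientIdBase output
        String clientId = clientIdBase + "configs"; // regular producer, consumer, and admin

        // Used only by the leader when exactly-once source support is enabled:
        String fencableProducerId = clientId + "-leader";

        System.out.println(clientId);           // my-cluster-configs
        System.out.println(fencableProducerId); // my-cluster-configs-leader
    }
}
```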

View File: KafkaOffsetBackingStore.java

@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@ -86,7 +87,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
Consumer<byte[], byte[]> consumer,
TopicAdmin topicAdmin
) {
-return new KafkaOffsetBackingStore(() -> topicAdmin) {
+return new KafkaOffsetBackingStore(() -> topicAdmin, KafkaOffsetBackingStore::noClientId) {
@Override
public void configure(final WorkerConfig config) {
this.exactlyOnce = config.exactlyOnceSourceEnabled();
@ -116,7 +117,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
Consumer<byte[], byte[]> consumer,
TopicAdmin topicAdmin
) {
-return new KafkaOffsetBackingStore(() -> topicAdmin) {
+return new KafkaOffsetBackingStore(() -> topicAdmin, KafkaOffsetBackingStore::noClientId) {
@Override
public void configure(final WorkerConfig config) {
this.exactlyOnce = config.exactlyOnceSourceEnabled();
@ -133,9 +134,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
};
}
+private static String noClientId() {
+throw new UnsupportedOperationException("This offset store should not instantiate any Kafka clients");
+}
protected KafkaBasedLog<byte[], byte[]> offsetLog;
private final HashMap<ByteBuffer, ByteBuffer> data = new HashMap<>();
private final Supplier<TopicAdmin> topicAdminSupplier;
+private final Supplier<String> clientIdBase;
private SharedTopicAdmin ownTopicAdmin;
protected boolean exactlyOnce;
@ -144,11 +150,12 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
* store to instantiate and close its own {@link TopicAdmin} during {@link #configure(WorkerConfig)}
* and {@link #stop()}, respectively.
*
-* @deprecated use {@link #KafkaOffsetBackingStore(Supplier)} instead
+* @deprecated use {@link #KafkaOffsetBackingStore(Supplier, Supplier)} instead
*/
@Deprecated
public KafkaOffsetBackingStore() {
this.topicAdminSupplier = null;
+this.clientIdBase = () -> "connect-distributed-";
}
/**
@ -158,9 +165,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
* {@link TopicAdmin#close(Duration) closing} it when it is no longer needed.
* @param topicAdmin a {@link Supplier} for the {@link TopicAdmin} to use for this backing store;
* may not be null, and may not return null
+* @param clientIdBase a {@link Supplier} that will be used to create a
+* {@link CommonClientConfigs#CLIENT_ID_DOC client ID} for Kafka clients instantiated by this store;
+* may not be null, and may not return null, but may throw {@link UnsupportedOperationException}
+* if this offset store should not create its own Kafka clients
*/
-public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin) {
+public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin, Supplier<String> clientIdBase) {
this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
+this.clientIdBase = Objects.requireNonNull(clientIdBase);
}
@ -173,6 +185,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
this.exactlyOnce = config.exactlyOnceSourceEnabled();
String clusterId = config.kafkaClusterId();
+String clientId = Objects.requireNonNull(clientIdBase.get()) + "offsets";
Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);
@ -185,11 +198,13 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+producerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
if (config.exactlyOnceSourceEnabled()) {
ConnectUtils.ensureProperty(
@ -200,6 +215,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
}
Map<String, Object> adminProps = new HashMap<>(originals);
+adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Supplier<TopicAdmin> adminSupplier;
if (topicAdminSupplier != null) {
@ -276,8 +292,8 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
* <p>
* <b>Note:</b> if the now-deprecated {@link #KafkaOffsetBackingStore()} constructor was used to create
* this store, the underlying admin client allocated for interacting with the offsets topic will be closed.
-* On the other hand, if the recommended {@link #KafkaOffsetBackingStore(Supplier)} constructor was used to
-* create this store, the admin client derived from the given {@link Supplier} will not be closed and it is the
+* On the other hand, if the recommended {@link #KafkaOffsetBackingStore(Supplier, Supplier)} constructor was
+* used to create this store, the admin client derived from the given {@link Supplier} will not be closed and it is the
* caller's responsibility to manage its lifecycle accordingly.
*/
@Override
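
Pulling the offset-store changes together: the deprecated no-arg constructor pins the base to "connect-distributed-" so legacy callers keep their old client IDs, while configure() resolves the supplied base lazily and appends the "offsets" suffix. A minimal restatement of that derivation follows; the resolve helper is illustrative, not part of the patch.

```java
import java.util.Objects;
import java.util.function.Supplier;

public class OffsetStoreClientId {
    // Mirrors the derivation in configure(): resolve the base lazily, reject null,
    // then append the store-specific "offsets" suffix.
    static String resolve(Supplier<String> clientIdBase) {
        return Objects.requireNonNull(clientIdBase.get()) + "offsets";
    }

    public static void main(String[] args) {
        // Worker-supplied base (hypothetical value):
        System.out.println(resolve(() -> "connect-cluster-worker-1-")); // connect-cluster-worker-1-offsets

        // Fallback used by the deprecated no-arg constructor:
        System.out.println(resolve(() -> "connect-distributed-"));      // connect-distributed-offsets

        // A noClientId-style supplier would throw here instead of producing an ID.
    }
}
```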

View File: KafkaStatusBackingStore.java

@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -131,6 +132,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> topics;
private final Supplier<TopicAdmin> topicAdminSupplier;
+private final String clientId;
private String statusTopic;
private KafkaBasedLog<String, byte[]> kafkaLog;
@ -139,16 +141,17 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
@Deprecated
public KafkaStatusBackingStore(Time time, Converter converter) {
-this(time, converter, null);
+this(time, converter, null, "connect-distributed-");
}
-public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier) {
+public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier, String clientIdBase) {
this.time = time;
this.converter = converter;
this.tasks = new Table<>();
this.connectors = new HashMap<>();
this.topics = new ConcurrentHashMap<>();
this.topicAdminSupplier = topicAdminSupplier;
+this.clientId = Objects.requireNonNull(clientIdBase) + "statuses";
}
// visible for testing
@ -176,14 +179,17 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // disable idempotence since retries are forced to 0
+producerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
Map<String, Object> adminProps = new HashMap<>(originals);
+adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Supplier<TopicAdmin> adminSupplier;
if (topicAdminSupplier != null) {
@ -208,7 +214,8 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier);
}
-private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
+// Visible for testing
+protected KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
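
The visibility change on createKafkaBasedLog (private to protected) is what enables the new status-store test later in this change: a subclass can record the producer and consumer properties instead of opening real connections. The same hook, distilled to its essentials with stand-in types rather than the real store:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the store under test; only the hook's shape matters here.
abstract class Store {
    protected Object createLog(Map<String, Object> producerProps) {
        return new Object(); // the real method would build a KafkaBasedLog
    }
}

class CapturingStore extends Store {
    final Map<String, Object> capturedProducerProps = new HashMap<>();

    @Override
    protected Object createLog(Map<String, Object> producerProps) {
        capturedProducerProps.putAll(producerProps); // capture for assertions
        return null;                                 // never touch Kafka in tests
    }
}
```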

View File: ConnectUtils.java

@ -37,6 +37,8 @@ import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
public final class ConnectUtils {
private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
@ -171,4 +173,23 @@ public final class ConnectUtils {
return new ConnectException(message, t);
}
+/**
+* Create the base of a {@link CommonClientConfigs#CLIENT_ID_DOC client ID} that can be
+* used for Kafka clients instantiated by this worker. Workers should append an identifier
+* to the end of this base ID to indicate what the resulting client is used for; for example,
+* {@code clientIdBase(config) + "configs"} could be used as the client ID for a consumer, producer,
+* or admin client used to interact with a worker's config topic.
+* @param config the worker config; may not be null
+* @return the base client ID for this worker; never null, never empty, and will always end in a
+* hyphen ('-')
+*/
+public static String clientIdBase(WorkerConfig config) {
+String result = Optional.ofNullable(config.groupId())
+.orElse("connect");
+String userSpecifiedClientId = config.getString(CLIENT_ID_CONFIG);
+if (userSpecifiedClientId != null) {
+result += "-" + userSpecifiedClientId;
+}
+return result + "-";
+}
}
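
Since this javadoc is the only specification of the ID shape, a standalone restatement with the four input combinations may help; these match the cases exercised in ConnectUtilsTest at the end of this change.

```java
import java.util.Optional;

public class ClientIdBaseDemo {
    // Same logic as ConnectUtils.clientIdBase, restated without the WorkerConfig dependency.
    static String clientIdBase(String groupId, String userSpecifiedClientId) {
        String result = Optional.ofNullable(groupId).orElse("connect");
        if (userSpecifiedClientId != null) {
            result += "-" + userSpecifiedClientId;
        }
        return result + "-";
    }

    public static void main(String[] args) {
        System.out.println(clientIdBase("connect-cluster", "worker-57")); // connect-cluster-worker-57-
        System.out.println(clientIdBase("connect-cluster", null));        // connect-cluster-
        System.out.println(clientIdBase(null, "worker-57"));              // connect-worker-57-
        System.out.println(clientIdBase(null, null));                     // connect-
    }
}
```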

View File: KafkaConfigBackingStoreTest.java

@ -65,6 +65,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
@ -84,6 +85,7 @@ import static org.junit.Assert.assertTrue;
@PrepareForTest({KafkaConfigBackingStore.class, WorkerConfig.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
public class KafkaConfigBackingStoreTest {
+private static final String CLIENT_ID_BASE = "test-client-id-";
private static final String TOPIC = "connect-configs";
private static final short TOPIC_REPLICATION_FACTOR = 5;
private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
@ -183,7 +185,7 @@ public class KafkaConfigBackingStoreTest {
configStorage = PowerMock.createPartialMock(
KafkaConfigBackingStore.class,
new String[]{"createKafkaBasedLog", "createFencableProducer"},
-converter, config, null);
+converter, config, null, null, CLIENT_ID_BASE);
Whitebox.setInternalState(configStorage, "configLog", storeLog);
configStorage.setUpdateListener(configUpdateListener);
}
@ -1416,6 +1418,27 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
+@Test
+public void testClientIds() throws Exception {
+Map<String, String> workerProps = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS);
+workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+DistributedConfig config = new DistributedConfig(workerProps);
+createStore(config, storeLog);
+expectConfigure();
+PowerMock.replayAll();
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+Map<String, Object> fencableProducerProps = configStorage.fencableProducerProps(config);
+final String expectedClientId = CLIENT_ID_BASE + "configs";
+assertEquals(expectedClientId, capturedProducerProps.getValue().get(CLIENT_ID_CONFIG));
+assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
+assertEquals(expectedClientId + "-leader", fencableProducerProps.get(CLIENT_ID_CONFIG));
+PowerMock.verifyAll();
+}
private void expectConfigure() throws Exception {
PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),

View File: KafkaOffsetBackingStoreTest.java

@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.apache.kafka.connect.util.TopicAdmin;
import org.easymock.Capture;
import org.easymock.EasyMock;
@ -56,6 +57,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
import static org.junit.Assert.assertEquals;
@ -69,6 +71,7 @@ import static org.junit.Assert.fail;
@PrepareForTest({KafkaOffsetBackingStore.class, WorkerConfig.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
public class KafkaOffsetBackingStoreTest {
+private static final String CLIENT_ID_BASE = "test-client-id-";
private static final String TOPIC = "connect-offsets";
private static final short TOPIC_PARTITIONS = 2;
private static final short TOPIC_REPLICATION_FACTOR = 5;
@ -116,7 +119,17 @@ public class KafkaOffsetBackingStoreTest {
@Before
public void setUp() throws Exception {
-store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, "createKafkaBasedLog");
+Supplier<SharedTopicAdmin> adminSupplier = () -> {
+fail("Should not attempt to instantiate admin in these tests");
+// Should never be reached; the throw below exists only to satisfy the compiler
+throw new AssertionError();
+};
+Supplier<String> clientIdBase = () -> CLIENT_ID_BASE;
+store = PowerMock.createPartialMock(
+KafkaOffsetBackingStore.class,
+new String[] {"createKafkaBasedLog"},
+adminSupplier, clientIdBase
+);
}
@Test
@ -469,6 +482,24 @@ public class KafkaOffsetBackingStoreTest {
PowerMock.verifyAll();
}
+@Test
+public void testClientIds() throws Exception {
+Map<String, String> workerProps = new HashMap<>(DEFAULT_PROPS);
+DistributedConfig config = new DistributedConfig(workerProps);
+expectConfigure();
+expectClusterId();
+PowerMock.replayAll();
+store.configure(config);
+final String expectedClientId = CLIENT_ID_BASE + "offsets";
+assertEquals(expectedClientId, capturedProducerProps.getValue().get(CLIENT_ID_CONFIG));
+assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
+PowerMock.verifyAll();
+}
private void expectConfigure() throws Exception {
PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),

View File: KafkaStatusBackingStoreTest.java

@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.config.ConfigException;
@ -33,6 +34,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@ -42,7 +44,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Supplier;
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
@ -354,6 +358,31 @@ public class KafkaStatusBackingStoreTest {
assertTrue(store.getAll(CONNECTOR).isEmpty());
}
+@Test
+public void testClientIds() {
+String clientIdBase = "test-client-id-";
+Supplier<TopicAdmin> topicAdminSupplier = () -> mock(TopicAdmin.class);
+Map<String, Object> capturedProducerProps = new HashMap<>();
+Map<String, Object> capturedConsumerProps = new HashMap<>();
+store = new KafkaStatusBackingStore(new MockTime(), converter, topicAdminSupplier, clientIdBase) {
+@Override
+protected KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, org.apache.kafka.connect.util.Callback<ConsumerRecord<String, byte[]>> consumedCallback, NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
+capturedProducerProps.putAll(producerProps);
+capturedConsumerProps.putAll(consumerProps);
+return kafkaBasedLog;
+}
+};
+when(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).thenReturn("connect-statuses");
+store.configure(workerConfig);
+final String expectedClientId = clientIdBase + "statuses";
+assertEquals(expectedClientId, capturedProducerProps.get(CLIENT_ID_CONFIG));
+assertEquals(expectedClientId, capturedConsumerProps.get(CLIENT_ID_CONFIG));
+}
private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
TimestampType.CREATE_TIME, 0, 0, key, value, new RecordHeaders(), Optional.empty());

View File: ConnectUtilsTest.java

@ -27,8 +27,11 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class ConnectUtilsTest {
@ -134,4 +137,30 @@ public class ConnectUtilsTest {
assertEquals(Collections.singletonMap("\u1984", "big brother"), props);
}
+@Test
+public void testClientIdBase() {
+String groupId = "connect-cluster";
+String userSpecifiedClientId = "worker-57";
+String expectedClientIdBase = groupId + "-" + userSpecifiedClientId + "-";
+assertClientIdBase(groupId, userSpecifiedClientId, expectedClientIdBase);
+expectedClientIdBase = groupId + "-";
+assertClientIdBase(groupId, null, expectedClientIdBase);
+expectedClientIdBase = "connect-";
+assertClientIdBase(null, null, expectedClientIdBase);
+expectedClientIdBase = "connect-" + userSpecifiedClientId + "-";
+assertClientIdBase(null, userSpecifiedClientId, expectedClientIdBase);
+}
+private void assertClientIdBase(String groupId, String userSpecifiedClientId, String expectedClientIdBase) {
+WorkerConfig config = mock(WorkerConfig.class);
+when(config.groupId()).thenReturn(groupId);
+when(config.getString(CLIENT_ID_CONFIG)).thenReturn(userSpecifiedClientId);
+String actualClientIdBase = ConnectUtils.clientIdBase(config);
+assertEquals(expectedClientIdBase, actualClientIdBase);
+}
}