KAFKA-14160: Streamline clusterId retrieval in Connect (#12536)

Cache the Kafka cluster Id once it has been retrieved to avoid creating many Admin clients at startup.

Reviewers: Chris Egerton <fearthecellos@gmail.com>
This commit is contained in:
Mickael Maison 2022-08-23 17:09:22 +02:00 committed by GitHub
parent 6df57679d0
commit 4bd3fd840d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 160 additions and 127 deletions

View File

@ -233,7 +233,7 @@ public class MirrorMaker {
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig distributedConfig = new DistributedConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
String kafkaClusterId = distributedConfig.kafkaClusterId();
// Create the admin client to be shared by all backing stores for this herder
Map<String, Object> adminProps = new HashMap<>(distributedConfig.originals());
ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId);

View File

@ -94,7 +94,7 @@ public class ConnectDistributed {
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig config = new DistributedConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
String kafkaClusterId = config.kafkaClusterId();
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);

View File

@ -32,7 +32,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -79,7 +79,7 @@ public class ConnectStandalone {
plugins.compareAndSwapWithDelegatingLoader();
StandaloneConfig config = new StandaloneConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
String kafkaClusterId = config.kafkaClusterId();
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
@ -88,10 +88,13 @@ public class ConnectStandalone {
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
offsetBackingStore.configure(config);
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
config, ConnectorClientConfigOverridePolicy.class);
Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore(),
Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore,
connectorClientConfigOverridePolicy);
Herder herder = new StandaloneHerder(worker, kafkaClusterId, connectorClientConfigOverridePolicy);

View File

@ -153,7 +153,7 @@ public class Worker {
ExecutorService executorService,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy
) {
this.kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
this.kafkaClusterId = config.kafkaClusterId();
this.metrics = new ConnectMetrics(workerId, config, time, kafkaClusterId);
this.executor = executorService;
this.workerId = workerId;
@ -168,7 +168,6 @@ public class Worker {
this.internalValueConverter = plugins.newInternalConverter(false, JsonConverter.class.getName(), internalConverterConfig);
this.globalOffsetBackingStore = globalOffsetBackingStore;
this.globalOffsetBackingStore.configure(config);
this.workerConfigTransformer = initConfigTransformer();

View File

@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@ -27,6 +29,7 @@ import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,6 +41,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.eclipse.jetty.util.StringUtil;
@ -188,7 +192,7 @@ public class WorkerConfig extends AbstractConfig {
public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "connector.client.config.override.policy";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =
"Class name or alias of implementation of <code>ConnectorClientConfigOverridePolicy</code>. Defines what client configurations can be "
+ "overriden by the connector. The default implementation is `All`, meaning connector configurations can override all client properties. "
+ "overridden by the connector. The default implementation is `All`, meaning connector configurations can override all client properties. "
+ "The other possible policies in the framework include `None` to disallow connectors from overriding client properties, "
+ "and `Principal` to allow connectors to override only client principals.";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "All";
@ -306,6 +310,35 @@ public class WorkerConfig extends AbstractConfig {
.withClientSslSupport();
}
private String kafkaClusterId;
public static String lookupKafkaClusterId(WorkerConfig config) {
log.info("Creating Kafka admin client");
try (Admin adminClient = Admin.create(config.originals())) {
return lookupKafkaClusterId(adminClient);
}
}
static String lookupKafkaClusterId(Admin adminClient) {
log.debug("Looking up Kafka cluster ID");
try {
KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
if (clusterIdFuture == null) {
log.info("Kafka cluster version is too old to return cluster ID");
return null;
}
log.debug("Fetching Kafka cluster ID");
String kafkaClusterId = clusterIdFuture.get();
log.info("Kafka cluster ID: {}", kafkaClusterId);
return kafkaClusterId;
} catch (InterruptedException e) {
throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e);
} catch (ExecutionException e) {
throw new ConnectException("Failed to connect to and describe Kafka cluster. "
+ "Check worker's broker connection and security properties.", e);
}
}
private void logInternalConverterRemovalWarnings(Map<String, String> props) {
List<String> removedProperties = new ArrayList<>();
for (String property : Arrays.asList("internal.key.converter", "internal.value.converter")) {
@ -321,7 +354,7 @@ public class WorkerConfig extends AbstractConfig {
+ "and specifying them will have no effect. "
+ "Instead, an instance of the JsonConverter with schemas.enable "
+ "set to false will be used. For more information, please visit "
+ "http://kafka.apache.org/documentation/#upgrade and consult the upgrade notes"
+ "https://kafka.apache.org/documentation/#upgrade and consult the upgrade notes"
+ "for the 3.0 release.",
removedProperties);
}
@ -409,6 +442,13 @@ public class WorkerConfig extends AbstractConfig {
return null;
}
public String kafkaClusterId() {
if (kafkaClusterId == null) {
kafkaClusterId = lookupKafkaClusterId(this);
}
return kafkaClusterId;
}
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
@ -434,7 +474,7 @@ public class WorkerConfig extends AbstractConfig {
String[] configTokens = config.trim().split("\\s+", 2);
if (configTokens.length != 2) {
throw new ConfigException(String.format("Invalid format of header config '%s'. "
+ "Expected: '[ation] [header name]:[header value]'", config));
+ "Expected: '[action] [header name]:[header value]'", config));
}
// validate action

View File

@ -39,7 +39,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@ -62,12 +61,9 @@ public class WorkerGroupMember {
private static final String JMX_PREFIX = "kafka.connect";
private final Logger log;
private final Time time;
private final String clientId;
private final ConsumerNetworkClient client;
private final Metrics metrics;
private final Metadata metadata;
private final long retryBackoffMs;
private final WorkerCoordinator coordinator;
private boolean stopped = false;
@ -80,7 +76,6 @@ public class WorkerGroupMember {
String clientId,
LogContext logContext) {
try {
this.time = time;
this.clientId = clientId;
this.log = logContext.logger(WorkerGroupMember.class);
@ -98,23 +93,23 @@ public class WorkerGroupMember {
Map<String, Object> contextLabels = new HashMap<>();
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(config));
contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, config.kafkaClusterId());
contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, config.getString(DistributedConfig.GROUP_ID_CONFIG));
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels);
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
long retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
Metadata metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
logContext, new ClusterResourceListeners());
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
this.metadata.bootstrap(addresses);
metadata.bootstrap(addresses);
String metricGrpPrefix = "connect";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
this.metadata,
metadata,
clientId,
100, // a fixed large enough value will suffice
config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
@ -142,7 +137,7 @@ public class WorkerGroupMember {
this.client,
metrics,
metricGrpPrefix,
this.time,
time,
restUrl,
configStorage,
listener,

View File

@ -378,7 +378,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private Map<String, Object> baseProducerProps(WorkerConfig workerConfig) {
Map<String, Object> producerProps = new HashMap<>(workerConfig.originals());
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(workerConfig);
String kafkaClusterId = workerConfig.kafkaClusterId();
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
@ -665,7 +665,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
Map<String, Object> producerProps = new HashMap<>(baseProducerProps);
String clusterId = ConnectUtils.lookupKafkaClusterId(config);
String clusterId = config.kafkaClusterId();
Map<String, Object> originals = config.originals();
Map<String, Object> consumerProps = new HashMap<>(originals);

View File

@ -172,7 +172,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
this.exactlyOnce = config.exactlyOnceSourceEnabled();
String clusterId = ConnectUtils.lookupKafkaClusterId(config);
String clusterId = config.kafkaClusterId();
Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);

View File

@ -164,7 +164,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
if (this.statusTopic == null || this.statusTopic.trim().length() == 0)
throw new ConfigException("Must specify topic for connector status.");
String clusterId = ConnectUtils.lookupKafkaClusterId(config);
String clusterId = config.kafkaClusterId();
Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

View File

@ -17,8 +17,6 @@
package org.apache.kafka.connect.util;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.connect.connector.Connector;
@ -35,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
@ -52,33 +49,6 @@ public final class ConnectUtils {
throw new InvalidRecordException(String.format("Invalid record timestamp %d", timestamp));
}
public static String lookupKafkaClusterId(WorkerConfig config) {
log.info("Creating Kafka admin client");
try (Admin adminClient = Admin.create(config.originals())) {
return lookupKafkaClusterId(adminClient);
}
}
static String lookupKafkaClusterId(Admin adminClient) {
log.debug("Looking up Kafka cluster ID");
try {
KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
if (clusterIdFuture == null) {
log.info("Kafka cluster version is too old to return cluster ID");
return null;
}
log.debug("Fetching Kafka cluster ID");
String kafkaClusterId = clusterIdFuture.get();
log.info("Kafka cluster ID: {}", kafkaClusterId);
return kafkaClusterId;
} catch (InterruptedException e) {
throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e);
} catch (ExecutionException e) {
throw new ConnectException("Failed to connect to and describe Kafka cluster. "
+ "Check worker's broker connection and security properties.", e);
}
}
/**
* Ensure that the {@link Map properties} contain an expected value for the given key, inserting the
* expected value into the properties if necessary.

View File

@ -17,9 +17,16 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.internal.stubbing.answers.CallsRealMethods;
import java.util.Arrays;
import java.util.HashMap;
@ -31,6 +38,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
public class WorkerConfigTest {
private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
@ -58,6 +68,20 @@ public class WorkerConfigTest {
"set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate "
);
private static final String CLUSTER_ID = "cluster-id";
private MockedStatic<WorkerConfig> workerConfigMockedStatic;
@Before
public void setup() {
workerConfigMockedStatic = mockStatic(WorkerConfig.class, new CallsRealMethods());
workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
}
@After
public void teardown() {
workerConfigMockedStatic.close();
}
@Test
public void testListenersConfigAllowedValues() {
Map<String, String> props = baseProps();
@ -157,6 +181,50 @@ public class WorkerConfigTest {
assertTrue(ce.getMessage().contains(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG));
}
@Test
public void testLookupKafkaClusterId() {
final Node broker1 = new Node(0, "dummyHost-1", 1234);
final Node broker2 = new Node(1, "dummyHost-2", 1234);
List<Node> cluster = Arrays.asList(broker1, broker2);
MockAdminClient adminClient = new MockAdminClient.Builder().
brokers(cluster).build();
assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, WorkerConfig.lookupKafkaClusterId(adminClient));
}
@Test
public void testLookupNullKafkaClusterId() {
final Node broker1 = new Node(0, "dummyHost-1", 1234);
final Node broker2 = new Node(1, "dummyHost-2", 1234);
List<Node> cluster = Arrays.asList(broker1, broker2);
MockAdminClient adminClient = new MockAdminClient.Builder().
brokers(cluster).clusterId(null).build();
assertNull(WorkerConfig.lookupKafkaClusterId(adminClient));
}
@Test
public void testLookupKafkaClusterIdTimeout() {
final Node broker1 = new Node(0, "dummyHost-1", 1234);
final Node broker2 = new Node(1, "dummyHost-2", 1234);
List<Node> cluster = Arrays.asList(broker1, broker2);
MockAdminClient adminClient = new MockAdminClient.Builder().
brokers(cluster).build();
adminClient.timeoutNextRequest(1);
assertThrows(ConnectException.class, () -> WorkerConfig.lookupKafkaClusterId(adminClient));
}
@Test
public void testKafkaClusterId() {
Map<String, String> props = baseProps();
WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
assertEquals(CLUSTER_ID, config.kafkaClusterId());
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)), times(1));
// next calls hit the cache
assertEquals(CLUSTER_ID, config.kafkaClusterId());
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)), times(1));
}
private void assertInvalidHeaderConfig(String config) {
assertThrows(ConfigException.class, () -> WorkerConfig.validateHttpResponseHeaderConfig(config));
}

View File

@ -59,7 +59,6 @@ import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.ParameterizedTest;
@ -201,7 +200,7 @@ public class WorkerTest {
private final boolean enableTopicCreation;
private MockedStatic<Plugins> pluginsMockedStatic;
private MockedStatic<ConnectUtils> connectUtilsMockedStatic;
private MockedStatic<WorkerConfig> workerConfigMockedStatic;
private MockedConstruction<WorkerSourceTask> sourceTaskMockedConstruction;
private MockitoSession mockitoSession;
@ -263,8 +262,8 @@ public class WorkerTest {
pluginsMockedStatic = mockStatic(Plugins.class);
// pass through things that aren't explicitly mocked out
connectUtilsMockedStatic = mockStatic(ConnectUtils.class, new CallsRealMethods());
connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
workerConfigMockedStatic = mockStatic(WorkerConfig.class, new CallsRealMethods());
workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
// Make calls to new WorkerSourceTask() return a mock to avoid the source task trying to connect to a broker.
sourceTaskMockedConstruction = mockConstructionWithAnswer(WorkerSourceTask.class, invocation -> {
@ -289,7 +288,7 @@ public class WorkerTest {
// Ideal would be to use try-with-resources in an individual test, but it introduced a rather large level of
// indentation of most test bodies, hence sticking with setup() / teardown()
pluginsMockedStatic.close();
connectUtilsMockedStatic.close();
workerConfigMockedStatic.close();
sourceTaskMockedConstruction.close();
mockitoSession.finishMocking();
@ -309,7 +308,7 @@ public class WorkerTest {
pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)))
.thenReturn(CLUSTER_ID);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@ -359,7 +358,7 @@ public class WorkerTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
verify(sourceConnector).stop();
verify(connectorStatusListener).onShutdown(CONNECTOR_ID);
@ -387,7 +386,7 @@ public class WorkerTest {
when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
when(delegatingLoader.connectorLoader(nonConnectorClass)).thenReturn(delegatingLoader);
pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(delegatingLoader);
connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)))
.thenReturn("test-cluster");
when(plugins.newConnector(anyString())).thenThrow(exception);
@ -422,7 +421,7 @@ public class WorkerTest {
verify(plugins).newConnector(anyString());
verify(connectorStatusListener).onFailure(eq(CONNECTOR_ID), any(ConnectException.class));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@ -437,7 +436,7 @@ public class WorkerTest {
pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)))
.thenReturn("test-cluster");
connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorAlias);
@ -479,7 +478,7 @@ public class WorkerTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@ -529,7 +528,7 @@ public class WorkerTest {
verify(ctx).close();
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@ -676,7 +675,7 @@ public class WorkerTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@ -734,7 +733,7 @@ public class WorkerTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@ -806,7 +805,7 @@ public class WorkerTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@ -918,7 +917,7 @@ public class WorkerTest {
mockInternalConverters();
mockFileConfigProvider();
connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any())).thenReturn(CLUSTER_ID);
workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
worker = new Worker(WORKER_ID,
new MockTime(),
@ -935,7 +934,7 @@ public class WorkerTest {
assertEquals(1L, (long) metricGroup.taskCounter("c2").metricValue(0L));
assertEquals(0L, (long) metricGroup.taskCounter("fakeConnector").metricValue(0L));
connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any()));
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@ -1250,7 +1249,7 @@ public class WorkerTest {
Map<String, String> props = new HashMap<>(workerProps);
props.put("admin.client.id", "testid");
props.put("admin.metadata.max.age.ms", "5000");
props.put("producer.bootstrap.servers", "cbeauho.com");
props.put("producer.bootstrap.servers", "localhost:1234");
props.put("consumer.bootstrap.servers", "localhost:4761");
WorkerConfig configWithOverrides = new StandaloneConfig(props);
@ -1919,7 +1918,6 @@ public class WorkerTest {
private void verifyStorage() {
verify(offsetBackingStore).configure(any(WorkerConfig.class));
verify(offsetBackingStore).start();
verify(herder).statusBackingStore();
verify(offsetBackingStore).stop();

View File

@ -25,7 +25,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@ -65,11 +64,10 @@ public class WorkerGroupMemberTest {
LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
try (MockedStatic<ConnectUtils> utilities = mockStatic(ConnectUtils.class)) {
utilities.when(() -> ConnectUtils.lookupKafkaClusterId(any())).thenReturn("cluster-1");
try (MockedStatic<WorkerConfig> utilities = mockStatic(WorkerConfig.class)) {
utilities.when(() -> WorkerConfig.lookupKafkaClusterId(any())).thenReturn("cluster-1");
member = new WorkerGroupMember(config, "", configBackingStore, null, Time.SYSTEM, "client-1", logContext);
utilities.verify(() -> ConnectUtils.lookupKafkaClusterId(any()));
utilities.verify(() -> WorkerConfig.lookupKafkaClusterId(any()));
}
boolean entered = false;

View File

@ -33,9 +33,9 @@ import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TestFuture;
@ -81,7 +81,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@PrepareForTest({KafkaConfigBackingStore.class, ConnectUtils.class})
@PrepareForTest({KafkaConfigBackingStore.class, WorkerConfig.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
public class KafkaConfigBackingStoreTest {
private static final String TOPIC = "connect-configs";
@ -190,9 +190,9 @@ public class KafkaConfigBackingStoreTest {
@Before
public void setUp() {
PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
PowerMock.replay(ConnectUtils.class);
PowerMock.mockStaticPartial(WorkerConfig.class, "lookupKafkaClusterId");
EasyMock.expect(WorkerConfig.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
PowerMock.replay(WorkerConfig.class);
createStore(DEFAULT_DISTRIBUTED_CONFIG, storeLog);
}

View File

@ -25,9 +25,9 @@ import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.easymock.Capture;
@ -66,7 +66,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest({KafkaOffsetBackingStore.class, ConnectUtils.class})
@PrepareForTest({KafkaOffsetBackingStore.class, WorkerConfig.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
public class KafkaOffsetBackingStoreTest {
private static final String TOPIC = "connect-offsets";
@ -491,8 +491,8 @@ public class KafkaOffsetBackingStoreTest {
}
private void expectClusterId() {
PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
PowerMock.mockStaticPartial(WorkerConfig.class, "lookupKafkaClusterId");
EasyMock.expect(WorkerConfig.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
}
private static ByteBuffer buffer(String v) {

View File

@ -17,59 +17,21 @@
package org.apache.kafka.connect.util;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
public class ConnectUtilsTest {
@Test
public void testLookupKafkaClusterId() {
final Node broker1 = new Node(0, "dummyHost-1", 1234);
final Node broker2 = new Node(1, "dummyHost-2", 1234);
List<Node> cluster = Arrays.asList(broker1, broker2);
MockAdminClient adminClient = new MockAdminClient.Builder().
brokers(cluster).build();
assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(adminClient));
}
@Test
public void testLookupNullKafkaClusterId() {
final Node broker1 = new Node(0, "dummyHost-1", 1234);
final Node broker2 = new Node(1, "dummyHost-2", 1234);
List<Node> cluster = Arrays.asList(broker1, broker2);
MockAdminClient adminClient = new MockAdminClient.Builder().
brokers(cluster).clusterId(null).build();
assertNull(ConnectUtils.lookupKafkaClusterId(adminClient));
}
@Test
public void testLookupKafkaClusterIdTimeout() {
final Node broker1 = new Node(0, "dummyHost-1", 1234);
final Node broker2 = new Node(1, "dummyHost-2", 1234);
List<Node> cluster = Arrays.asList(broker1, broker2);
MockAdminClient adminClient = new MockAdminClient.Builder().
brokers(cluster).build();
adminClient.timeoutNextRequest(1);
assertThrows(ConnectException.class, () -> ConnectUtils.lookupKafkaClusterId(adminClient));
}
@Test
public void testAddMetricsContextPropertiesDistributed() {
Map<String, String> props = new HashMap<>();