diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index 7ac68319bca..cdc1d47fcfe 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -233,7 +233,7 @@ public class MirrorMaker { Plugins plugins = new Plugins(workerProps); plugins.compareAndSwapWithDelegatingLoader(); DistributedConfig distributedConfig = new DistributedConfig(workerProps); - String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig); + String kafkaClusterId = distributedConfig.kafkaClusterId(); // Create the admin client to be shared by all backing stores for this herder Map adminProps = new HashMap<>(distributedConfig.originals()); ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 8d93e795911..469f73add76 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -94,7 +94,7 @@ public class ConnectDistributed { plugins.compareAndSwapWithDelegatingLoader(); DistributedConfig config = new DistributedConfig(workerProps); - String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); + String kafkaClusterId = config.kafkaClusterId(); log.debug("Kafka cluster ID: {}", kafkaClusterId); RestServer rest = new RestServer(config); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java index 19cc115d9dd..815be29fbec 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java @@ -32,7 +32,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; import org.apache.kafka.connect.storage.FileOffsetBackingStore; -import org.apache.kafka.connect.util.ConnectUtils; +import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.util.FutureCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +79,7 @@ public class ConnectStandalone { plugins.compareAndSwapWithDelegatingLoader(); StandaloneConfig config = new StandaloneConfig(workerProps); - String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); + String kafkaClusterId = config.kafkaClusterId(); log.debug("Kafka cluster ID: {}", kafkaClusterId); RestServer rest = new RestServer(config); @@ -88,10 +88,13 @@ public class ConnectStandalone { URI advertisedUrl = rest.advertisedUrl(); String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); + OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore(); + offsetBackingStore.configure(config); + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin( config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG), config, ConnectorClientConfigOverridePolicy.class); - Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore(), + Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy); Herder herder = new StandaloneHerder(worker, kafkaClusterId, connectorClientConfigOverridePolicy); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 5bc67693d0a..55e445ca8e1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -153,7 +153,7 @@ public class Worker { ExecutorService executorService, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy ) { - this.kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); + this.kafkaClusterId = config.kafkaClusterId(); this.metrics = new ConnectMetrics(workerId, config, time, kafkaClusterId); this.executor = executorService; this.workerId = workerId; @@ -168,7 +168,6 @@ public class Worker { this.internalValueConverter = plugins.newInternalConverter(false, JsonConverter.class.getName(), internalConverterConfig); this.globalOffsetBackingStore = globalOffsetBackingStore; - this.globalOffsetBackingStore.configure(config); this.workerConfigTransformer = initConfigTransformer(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 38dbeb87e1b..d8cec7173ee 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -27,6 +29,7 @@ import org.apache.kafka.common.config.SslClientAuth; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.storage.SimpleHeaderConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +41,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; import org.eclipse.jetty.util.StringUtil; @@ -188,7 +192,7 @@ public class WorkerConfig extends AbstractConfig { public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "connector.client.config.override.policy"; public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC = "Class name or alias of implementation of ConnectorClientConfigOverridePolicy. Defines what client configurations can be " - + "overriden by the connector. The default implementation is `All`, meaning connector configurations can override all client properties. " + + "overridden by the connector. The default implementation is `All`, meaning connector configurations can override all client properties. " + "The other possible policies in the framework include `None` to disallow connectors from overriding client properties, " + "and `Principal` to allow connectors to override only client principals."; public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "All"; @@ -306,6 +310,35 @@ public class WorkerConfig extends AbstractConfig { .withClientSslSupport(); } + private String kafkaClusterId; + + public static String lookupKafkaClusterId(WorkerConfig config) { + log.info("Creating Kafka admin client"); + try (Admin adminClient = Admin.create(config.originals())) { + return lookupKafkaClusterId(adminClient); + } + } + + static String lookupKafkaClusterId(Admin adminClient) { + log.debug("Looking up Kafka cluster ID"); + try { + KafkaFuture clusterIdFuture = adminClient.describeCluster().clusterId(); + if (clusterIdFuture == null) { + log.info("Kafka cluster version is too old to return cluster ID"); + return null; + } + log.debug("Fetching Kafka cluster ID"); + String kafkaClusterId = clusterIdFuture.get(); + log.info("Kafka cluster ID: {}", kafkaClusterId); + return kafkaClusterId; + } catch (InterruptedException e) { + throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e); + } catch (ExecutionException e) { + throw new ConnectException("Failed to connect to and describe Kafka cluster. " + + "Check worker's broker connection and security properties.", e); + } + } + private void logInternalConverterRemovalWarnings(Map props) { List removedProperties = new ArrayList<>(); for (String property : Arrays.asList("internal.key.converter", "internal.value.converter")) { @@ -321,7 +354,7 @@ public class WorkerConfig extends AbstractConfig { + "and specifying them will have no effect. " + "Instead, an instance of the JsonConverter with schemas.enable " + "set to false will be used. For more information, please visit " - + "http://kafka.apache.org/documentation/#upgrade and consult the upgrade notes" + + "https://kafka.apache.org/documentation/#upgrade and consult the upgrade notes" + "for the 3.0 release.", removedProperties); } @@ -409,6 +442,13 @@ public class WorkerConfig extends AbstractConfig { return null; } + public String kafkaClusterId() { + if (kafkaClusterId == null) { + kafkaClusterId = lookupKafkaClusterId(this); + } + return kafkaClusterId; + } + @Override protected Map postProcessParsedConfig(final Map parsedValues) { return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues); @@ -434,7 +474,7 @@ public class WorkerConfig extends AbstractConfig { String[] configTokens = config.trim().split("\\s+", 2); if (configTokens.length != 2) { throw new ConfigException(String.format("Invalid format of header config '%s'. " - + "Expected: '[ation] [header name]:[header value]'", config)); + + "Expected: '[action] [header name]:[header value]'", config)); } // validate action diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 4c1d6a5b9e1..d7ad3f4eb31 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -39,7 +39,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.storage.ConfigBackingStore; -import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; @@ -62,12 +61,9 @@ public class WorkerGroupMember { private static final String JMX_PREFIX = "kafka.connect"; private final Logger log; - private final Time time; private final String clientId; private final ConsumerNetworkClient client; private final Metrics metrics; - private final Metadata metadata; - private final long retryBackoffMs; private final WorkerCoordinator coordinator; private boolean stopped = false; @@ -80,7 +76,6 @@ public class WorkerGroupMember { String clientId, LogContext logContext) { try { - this.time = time; this.clientId = clientId; this.log = logContext.logger(WorkerGroupMember.class); @@ -98,23 +93,23 @@ public class WorkerGroupMember { Map contextLabels = new HashMap<>(); contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); - contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(config)); + contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, config.kafkaClusterId()); contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, config.getString(DistributedConfig.GROUP_ID_CONFIG)); MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels); this.metrics = new Metrics(metricConfig, reporters, time, metricsContext); - this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); - this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), + long retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); + Metadata metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), logContext, new ClusterResourceListeners()); List addresses = ClientUtils.parseAndValidateAddresses( config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)); - this.metadata.bootstrap(addresses); + metadata.bootstrap(addresses); String metricGrpPrefix = "connect"; ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext); NetworkClient netClient = new NetworkClient( new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext), - this.metadata, + metadata, clientId, 100, // a fixed large enough value will suffice config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG), @@ -142,7 +137,7 @@ public class WorkerGroupMember { this.client, metrics, metricGrpPrefix, - this.time, + time, restUrl, configStorage, listener, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 6edd0e5a76d..edbab8ec040 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -378,7 +378,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { private Map baseProducerProps(WorkerConfig workerConfig) { Map producerProps = new HashMap<>(workerConfig.originals()); - String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(workerConfig); + String kafkaClusterId = workerConfig.kafkaClusterId(); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); @@ -665,7 +665,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) { Map producerProps = new HashMap<>(baseProducerProps); - String clusterId = ConnectUtils.lookupKafkaClusterId(config); + String clusterId = config.kafkaClusterId(); Map originals = config.originals(); Map consumerProps = new HashMap<>(originals); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 66935720026..28fd37709da 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -172,7 +172,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { this.exactlyOnce = config.exactlyOnceSourceEnabled(); - String clusterId = ConnectUtils.lookupKafkaClusterId(config); + String clusterId = config.kafkaClusterId(); Map originals = config.originals(); Map producerProps = new HashMap<>(originals); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index 3ba6996da8a..ebf4939bfdd 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -164,7 +164,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore { if (this.statusTopic == null || this.statusTopic.trim().length() == 0) throw new ConfigException("Must specify topic for connector status."); - String clusterId = ConnectUtils.lookupKafkaClusterId(config); + String clusterId = config.kafkaClusterId(); Map originals = config.originals(); Map producerProps = new HashMap<>(originals); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java index 0af14cc7f30..a043d0f4709 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java @@ -17,8 +17,6 @@ package org.apache.kafka.connect.util; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.connect.connector.Connector; @@ -35,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.Collectors; @@ -52,33 +49,6 @@ public final class ConnectUtils { throw new InvalidRecordException(String.format("Invalid record timestamp %d", timestamp)); } - public static String lookupKafkaClusterId(WorkerConfig config) { - log.info("Creating Kafka admin client"); - try (Admin adminClient = Admin.create(config.originals())) { - return lookupKafkaClusterId(adminClient); - } - } - - static String lookupKafkaClusterId(Admin adminClient) { - log.debug("Looking up Kafka cluster ID"); - try { - KafkaFuture clusterIdFuture = adminClient.describeCluster().clusterId(); - if (clusterIdFuture == null) { - log.info("Kafka cluster version is too old to return cluster ID"); - return null; - } - log.debug("Fetching Kafka cluster ID"); - String kafkaClusterId = clusterIdFuture.get(); - log.info("Kafka cluster ID: {}", kafkaClusterId); - return kafkaClusterId; - } catch (InterruptedException e) { - throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e); - } catch (ExecutionException e) { - throw new ConnectException("Failed to connect to and describe Kafka cluster. " - + "Check worker's broker connection and security properties.", e); - } - } - /** * Ensure that the {@link Map properties} contain an expected value for the given key, inserting the * expected value into the properties if necessary. diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java index f02892888e3..ec9a843d4da 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java @@ -17,9 +17,16 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.common.Node; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.connect.errors.ConnectException; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.internal.stubbing.answers.CallsRealMethods; import java.util.Arrays; import java.util.HashMap; @@ -31,6 +38,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; public class WorkerConfigTest { private static final List VALID_HEADER_CONFIGS = Arrays.asList( @@ -58,6 +68,20 @@ public class WorkerConfigTest { "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate " ); + private static final String CLUSTER_ID = "cluster-id"; + private MockedStatic workerConfigMockedStatic; + + @Before + public void setup() { + workerConfigMockedStatic = mockStatic(WorkerConfig.class, new CallsRealMethods()); + workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID); + } + + @After + public void teardown() { + workerConfigMockedStatic.close(); + } + @Test public void testListenersConfigAllowedValues() { Map props = baseProps(); @@ -157,6 +181,50 @@ public class WorkerConfigTest { assertTrue(ce.getMessage().contains(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG)); } + @Test + public void testLookupKafkaClusterId() { + final Node broker1 = new Node(0, "dummyHost-1", 1234); + final Node broker2 = new Node(1, "dummyHost-2", 1234); + List cluster = Arrays.asList(broker1, broker2); + MockAdminClient adminClient = new MockAdminClient.Builder(). + brokers(cluster).build(); + assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, WorkerConfig.lookupKafkaClusterId(adminClient)); + } + + @Test + public void testLookupNullKafkaClusterId() { + final Node broker1 = new Node(0, "dummyHost-1", 1234); + final Node broker2 = new Node(1, "dummyHost-2", 1234); + List cluster = Arrays.asList(broker1, broker2); + MockAdminClient adminClient = new MockAdminClient.Builder(). + brokers(cluster).clusterId(null).build(); + assertNull(WorkerConfig.lookupKafkaClusterId(adminClient)); + } + + @Test + public void testLookupKafkaClusterIdTimeout() { + final Node broker1 = new Node(0, "dummyHost-1", 1234); + final Node broker2 = new Node(1, "dummyHost-2", 1234); + List cluster = Arrays.asList(broker1, broker2); + MockAdminClient adminClient = new MockAdminClient.Builder(). + brokers(cluster).build(); + adminClient.timeoutNextRequest(1); + + assertThrows(ConnectException.class, () -> WorkerConfig.lookupKafkaClusterId(adminClient)); + } + + @Test + public void testKafkaClusterId() { + Map props = baseProps(); + WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props); + assertEquals(CLUSTER_ID, config.kafkaClusterId()); + workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)), times(1)); + + // next calls hit the cache + assertEquals(CLUSTER_ID, config.kafkaClusterId()); + workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)), times(1)); + } + private void assertInvalidHeaderConfig(String config) { assertThrows(ConfigException.class, () -> WorkerConfig.validateHttpResponseHeaderConfig(config)); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index ef9398ace3d..9eb622cafbd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -59,7 +59,6 @@ import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; -import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; import org.apache.kafka.connect.util.ParameterizedTest; @@ -201,7 +200,7 @@ public class WorkerTest { private final boolean enableTopicCreation; private MockedStatic pluginsMockedStatic; - private MockedStatic connectUtilsMockedStatic; + private MockedStatic workerConfigMockedStatic; private MockedConstruction sourceTaskMockedConstruction; private MockitoSession mockitoSession; @@ -263,8 +262,8 @@ public class WorkerTest { pluginsMockedStatic = mockStatic(Plugins.class); // pass through things that aren't explicitly mocked out - connectUtilsMockedStatic = mockStatic(ConnectUtils.class, new CallsRealMethods()); - connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID); + workerConfigMockedStatic = mockStatic(WorkerConfig.class, new CallsRealMethods()); + workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID); // Make calls to new WorkerSourceTask() return a mock to avoid the source task trying to connect to a broker. sourceTaskMockedConstruction = mockConstructionWithAnswer(WorkerSourceTask.class, invocation -> { @@ -289,7 +288,7 @@ public class WorkerTest { // Ideal would be to use try-with-resources in an individual test, but it introduced a rather large level of // indentation of most test bodies, hence sticking with setup() / teardown() pluginsMockedStatic.close(); - connectUtilsMockedStatic.close(); + workerConfigMockedStatic.close(); sourceTaskMockedConstruction.close(); mockitoSession.finishMocking(); @@ -309,7 +308,7 @@ public class WorkerTest { pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader); pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader); - connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))) + workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))) .thenReturn(CLUSTER_ID); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); @@ -359,7 +358,7 @@ public class WorkerTest { pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2)); pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); - connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))); + workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))); verify(sourceConnector).stop(); verify(connectorStatusListener).onShutdown(CONNECTOR_ID); @@ -387,7 +386,7 @@ public class WorkerTest { when(plugins.delegatingLoader()).thenReturn(delegatingLoader); when(delegatingLoader.connectorLoader(nonConnectorClass)).thenReturn(delegatingLoader); pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(delegatingLoader); - connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))) + workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))) .thenReturn("test-cluster"); when(plugins.newConnector(anyString())).thenThrow(exception); @@ -422,7 +421,7 @@ public class WorkerTest { verify(plugins).newConnector(anyString()); verify(connectorStatusListener).onFailure(eq(CONNECTOR_ID), any(ConnectException.class)); pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); - connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))); + workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))); } @Test @@ -437,7 +436,7 @@ public class WorkerTest { pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader); pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader); - connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))) + workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))) .thenReturn("test-cluster"); connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorAlias); @@ -479,7 +478,7 @@ public class WorkerTest { pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2)); pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); - connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))); + workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))); } @Test @@ -529,7 +528,7 @@ public class WorkerTest { verify(ctx).close(); pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); - connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))); + workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))); } @Test @@ -676,7 +675,7 @@ public class WorkerTest { pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2)); pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); - connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))); + workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))); } @Test @@ -734,7 +733,7 @@ public class WorkerTest { pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2)); pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); - connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))); + workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))); } @Test @@ -806,7 +805,7 @@ public class WorkerTest { pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2)); pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2)); - connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))); + workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))); } @Test @@ -918,7 +917,7 @@ public class WorkerTest { mockInternalConverters(); mockFileConfigProvider(); - connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any())).thenReturn(CLUSTER_ID); + workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID); worker = new Worker(WORKER_ID, new MockTime(), @@ -935,7 +934,7 @@ public class WorkerTest { assertEquals(1L, (long) metricGroup.taskCounter("c2").metricValue(0L)); assertEquals(0L, (long) metricGroup.taskCounter("fakeConnector").metricValue(0L)); - connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any())); + workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))); } @Test @@ -1250,7 +1249,7 @@ public class WorkerTest { Map props = new HashMap<>(workerProps); props.put("admin.client.id", "testid"); props.put("admin.metadata.max.age.ms", "5000"); - props.put("producer.bootstrap.servers", "cbeauho.com"); + props.put("producer.bootstrap.servers", "localhost:1234"); props.put("consumer.bootstrap.servers", "localhost:4761"); WorkerConfig configWithOverrides = new StandaloneConfig(props); @@ -1919,7 +1918,6 @@ public class WorkerTest { private void verifyStorage() { - verify(offsetBackingStore).configure(any(WorkerConfig.class)); verify(offsetBackingStore).start(); verify(herder).statusBackingStore(); verify(offsetBackingStore).stop(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java index 563d71dbed6..fa910bb54c6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.runtime.MockConnectMetrics; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.storage.ConfigBackingStore; -import org.apache.kafka.connect.util.ConnectUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -65,12 +64,11 @@ public class WorkerGroupMemberTest { LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]"); - - try (MockedStatic utilities = mockStatic(ConnectUtils.class)) { - utilities.when(() -> ConnectUtils.lookupKafkaClusterId(any())).thenReturn("cluster-1"); + try (MockedStatic utilities = mockStatic(WorkerConfig.class)) { + utilities.when(() -> WorkerConfig.lookupKafkaClusterId(any())).thenReturn("cluster-1"); member = new WorkerGroupMember(config, "", configBackingStore, null, Time.SYSTEM, "client-1", logContext); - utilities.verify(() -> ConnectUtils.lookupKafkaClusterId(any())); - } + utilities.verify(() -> WorkerConfig.lookupKafkaClusterId(any())); + } boolean entered = false; for (MetricsReporter reporter : member.metrics().reporters()) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index f3b74f06664..557d6775257 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -33,9 +33,9 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.runtime.RestartRequest; import org.apache.kafka.connect.runtime.TargetState; +import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; -import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TestFuture; @@ -81,7 +81,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) -@PrepareForTest({KafkaConfigBackingStore.class, ConnectUtils.class}) +@PrepareForTest({KafkaConfigBackingStore.class, WorkerConfig.class}) @PowerMockIgnore({"javax.management.*", "javax.crypto.*"}) public class KafkaConfigBackingStoreTest { private static final String TOPIC = "connect-configs"; @@ -190,9 +190,9 @@ public class KafkaConfigBackingStoreTest { @Before public void setUp() { - PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId"); - EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes(); - PowerMock.replay(ConnectUtils.class); + PowerMock.mockStaticPartial(WorkerConfig.class, "lookupKafkaClusterId"); + EasyMock.expect(WorkerConfig.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes(); + PowerMock.replay(WorkerConfig.class); createStore(DEFAULT_DISTRIBUTED_CONFIG, storeLog); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index cf11230f3d2..d019c983989 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -25,9 +25,9 @@ import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; -import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TopicAdmin; import org.easymock.Capture; @@ -66,7 +66,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) -@PrepareForTest({KafkaOffsetBackingStore.class, ConnectUtils.class}) +@PrepareForTest({KafkaOffsetBackingStore.class, WorkerConfig.class}) @PowerMockIgnore({"javax.management.*", "javax.crypto.*"}) public class KafkaOffsetBackingStoreTest { private static final String TOPIC = "connect-offsets"; @@ -491,8 +491,8 @@ public class KafkaOffsetBackingStoreTest { } private void expectClusterId() { - PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId"); - EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes(); + PowerMock.mockStaticPartial(WorkerConfig.class, "lookupKafkaClusterId"); + EasyMock.expect(WorkerConfig.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes(); } private static ByteBuffer buffer(String v) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java index d1330277e8e..d68add14eb3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java @@ -17,59 +17,21 @@ package org.apache.kafka.connect.util; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.MockAdminClient; -import org.apache.kafka.common.Node; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.junit.Test; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; public class ConnectUtilsTest { - @Test - public void testLookupKafkaClusterId() { - final Node broker1 = new Node(0, "dummyHost-1", 1234); - final Node broker2 = new Node(1, "dummyHost-2", 1234); - List cluster = Arrays.asList(broker1, broker2); - MockAdminClient adminClient = new MockAdminClient.Builder(). - brokers(cluster).build(); - assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(adminClient)); - } - - @Test - public void testLookupNullKafkaClusterId() { - final Node broker1 = new Node(0, "dummyHost-1", 1234); - final Node broker2 = new Node(1, "dummyHost-2", 1234); - List cluster = Arrays.asList(broker1, broker2); - MockAdminClient adminClient = new MockAdminClient.Builder(). - brokers(cluster).clusterId(null).build(); - assertNull(ConnectUtils.lookupKafkaClusterId(adminClient)); - } - - @Test - public void testLookupKafkaClusterIdTimeout() { - final Node broker1 = new Node(0, "dummyHost-1", 1234); - final Node broker2 = new Node(1, "dummyHost-2", 1234); - List cluster = Arrays.asList(broker1, broker2); - MockAdminClient adminClient = new MockAdminClient.Builder(). - brokers(cluster).build(); - adminClient.timeoutNextRequest(1); - - assertThrows(ConnectException.class, () -> ConnectUtils.lookupKafkaClusterId(adminClient)); - } - @Test public void testAddMetricsContextPropertiesDistributed() { Map props = new HashMap<>();