KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898) (#14055)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Greg Harris 2023-08-08 10:06:35 -07:00 committed by GitHub
parent 60a5117001
commit ff4fed5cbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 688 additions and 7 deletions

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.storage.StringConverter

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.storage.SimpleHeaderConverter
org.apache.kafka.connect.storage.StringConverter

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.sink.SinkConnectorTest$TestSinkConnector

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.source.SourceConnectorTest$TestSourceConnector

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.file.FileStreamSinkConnector

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.file.FileStreamSourceConnector

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.json.JsonConverter

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.json.JsonConverter

View File

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.mirror.MirrorCheckpointConnector
org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
org.apache.kafka.connect.mirror.MirrorSourceConnector

View File

@ -25,7 +25,9 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.slf4j.Logger;
@ -35,6 +37,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
@ -42,6 +45,10 @@ import java.util.concurrent.ExecutionException;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.ONLY_SCAN;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_FAIL;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.SERVICE_LOAD;
/**
* Common base class providing configuration for Kafka Connect workers, whether standalone or distributed.
@ -122,6 +129,18 @@ public class WorkerConfig extends AbstractConfig {
+ "by the worker's scanner before config providers are initialized and used to "
+ "replace variables.";
public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to discover plugins present in the classpath "
+ "and plugin.path configuration. This can be one of multiple values with the following meanings:\n"
+ "* " + ONLY_SCAN + ": Discover plugins only by reflection. "
+ "Plugins which are not discoverable by ServiceLoader will not impact worker startup.\n"
+ "* " + HYBRID_WARN + ": Discover plugins reflectively and by ServiceLoader. "
+ "Plugins which are not discoverable by ServiceLoader will print warnings during worker startup.\n"
+ "* " + HYBRID_FAIL + ": Discover plugins reflectively and by ServiceLoader. "
+ "Plugins which are not discoverable by ServiceLoader will cause worker startup to fail.\n"
+ "* " + SERVICE_LOAD + ": Discover plugins only by ServiceLoader. Faster startup than other modes. "
+ "Plugins which are not discoverable by ServiceLoader may not be usable.";
public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
protected static final String CONFIG_PROVIDERS_DOC =
"Comma-separated names of <code>ConfigProvider</code> classes, loaded and used "
@ -199,6 +218,12 @@ public class WorkerConfig extends AbstractConfig {
null,
Importance.LOW,
PLUGIN_PATH_DOC)
.define(PLUGIN_DISCOVERY_CONFIG,
Type.STRING,
PluginDiscoveryMode.HYBRID_WARN.toString(),
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(PluginDiscoveryMode.class)),
Importance.LOW,
PLUGIN_DISCOVERY_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG,
30000, atLeast(0), Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
@ -401,6 +426,16 @@ public class WorkerConfig extends AbstractConfig {
return props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
}
public static PluginDiscoveryMode pluginDiscovery(Map<String, String> props) {
String value = props.getOrDefault(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN.toString());
try {
return PluginDiscoveryMode.valueOf(value.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new ConnectException("Invalid " + PLUGIN_DISCOVERY_CONFIG + " value, must be one of "
+ Arrays.toString(Utils.enumOptions(PluginDiscoveryMode.class)));
}
}
public WorkerConfig(ConfigDef definition, Map<String, String> props) {
super(definition, props);
logInternalConverterRemovalWarnings(props);

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import java.util.Locale;
/**
* Strategy to use to discover plugins usable on a Connect worker.
* @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery">KIP-898</a>
*/
public enum PluginDiscoveryMode {
/**
* Scan for plugins reflectively. This corresponds to the legacy behavior of Connect prior to KIP-898.
* <p>Note: the following plugins are still loaded using {@link java.util.ServiceLoader} in this mode:
* <ul>
* <li>{@link org.apache.kafka.common.config.provider.ConfigProvider}</li>
* <li>{@link org.apache.kafka.connect.rest.ConnectRestExtension}</li>
* <li>{@link org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}</li>
* </ul>
*/
ONLY_SCAN,
/**
* Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
* Emit warnings if one or more plugins is not available via {@link java.util.ServiceLoader}
*/
HYBRID_WARN,
/**
* Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
* Fail worker during startup if one or more plugins is not available via {@link java.util.ServiceLoader}
*/
HYBRID_FAIL,
/**
* Discover plugins via {@link java.util.ServiceLoader} only.
* Plugins may not be usable if they are not available via {@link java.util.ServiceLoader}
*/
SERVICE_LOAD;
public boolean reflectivelyScan() {
return this != SERVICE_LOAD;
}
public boolean serviceLoad() {
return this != ONLY_SCAN;
}
@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}

View File

@ -39,11 +39,15 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
public class Plugins {
@ -63,16 +67,64 @@ public class Plugins {
// VisibleForTesting
Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory factory) {
String pluginPath = WorkerConfig.pluginPath(props);
PluginDiscoveryMode discoveryMode = WorkerConfig.pluginDiscovery(props);
Set<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
delegatingLoader = factory.newDelegatingClassLoader(parent);
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
scanResult = initLoaders(pluginSources);
scanResult = initLoaders(pluginSources, discoveryMode);
}
private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
PluginScanResult reflectiveScanResult = new ReflectionScanner().discoverPlugins(pluginSources);
delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
return reflectiveScanResult;
public PluginScanResult initLoaders(Set<PluginSource> pluginSources, PluginDiscoveryMode discoveryMode) {
PluginScanResult empty = new PluginScanResult(Collections.emptyList());
PluginScanResult serviceLoadingScanResult;
try {
serviceLoadingScanResult = discoveryMode.serviceLoad() ?
new ServiceLoaderScanner().discoverPlugins(pluginSources) : empty;
} catch (Throwable t) {
throw new ConnectException(String.format(
"Unable to perform ServiceLoader scanning as requested by %s=%s. It may be possible to fix this issue by reconfiguring %s=%s",
WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN), t);
}
PluginScanResult reflectiveScanResult = discoveryMode.reflectivelyScan() ?
new ReflectionScanner().discoverPlugins(pluginSources) : empty;
PluginScanResult scanResult = new PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
maybeReportHybridDiscoveryIssue(discoveryMode, serviceLoadingScanResult, scanResult);
delegatingLoader.installDiscoveredPlugins(scanResult);
return scanResult;
}
// visible for testing
static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult mergedResult) {
SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
mergedResult.forEach(missingPlugins::add);
serviceLoadingScanResult.forEach(missingPlugins::remove);
if (missingPlugins.isEmpty()) {
if (discoveryMode == PluginDiscoveryMode.HYBRID_WARN || discoveryMode == PluginDiscoveryMode.HYBRID_FAIL) {
log.warn("All plugins have ServiceLoader manifests, consider reconfiguring {}={}",
WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.SERVICE_LOAD);
}
} else {
String message = String.format(
"One or more plugins are missing ServiceLoader manifests may not be usable with %s=%s: %s%n" +
"Read the documentation at %s for instructions on migrating your plugins " +
"to take advantage of the performance improvements of %s mode.",
WorkerConfig.PLUGIN_DISCOVERY_CONFIG,
PluginDiscoveryMode.SERVICE_LOAD,
missingPlugins.stream()
.map(pluginDesc -> pluginDesc.location() + "\t" + pluginDesc.className() + "\t" + pluginDesc.type() + "\t" + pluginDesc.version())
.collect(Collectors.joining("\n", "[\n", "\n]")),
"https://kafka.apache.org/documentation.html#connect_plugindiscovery",
PluginDiscoveryMode.SERVICE_LOAD
);
if (discoveryMode == PluginDiscoveryMode.HYBRID_WARN) {
log.warn("{} To silence this warning, set {}={} in the worker config.",
message, WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN);
} else if (discoveryMode == PluginDiscoveryMode.HYBRID_FAIL) {
throw new ConnectException(String.format("%s To silence this error, set %s=%s in the worker config.",
message, WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN));
}
}
}
private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) {

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.tools.MockSinkConnector
org.apache.kafka.connect.tools.VerifiableSinkConnector

View File

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.tools.MockSourceConnector
org.apache.kafka.connect.tools.SchemaSourceConnector
org.apache.kafka.connect.tools.VerifiableSourceConnector

View File

@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.converters.DoubleConverter
org.apache.kafka.connect.converters.FloatConverter
org.apache.kafka.connect.converters.IntegerConverter
org.apache.kafka.connect.converters.LongConverter
org.apache.kafka.connect.converters.ShortConverter

View File

@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.converters.DoubleConverter
org.apache.kafka.connect.converters.FloatConverter
org.apache.kafka.connect.converters.IntegerConverter
org.apache.kafka.connect.converters.LongConverter
org.apache.kafka.connect.converters.ShortConverter

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.converters.ByteArrayConverter;
@ -46,6 +47,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.runtime.isolation.TestPlugins.TestPlugin;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
@ -58,6 +60,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
@ -76,6 +79,9 @@ public class PluginsTest {
private TestConverter converter;
private TestHeaderConverter headerConverter;
private TestInternalConverter internalConverter;
private PluginScanResult nonEmpty;
private PluginScanResult empty;
private String missingPluginClass;
@Before
public void setup() {
@ -94,6 +100,22 @@ public class PluginsTest {
props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName());
props.put("header.converter.extra.config", "baz");
// Set up some PluginScanResult instances to test the plugin discovery modes
SortedSet<PluginDesc<SinkConnector>> sinkConnectors = (SortedSet<PluginDesc<SinkConnector>>) plugins.sinkConnectors();
missingPluginClass = sinkConnectors.first().className();
nonEmpty = new PluginScanResult(
sinkConnectors,
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet()
);
empty = new PluginScanResult(Collections.emptyList());
createConfig();
}
@ -476,6 +498,102 @@ public class PluginsTest {
}
}
@Test
public void testOnlyScanNoPlugins() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) {
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.ONLY_SCAN, empty, empty);
assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN")));
}
}
@Test
public void testOnlyScanWithPlugins() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) {
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.ONLY_SCAN, empty, nonEmpty);
assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN")));
}
}
@Test
public void testHybridWarnNoPlugins() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) {
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, empty, empty);
assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
e.getLevel().equals("WARN")
// These log messages must contain the config name, it is referenced in the documentation.
&& e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG)
));
}
}
@Test
public void testHybridWarnWithPlugins() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) {
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, nonEmpty, nonEmpty);
assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
e.getLevel().equals("WARN")
&& !e.getMessage().contains(missingPluginClass)
&& e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG)
));
}
}
@Test
public void testHybridWarnMissingPlugins() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) {
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, empty, nonEmpty);
assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
e.getLevel().equals("WARN")
&& e.getMessage().contains(missingPluginClass)
&& e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG)
));
}
}
@Test
public void testHybridFailNoPlugins() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) {
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, empty, empty);
assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
e.getLevel().equals("WARN")
&& e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG)
));
}
}
@Test
public void testHybridFailWithPlugins() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) {
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, nonEmpty, nonEmpty);
assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
e.getLevel().equals("WARN")
&& !e.getMessage().contains(missingPluginClass)
&& e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG)
));
}
}
@Test
public void testHybridFailMissingPlugins() {
assertThrows(ConnectException.class, () -> Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, empty, nonEmpty));
}
@Test
public void testServiceLoadNoPlugins() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) {
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.SERVICE_LOAD, empty, empty);
assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN")));
}
}
@Test
public void testServiceLoadWithPlugins() {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) {
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.SERVICE_LOAD, nonEmpty, nonEmpty);
assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN")));
}
}
private void assertClassLoaderReadsVersionFromResource(
TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) {
URL[] systemPath = TestPlugins.pluginPath(parentResource)

View File

@ -44,13 +44,13 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -239,7 +239,8 @@ public class SynchronizationTest {
};
// THREAD 2: loads a class by delegating upward starting from the PluginClassLoader
String t2Class = JsonConverter.class.getName();
// Use any non-plugin class that no plugins depend on, so that the class isn't loaded during plugin discovery
String t2Class = Mockito.class.getName();
// PluginClassLoader breakpoint will only trigger on this thread
pclBreakpoint.set(t2Class::equals);
Runnable thread2 = () -> {

View File

@ -57,6 +57,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG;
@ -276,6 +277,7 @@ public class EmbeddedConnectCluster {
putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
putIfAbsent(workerProps, PLUGIN_DISCOVERY_CONFIG, "hybrid_fail");
for (int i = 0; i < numInitialWorkers; i++) {
addWorker();

View File

@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector
org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSinkConnector
org.apache.kafka.connect.integration.ErrantRecordSinkConnector
org.apache.kafka.connect.integration.MonitorableSinkConnector
org.apache.kafka.connect.runtime.SampleSinkConnector

View File

@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingConnector
org.apache.kafka.connect.integration.BlockingConnectorTest$InitializeBlockingConnector
org.apache.kafka.connect.integration.BlockingConnectorTest$ConfigBlockingConnector
org.apache.kafka.connect.integration.BlockingConnectorTest$ValidateBlockingConnector
org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSourceConnector
org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSourceConnector
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest$NaughtyConnector
org.apache.kafka.connect.integration.MonitorableSourceConnector
org.apache.kafka.connect.runtime.SampleSourceConnector
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResourceTest$ConnectorPluginsResourceTestConnector

View File

@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.runtime.SampleConverterWithHeaders
org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter
org.apache.kafka.connect.runtime.isolation.PluginsTest$TestConverter
org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter
org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingConverter

View File

@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.runtime.SampleHeaderConverter
org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter
org.apache.kafka.connect.runtime.isolation.PluginsTest$TestHeaderConverter
org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter
org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingHeaderConverter

View File

@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.integration.ErrorHandlingIntegrationTest$FaultyPassthrough
org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyPassthrough
org.apache.kafka.connect.runtime.ConnectorConfigTest$SimpleTransformation
org.apache.kafka.connect.runtime.ConnectorConfigTest$HasDuplicateConfigTransformation
org.apache.kafka.connect.runtime.ConnectorConfigTest$AbstractKeyValueTransformation$Key
org.apache.kafka.connect.runtime.ConnectorConfigTest$AbstractKeyValueTransformation$Value
org.apache.kafka.connect.runtime.SampleTransformation
org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$Colliding

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.runtime.ConnectorConfigTest$TestPredicate
org.apache.kafka.connect.runtime.SamplePredicate

View File

@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.transforms.Cast$Key
org.apache.kafka.connect.transforms.Cast$Value
org.apache.kafka.connect.transforms.DropHeaders
org.apache.kafka.connect.transforms.ExtractField$Key
org.apache.kafka.connect.transforms.ExtractField$Value
org.apache.kafka.connect.transforms.Filter
org.apache.kafka.connect.transforms.Flatten$Key
org.apache.kafka.connect.transforms.Flatten$Value
org.apache.kafka.connect.transforms.HeaderFrom$Key
org.apache.kafka.connect.transforms.HeaderFrom$Value
org.apache.kafka.connect.transforms.HoistField$Key
org.apache.kafka.connect.transforms.HoistField$Value
org.apache.kafka.connect.transforms.InsertField$Key
org.apache.kafka.connect.transforms.InsertField$Value
org.apache.kafka.connect.transforms.InsertHeader
org.apache.kafka.connect.transforms.MaskField$Key
org.apache.kafka.connect.transforms.MaskField$Value
org.apache.kafka.connect.transforms.RegexRouter
org.apache.kafka.connect.transforms.ReplaceField$Key
org.apache.kafka.connect.transforms.ReplaceField$Value
org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
org.apache.kafka.connect.transforms.TimestampConverter$Key
org.apache.kafka.connect.transforms.TimestampConverter$Value
org.apache.kafka.connect.transforms.TimestampRouter
org.apache.kafka.connect.transforms.ValueToKey

View File

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.transforms.predicates.HasHeaderKey
org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
org.apache.kafka.connect.transforms.predicates.TopicNameMatches