diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 3dc8c198f5c..73201b5add9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -36,6 +36,8 @@ import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePo import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.health.ConnectorType; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter; @@ -147,17 +149,9 @@ public class Worker { this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy; this.workerMetricsGroup = new WorkerMetricsGroup(metrics); - // Internal converters are required properties, thus getClass won't return null. - this.internalKeyConverter = plugins.newConverter( - config, - WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, - ClassLoaderUsage.PLUGINS - ); - this.internalValueConverter = plugins.newConverter( - config, - WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, - ClassLoaderUsage.PLUGINS - ); + Map internalConverterConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"); + this.internalKeyConverter = plugins.newInternalConverter(true, JsonConverter.class.getName(), internalConverterConfig); + this.internalValueConverter = plugins.newInternalConverter(false, JsonConverter.class.getName(), internalConverterConfig); this.offsetBackingStore = offsetBackingStore; this.offsetBackingStore.configure(config); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 341582a3c9e..73b743bbe11 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -26,9 +26,6 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.connect.json.JsonConverter; -import org.apache.kafka.connect.json.JsonConverterConfig; -import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.SimpleHeaderConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,36 +94,6 @@ public class WorkerConfig extends AbstractConfig { " header values to strings and deserialize them by inferring the schemas."; public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName(); - /** - * @deprecated As of 2.0.0 - */ - @Deprecated - public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter"; - public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC = - "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." + - " This controls the format of the keys in messages written to or read from Kafka, and since this is" + - " independent of connectors it allows any connector to work with any serialization format." + - " Examples of common formats include JSON and Avro." + - " This setting controls the format used for internal bookkeeping data used by the framework, such as" + - " configs and offsets, so users can typically use any functioning Converter implementation." + - " Deprecated; will be removed in an upcoming version."; - - /** - * @deprecated As of 2.0.0 - */ - @Deprecated - public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter"; - public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC = - "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." + - " This controls the format of the values in messages written to or read from Kafka, and since this is" + - " independent of connectors it allows any connector to work with any serialization format." + - " Examples of common formats include JSON and Avro." + - " This setting controls the format used for internal bookkeeping data used by the framework, such as" + - " configs and offsets, so users can typically use any functioning Converter implementation." + - " Deprecated; will be removed in an upcoming version."; - - private static final Class INTERNAL_CONVERTER_DEFAULT = JsonConverter.class; - public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG = "task.shutdown.graceful.timeout.ms"; private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC = @@ -275,10 +242,6 @@ public class WorkerConfig extends AbstractConfig { Importance.HIGH, KEY_CONVERTER_CLASS_DOC) .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_CONVERTER_CLASS_DOC) - .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, INTERNAL_CONVERTER_DEFAULT, - Importance.LOW, INTERNAL_KEY_CONVERTER_CLASS_DOC) - .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, INTERNAL_CONVERTER_DEFAULT, - Importance.LOW, INTERNAL_VALUE_CONVERTER_CLASS_DOC) .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC) @@ -341,55 +304,24 @@ public class WorkerConfig extends AbstractConfig { .withClientSslSupport(); } - private void logInternalConverterDeprecationWarnings(Map props) { - String[] deprecatedConfigs = new String[] { - INTERNAL_KEY_CONVERTER_CLASS_CONFIG, - INTERNAL_VALUE_CONVERTER_CLASS_CONFIG - }; - for (String config : deprecatedConfigs) { - if (props.containsKey(config)) { - Class internalConverterClass = getClass(config); - logDeprecatedProperty(config, internalConverterClass.getCanonicalName(), INTERNAL_CONVERTER_DEFAULT.getCanonicalName(), null); - if (internalConverterClass.equals(INTERNAL_CONVERTER_DEFAULT)) { - // log the properties for this converter ... - for (Map.Entry propEntry : originalsWithPrefix(config + ".").entrySet()) { - String prop = propEntry.getKey(); - String propValue = propEntry.getValue().toString(); - String defaultValue = JsonConverterConfig.SCHEMAS_ENABLE_CONFIG.equals(prop) ? "false" : null; - logDeprecatedProperty(config + "." + prop, propValue, defaultValue, config); - } - } + private void logInternalConverterRemovalWarnings(Map props) { + List removedProperties = new ArrayList<>(); + for (String property : Arrays.asList("internal.key.converter", "internal.value.converter")) { + if (props.containsKey(property)) { + removedProperties.add(property); } + removedProperties.addAll(originalsWithPrefix(property + ".").keySet()); } - } - - private void logDeprecatedProperty(String propName, String propValue, String defaultValue, String prefix) { - String prefixNotice = prefix != null - ? " (along with all configuration for '" + prefix + "')" - : ""; - if (defaultValue != null && defaultValue.equalsIgnoreCase(propValue)) { - log.info( - "Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. " - + "The specified value '{}' matches the default, so this property can be safely removed from the worker configuration.", - propName, - prefixNotice, - propValue - ); - } else if (defaultValue != null) { + if (!removedProperties.isEmpty()) { log.warn( - "Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. " - + "The specified value '{}' does NOT match the default and recommended value '{}'.", - propName, - prefixNotice, - propValue, - defaultValue - ); - } else { - log.warn( - "Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release.", - propName, - prefixNotice - ); + "The worker has been configured with one or more internal converter properties ({}). " + + "Support for these properties was deprecated in version 2.0 and removed in version 3.0, " + + "and specifying them will have no effect. " + + "Instead, an instance of the JsonConverter with schemas.enable " + + "set to false will be used. For more information, please visit " + + "http://kafka.apache.org/documentation/#upgrade and consult the upgrade notes" + + "for the 3.0 release.", + removedProperties); } } @@ -432,7 +364,7 @@ public class WorkerConfig extends AbstractConfig { public WorkerConfig(ConfigDef definition, Map props) { super(definition, props); - logInternalConverterDeprecationWarnings(props); + logInternalConverterRemovalWarnings(props); logPluginPathConfigProviderWarning(props); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 6ab8a766044..5ad8dac5681 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -25,8 +25,6 @@ import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.json.JsonConverter; -import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.ConverterConfig; @@ -121,12 +119,6 @@ public class Plugins { ); } - @SuppressWarnings("deprecation") - protected static boolean isInternalConverter(String classPropertyName) { - return classPropertyName.equals(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG) - || classPropertyName.equals(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG); - } - public static ClassLoader compareAndSwapLoaders(ClassLoader loader) { ClassLoader current = Thread.currentThread().getContextClassLoader(); if (!current.equals(loader)) { @@ -238,11 +230,11 @@ public class Plugins { * @throws ConnectException if the {@link Converter} implementation class could not be found */ public Converter newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { - if (!config.originals().containsKey(classPropertyName) && !isInternalConverter(classPropertyName)) { - // This configuration does not define the converter via the specified property name, and - // it does not represent an internal converter (which has a default available) + if (!config.originals().containsKey(classPropertyName)) { + // This configuration does not define the converter via the specified property name return null; } + Class klass = null; switch (classLoaderUsage) { case CURRENT_CLASSLOADER: @@ -270,9 +262,7 @@ public class Plugins { } // Determine whether this is a key or value converter based upon the supplied property name ... - @SuppressWarnings("deprecation") - final boolean isKeyConverter = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName) - || WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName); + final boolean isKeyConverter = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName); // Configure the Converter using only the old configuration mechanism ... String configPrefix = classPropertyName + "."; @@ -280,22 +270,39 @@ public class Plugins { log.debug("Configuring the {} converter with configuration keys:{}{}", isKeyConverter ? "key" : "value", System.lineSeparator(), converterConfig.keySet()); - // Have to override schemas.enable from true to false for internal JSON converters - // Don't have to warn the user about anything since all deprecation warnings take place in the - // WorkerConfig class - if (JsonConverter.class.isAssignableFrom(klass) && isInternalConverter(classPropertyName)) { - // If they haven't explicitly specified values for internal.key.converter.schemas.enable - // or internal.value.converter.schemas.enable, we can safely default them to false - if (!converterConfig.containsKey(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)) { - converterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); - } + Converter plugin; + ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader()); + try { + plugin = newPlugin(klass); + plugin.configure(converterConfig, isKeyConverter); + } finally { + compareAndSwapLoaders(savedLoader); + } + return plugin; + } + + /** + * Load an internal converter, used by the worker for (de)serializing data in internal topics. + * + * @param isKey whether the converter is a key converter + * @param className the class name of the converter + * @param converterConfig the properties to configure the converter with + * @return the instantiated and configured {@link Converter}; never null + * @throws ConnectException if the {@link Converter} implementation class could not be found + */ + public Converter newInternalConverter(boolean isKey, String className, Map converterConfig) { + Class klass; + try { + klass = pluginClass(delegatingLoader, className, Converter.class); + } catch (ClassNotFoundException e) { + throw new ConnectException("Failed to load internal converter class " + className); } Converter plugin; ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader()); try { plugin = newPlugin(klass); - plugin.configure(converterConfig, isKeyConverter); + plugin.configure(converterConfig, isKey); } finally { compareAndSwapLoaders(savedLoader); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java index 31aca97e204..98bf8e023b3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java @@ -46,8 +46,6 @@ public class ConnectMetricsTest { static { DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); } private ConnectMetrics metrics; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 33ebd3df391..c6f79fe2859 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -191,10 +191,6 @@ public class ErrorHandlingTaskTest { Map workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.key.converter.schemas.enable", "false"); - workerProps.put("internal.value.converter.schemas.enable", "false"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation)); pluginLoader = PowerMock.createMock(PluginClassLoader.class); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java index fa49358baaf..4abbc64259e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java @@ -49,8 +49,6 @@ public class MockConnectMetrics extends ConnectMetrics { static { DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_WORKER_CONFIG.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java index fb7ed82af73..278a73d16d4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java @@ -67,10 +67,6 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest { Map workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.key.converter.schemas.enable", "false"); - workerProps.put("internal.value.converter.schemas.enable", "false"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); workerProps.put("offset.flush.interval.ms", Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS)); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index b7492d31f50..651ef5b072b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -162,10 +162,6 @@ public class WorkerSinkTaskTest { Map workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.key.converter.schemas.enable", "false"); - workerProps.put("internal.value.converter.schemas.enable", "false"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); workerConfig = new StandaloneConfig(workerProps); pluginLoader = PowerMock.createMock(PluginClassLoader.class); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 7b4474bc656..5918747fd19 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -133,10 +133,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { Map workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put("internal.key.converter.schemas.enable", "false"); - workerProps.put("internal.value.converter.schemas.enable", "false"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); pluginLoader = PowerMock.createMock(PluginClassLoader.class); workerConfig = new StandaloneConfig(workerProps); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 4d358ca46b1..640c1d8259e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -190,10 +190,6 @@ public class WorkerSourceTaskTest extends ThreadedTest { Map props = new HashMap<>(); props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); - props.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); - props.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); - props.put("internal.key.converter.schemas.enable", "false"); - props.put("internal.value.converter.schemas.enable", "false"); props.put("offset.storage.file.filename", "/tmp/connect.offsets"); props.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation)); return props; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 171f16db996..fe89745bec7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -1420,17 +1420,17 @@ public class WorkerTest extends ThreadedTest { // Instantiate and configure internal EasyMock.expect( - plugins.newConverter( - config, - WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, - ClassLoaderUsage.PLUGINS + plugins.newInternalConverter( + EasyMock.eq(true), + EasyMock.anyString(), + EasyMock.anyObject() ) ).andReturn(internalKeyConverter); EasyMock.expect( - plugins.newConverter( - config, - WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, - ClassLoaderUsage.PLUGINS + plugins.newInternalConverter( + EasyMock.eq(false), + EasyMock.anyString(), + EasyMock.anyObject() ) ).andReturn(internalValueConverter); EasyMock.expectLastCall(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 99c41e6ccb7..245bb754439 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -118,8 +118,6 @@ public class DistributedHerderTest { // The WorkerConfig base class has some required settings without defaults HERDER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); HERDER_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets"); } private static final String MEMBER_URL = "memberUrl"; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 43081aa069a..5083a2d971e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -77,10 +77,6 @@ public class PluginsTest { props.put("value.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true"); props.put("key.converter.extra.config", "foo1"); props.put("value.converter.extra.config", "foo2"); - props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, TestInternalConverter.class.getName()); - props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, TestInternalConverter.class.getName()); - props.put("internal.key.converter.extra.config", "bar1"); - props.put("internal.value.converter.extra.config", "bar2"); props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName()); props.put("header.converter.extra.config", "baz"); @@ -107,17 +103,9 @@ public class PluginsTest { @SuppressWarnings("deprecation") @Test public void shouldInstantiateAndConfigureInternalConverters() { - instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER); - // Validate schemas.enable is defaulted to false for internal converter - assertEquals(false, internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)); - // Validate internal converter properties can still be set - assertEquals("bar1", internalConverter.configs.get("extra.config")); - - instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); - // Validate schemas.enable is defaulted to false for internal converter - assertEquals(false, internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)); - // Validate internal converter properties can still be set - assertEquals("bar2", internalConverter.configs.get("extra.config")); + instantiateAndConfigureInternalConverter(true, Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")); + // Validate schemas.enable is set to false + assertEquals("false", internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)); } @Test @@ -375,8 +363,8 @@ public class PluginsTest { assertNotNull(headerConverter); } - protected void instantiateAndConfigureInternalConverter(String configPropName, ClassLoaderUsage classLoaderUsage) { - internalConverter = (TestInternalConverter) plugins.newConverter(config, configPropName, classLoaderUsage); + protected void instantiateAndConfigureInternalConverter(boolean isKey, Map config) { + internalConverter = (TestInternalConverter) plugins.newInternalConverter(isKey, TestInternalConverter.class.getName(), config); assertNotNull(internalConverter); } @@ -475,6 +463,7 @@ public class PluginsTest { public static class TestInternalConverter extends JsonConverter { public Map configs; + @Override public void configure(Map configs) { this.configs = configs; super.configure(configs); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index e69bffdbcf9..e889b888bf0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -89,8 +89,6 @@ public class RestServerTest { workerProps.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group"); workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets"); workerProps.put(WorkerConfig.LISTENERS_CONFIG, "HTTP://localhost:0"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java index 8959a6c6e0b..b8ffbcffbed 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java @@ -40,8 +40,6 @@ public class SSLUtilsTest { DEFAULT_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group"); DEFAULT_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - DEFAULT_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - DEFAULT_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets"); } @@ -140,8 +138,6 @@ public class SSLUtilsTest { configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file"); configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - configMap.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - configMap.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); configMap.put("ssl.keystore.location", "/path/to/keystore"); configMap.put("ssl.keystore.password", "123456"); configMap.put("ssl.key.password", "123456"); @@ -170,8 +166,6 @@ public class SSLUtilsTest { configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file"); configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - configMap.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - configMap.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); configMap.put("ssl.keystore.location", "/path/to/keystore"); configMap.put("ssl.keystore.password", "123456"); configMap.put("ssl.key.password", "123456"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java index 571844d488d..9944a5d8f9b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java @@ -64,8 +64,6 @@ public class FileOffsetBackingStoreTest { props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath()); props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - props.put(StandaloneConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - props.put(StandaloneConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); config = new StandaloneConfig(props); store.configure(config); store.start(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 0d4ff24d74c..662f0f8365c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -89,8 +89,6 @@ public class KafkaConfigBackingStoreTest { DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093"); DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_CONFIG_STORAGE_PROPS); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index 07eadc87893..cdce2a186a2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -82,8 +82,6 @@ public class KafkaOffsetBackingStoreTest { DEFAULT_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic"); DEFAULT_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - DEFAULT_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - DEFAULT_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_PROPS); } diff --git a/docs/upgrade.html b/docs/upgrade.html index 4f2a3d8e08e..678984cf6b5 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -83,6 +83,13 @@ understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass in new ConsumerGroupMetadata(consumerGroupId) to work with older brokers. See KIP-732 for more details. +
  • + The Connect internal.key.converter and internal.value.converter properties have been completely removed. + The use of these Connect worker properties has been deprecated since version 2.0.0. + Workers are now hardcoded to use the JSON converter with schemas.enable set to false. If your cluster has been using + a different internal key or value converter, you can follow the migration steps outlined in KIP-738 + to safely upgrade your Connect cluster to 3.0. +
  • The Connect-based MirrorMaker (MM2) includes changes to support IdentityReplicationPolicy, enabling replication without renaming topics. The existing DefaultReplicationPolicy is still used by default, but identity replication can be enabled via the replication.policy configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for