KAFKA-12717: Remove internal Connect converter properties (KIP-738) (#10854)

Removed Connect Distributed worker's internal converter properties.

Author: Chris Egerton <chrise@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Chris Egerton 2021-07-01 22:02:24 -04:00 committed by GitHub
parent 51796bcdef
commit cad2f5e120
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 73 additions and 184 deletions

View File

@ -36,6 +36,8 @@ import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePo
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectorType; import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter; import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
@ -147,17 +149,9 @@ public class Worker {
this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy; this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
this.workerMetricsGroup = new WorkerMetricsGroup(metrics); this.workerMetricsGroup = new WorkerMetricsGroup(metrics);
// Internal converters are required properties, thus getClass won't return null. Map<String, String> internalConverterConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
this.internalKeyConverter = plugins.newConverter( this.internalKeyConverter = plugins.newInternalConverter(true, JsonConverter.class.getName(), internalConverterConfig);
config, this.internalValueConverter = plugins.newInternalConverter(false, JsonConverter.class.getName(), internalConverterConfig);
WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.PLUGINS
);
this.internalValueConverter = plugins.newConverter(
config,
WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.PLUGINS
);
this.offsetBackingStore = offsetBackingStore; this.offsetBackingStore = offsetBackingStore;
this.offsetBackingStore.configure(config); this.offsetBackingStore.configure(config);

View File

@ -26,9 +26,6 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.SimpleHeaderConverter; import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -97,36 +94,6 @@ public class WorkerConfig extends AbstractConfig {
" header values to strings and deserialize them by inferring the schemas."; " header values to strings and deserialize them by inferring the schemas.";
public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName(); public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName();
/**
* @deprecated As of 2.0.0
*/
@Deprecated
public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter";
public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
" This controls the format of the keys in messages written to or read from Kafka, and since this is" +
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro." +
" This setting controls the format used for internal bookkeeping data used by the framework, such as" +
" configs and offsets, so users can typically use any functioning Converter implementation." +
" Deprecated; will be removed in an upcoming version.";
/**
* @deprecated As of 2.0.0
*/
@Deprecated
public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter";
public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
" This controls the format of the values in messages written to or read from Kafka, and since this is" +
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro." +
" This setting controls the format used for internal bookkeeping data used by the framework, such as" +
" configs and offsets, so users can typically use any functioning Converter implementation." +
" Deprecated; will be removed in an upcoming version.";
private static final Class<? extends Converter> INTERNAL_CONVERTER_DEFAULT = JsonConverter.class;
public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms"; = "task.shutdown.graceful.timeout.ms";
private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC = private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
@ -275,10 +242,6 @@ public class WorkerConfig extends AbstractConfig {
Importance.HIGH, KEY_CONVERTER_CLASS_DOC) Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC) Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
.define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, INTERNAL_CONVERTER_DEFAULT,
Importance.LOW, INTERNAL_KEY_CONVERTER_CLASS_DOC)
.define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, INTERNAL_CONVERTER_DEFAULT,
Importance.LOW, INTERNAL_VALUE_CONVERTER_CLASS_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG, .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC) TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
@ -341,55 +304,24 @@ public class WorkerConfig extends AbstractConfig {
.withClientSslSupport(); .withClientSslSupport();
} }
private void logInternalConverterDeprecationWarnings(Map<String, String> props) { private void logInternalConverterRemovalWarnings(Map<String, String> props) {
String[] deprecatedConfigs = new String[] { List<String> removedProperties = new ArrayList<>();
INTERNAL_KEY_CONVERTER_CLASS_CONFIG, for (String property : Arrays.asList("internal.key.converter", "internal.value.converter")) {
INTERNAL_VALUE_CONVERTER_CLASS_CONFIG if (props.containsKey(property)) {
}; removedProperties.add(property);
for (String config : deprecatedConfigs) {
if (props.containsKey(config)) {
Class<?> internalConverterClass = getClass(config);
logDeprecatedProperty(config, internalConverterClass.getCanonicalName(), INTERNAL_CONVERTER_DEFAULT.getCanonicalName(), null);
if (internalConverterClass.equals(INTERNAL_CONVERTER_DEFAULT)) {
// log the properties for this converter ...
for (Map.Entry<String, Object> propEntry : originalsWithPrefix(config + ".").entrySet()) {
String prop = propEntry.getKey();
String propValue = propEntry.getValue().toString();
String defaultValue = JsonConverterConfig.SCHEMAS_ENABLE_CONFIG.equals(prop) ? "false" : null;
logDeprecatedProperty(config + "." + prop, propValue, defaultValue, config);
}
}
} }
removedProperties.addAll(originalsWithPrefix(property + ".").keySet());
} }
} if (!removedProperties.isEmpty()) {
private void logDeprecatedProperty(String propName, String propValue, String defaultValue, String prefix) {
String prefixNotice = prefix != null
? " (along with all configuration for '" + prefix + "')"
: "";
if (defaultValue != null && defaultValue.equalsIgnoreCase(propValue)) {
log.info(
"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. "
+ "The specified value '{}' matches the default, so this property can be safely removed from the worker configuration.",
propName,
prefixNotice,
propValue
);
} else if (defaultValue != null) {
log.warn( log.warn(
"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. " "The worker has been configured with one or more internal converter properties ({}). "
+ "The specified value '{}' does NOT match the default and recommended value '{}'.", + "Support for these properties was deprecated in version 2.0 and removed in version 3.0, "
propName, + "and specifying them will have no effect. "
prefixNotice, + "Instead, an instance of the JsonConverter with schemas.enable "
propValue, + "set to false will be used. For more information, please visit "
defaultValue + "http://kafka.apache.org/documentation/#upgrade and consult the upgrade notes"
); + "for the 3.0 release.",
} else { removedProperties);
log.warn(
"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release.",
propName,
prefixNotice
);
} }
} }
@ -432,7 +364,7 @@ public class WorkerConfig extends AbstractConfig {
public WorkerConfig(ConfigDef definition, Map<String, String> props) { public WorkerConfig(ConfigDef definition, Map<String, String> props) {
super(definition, props); super(definition, props);
logInternalConverterDeprecationWarnings(props); logInternalConverterRemovalWarnings(props);
logPluginPathConfigProviderWarning(props); logPluginPathConfigProviderWarning(props);
} }

View File

@ -25,8 +25,6 @@ import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterConfig;
@ -121,12 +119,6 @@ public class Plugins {
); );
} }
@SuppressWarnings("deprecation")
protected static boolean isInternalConverter(String classPropertyName) {
return classPropertyName.equals(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG)
|| classPropertyName.equals(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG);
}
public static ClassLoader compareAndSwapLoaders(ClassLoader loader) { public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
ClassLoader current = Thread.currentThread().getContextClassLoader(); ClassLoader current = Thread.currentThread().getContextClassLoader();
if (!current.equals(loader)) { if (!current.equals(loader)) {
@ -238,11 +230,11 @@ public class Plugins {
* @throws ConnectException if the {@link Converter} implementation class could not be found * @throws ConnectException if the {@link Converter} implementation class could not be found
*/ */
public Converter newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { public Converter newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) {
if (!config.originals().containsKey(classPropertyName) && !isInternalConverter(classPropertyName)) { if (!config.originals().containsKey(classPropertyName)) {
// This configuration does not define the converter via the specified property name, and // This configuration does not define the converter via the specified property name
// it does not represent an internal converter (which has a default available)
return null; return null;
} }
Class<? extends Converter> klass = null; Class<? extends Converter> klass = null;
switch (classLoaderUsage) { switch (classLoaderUsage) {
case CURRENT_CLASSLOADER: case CURRENT_CLASSLOADER:
@ -270,9 +262,7 @@ public class Plugins {
} }
// Determine whether this is a key or value converter based upon the supplied property name ... // Determine whether this is a key or value converter based upon the supplied property name ...
@SuppressWarnings("deprecation") final boolean isKeyConverter = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName);
final boolean isKeyConverter = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName)
|| WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName);
// Configure the Converter using only the old configuration mechanism ... // Configure the Converter using only the old configuration mechanism ...
String configPrefix = classPropertyName + "."; String configPrefix = classPropertyName + ".";
@ -280,22 +270,39 @@ public class Plugins {
log.debug("Configuring the {} converter with configuration keys:{}{}", log.debug("Configuring the {} converter with configuration keys:{}{}",
isKeyConverter ? "key" : "value", System.lineSeparator(), converterConfig.keySet()); isKeyConverter ? "key" : "value", System.lineSeparator(), converterConfig.keySet());
// Have to override schemas.enable from true to false for internal JSON converters Converter plugin;
// Don't have to warn the user about anything since all deprecation warnings take place in the ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
// WorkerConfig class try {
if (JsonConverter.class.isAssignableFrom(klass) && isInternalConverter(classPropertyName)) { plugin = newPlugin(klass);
// If they haven't explicitly specified values for internal.key.converter.schemas.enable plugin.configure(converterConfig, isKeyConverter);
// or internal.value.converter.schemas.enable, we can safely default them to false } finally {
if (!converterConfig.containsKey(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)) { compareAndSwapLoaders(savedLoader);
converterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); }
} return plugin;
}
/**
* Load an internal converter, used by the worker for (de)serializing data in internal topics.
*
* @param isKey whether the converter is a key converter
* @param className the class name of the converter
* @param converterConfig the properties to configure the converter with
* @return the instantiated and configured {@link Converter}; never null
* @throws ConnectException if the {@link Converter} implementation class could not be found
*/
public Converter newInternalConverter(boolean isKey, String className, Map<String, String> converterConfig) {
Class<? extends Converter> klass;
try {
klass = pluginClass(delegatingLoader, className, Converter.class);
} catch (ClassNotFoundException e) {
throw new ConnectException("Failed to load internal converter class " + className);
} }
Converter plugin; Converter plugin;
ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader()); ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
try { try {
plugin = newPlugin(klass); plugin = newPlugin(klass);
plugin.configure(converterConfig, isKeyConverter); plugin.configure(converterConfig, isKey);
} finally { } finally {
compareAndSwapLoaders(savedLoader); compareAndSwapLoaders(savedLoader);
} }

View File

@ -46,8 +46,6 @@ public class ConnectMetricsTest {
static { static {
DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
} }
private ConnectMetrics metrics; private ConnectMetrics metrics;

View File

@ -191,10 +191,6 @@ public class ErrorHandlingTaskTest {
Map<String, String> workerProps = new HashMap<>(); Map<String, String> workerProps = new HashMap<>();
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation)); workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
pluginLoader = PowerMock.createMock(PluginClassLoader.class); pluginLoader = PowerMock.createMock(PluginClassLoader.class);

View File

@ -49,8 +49,6 @@ public class MockConnectMetrics extends ConnectMetrics {
static { static {
DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_WORKER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_WORKER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_WORKER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_WORKER_CONFIG.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); DEFAULT_WORKER_CONFIG.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
} }

View File

@ -67,10 +67,6 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
Map<String, String> workerProps = new HashMap<>(); Map<String, String> workerProps = new HashMap<>();
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerProps.put("offset.flush.interval.ms", workerProps.put("offset.flush.interval.ms",
Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS)); Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));

View File

@ -162,10 +162,6 @@ public class WorkerSinkTaskTest {
Map<String, String> workerProps = new HashMap<>(); Map<String, String> workerProps = new HashMap<>();
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerConfig = new StandaloneConfig(workerProps); workerConfig = new StandaloneConfig(workerProps);
pluginLoader = PowerMock.createMock(PluginClassLoader.class); pluginLoader = PowerMock.createMock(PluginClassLoader.class);

View File

@ -133,10 +133,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
Map<String, String> workerProps = new HashMap<>(); Map<String, String> workerProps = new HashMap<>();
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
pluginLoader = PowerMock.createMock(PluginClassLoader.class); pluginLoader = PowerMock.createMock(PluginClassLoader.class);
workerConfig = new StandaloneConfig(workerProps); workerConfig = new StandaloneConfig(workerProps);

View File

@ -190,10 +190,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
Map<String, String> props = new HashMap<>(); Map<String, String> props = new HashMap<>();
props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
props.put("internal.key.converter.schemas.enable", "false");
props.put("internal.value.converter.schemas.enable", "false");
props.put("offset.storage.file.filename", "/tmp/connect.offsets"); props.put("offset.storage.file.filename", "/tmp/connect.offsets");
props.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation)); props.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(enableTopicCreation));
return props; return props;

View File

@ -1420,17 +1420,17 @@ public class WorkerTest extends ThreadedTest {
// Instantiate and configure internal // Instantiate and configure internal
EasyMock.expect( EasyMock.expect(
plugins.newConverter( plugins.newInternalConverter(
config, EasyMock.eq(true),
WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, EasyMock.anyString(),
ClassLoaderUsage.PLUGINS EasyMock.anyObject()
) )
).andReturn(internalKeyConverter); ).andReturn(internalKeyConverter);
EasyMock.expect( EasyMock.expect(
plugins.newConverter( plugins.newInternalConverter(
config, EasyMock.eq(false),
WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, EasyMock.anyString(),
ClassLoaderUsage.PLUGINS EasyMock.anyObject()
) )
).andReturn(internalValueConverter); ).andReturn(internalValueConverter);
EasyMock.expectLastCall(); EasyMock.expectLastCall();

View File

@ -118,8 +118,6 @@ public class DistributedHerderTest {
// The WorkerConfig base class has some required settings without defaults // The WorkerConfig base class has some required settings without defaults
HERDER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); HERDER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
HERDER_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets"); HERDER_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
} }
private static final String MEMBER_URL = "memberUrl"; private static final String MEMBER_URL = "memberUrl";

View File

@ -77,10 +77,6 @@ public class PluginsTest {
props.put("value.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true"); props.put("value.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true");
props.put("key.converter.extra.config", "foo1"); props.put("key.converter.extra.config", "foo1");
props.put("value.converter.extra.config", "foo2"); props.put("value.converter.extra.config", "foo2");
props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, TestInternalConverter.class.getName());
props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, TestInternalConverter.class.getName());
props.put("internal.key.converter.extra.config", "bar1");
props.put("internal.value.converter.extra.config", "bar2");
props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName()); props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName());
props.put("header.converter.extra.config", "baz"); props.put("header.converter.extra.config", "baz");
@ -107,17 +103,9 @@ public class PluginsTest {
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Test @Test
public void shouldInstantiateAndConfigureInternalConverters() { public void shouldInstantiateAndConfigureInternalConverters() {
instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER); instantiateAndConfigureInternalConverter(true, Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"));
// Validate schemas.enable is defaulted to false for internal converter // Validate schemas.enable is set to false
assertEquals(false, internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)); assertEquals("false", internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
// Validate internal converter properties can still be set
assertEquals("bar1", internalConverter.configs.get("extra.config"));
instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
// Validate schemas.enable is defaulted to false for internal converter
assertEquals(false, internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
// Validate internal converter properties can still be set
assertEquals("bar2", internalConverter.configs.get("extra.config"));
} }
@Test @Test
@ -375,8 +363,8 @@ public class PluginsTest {
assertNotNull(headerConverter); assertNotNull(headerConverter);
} }
protected void instantiateAndConfigureInternalConverter(String configPropName, ClassLoaderUsage classLoaderUsage) { protected void instantiateAndConfigureInternalConverter(boolean isKey, Map<String, String> config) {
internalConverter = (TestInternalConverter) plugins.newConverter(config, configPropName, classLoaderUsage); internalConverter = (TestInternalConverter) plugins.newInternalConverter(isKey, TestInternalConverter.class.getName(), config);
assertNotNull(internalConverter); assertNotNull(internalConverter);
} }
@ -475,6 +463,7 @@ public class PluginsTest {
public static class TestInternalConverter extends JsonConverter { public static class TestInternalConverter extends JsonConverter {
public Map<String, ?> configs; public Map<String, ?> configs;
@Override
public void configure(Map<String, ?> configs) { public void configure(Map<String, ?> configs) {
this.configs = configs; this.configs = configs;
super.configure(configs); super.configure(configs);

View File

@ -89,8 +89,6 @@ public class RestServerTest {
workerProps.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group"); workerProps.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets"); workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
workerProps.put(WorkerConfig.LISTENERS_CONFIG, "HTTP://localhost:0"); workerProps.put(WorkerConfig.LISTENERS_CONFIG, "HTTP://localhost:0");

View File

@ -40,8 +40,6 @@ public class SSLUtilsTest {
DEFAULT_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group"); DEFAULT_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
DEFAULT_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets"); DEFAULT_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
} }
@ -140,8 +138,6 @@ public class SSLUtilsTest {
configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file"); configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file");
configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
configMap.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
configMap.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
configMap.put("ssl.keystore.location", "/path/to/keystore"); configMap.put("ssl.keystore.location", "/path/to/keystore");
configMap.put("ssl.keystore.password", "123456"); configMap.put("ssl.keystore.password", "123456");
configMap.put("ssl.key.password", "123456"); configMap.put("ssl.key.password", "123456");
@ -170,8 +166,6 @@ public class SSLUtilsTest {
configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file"); configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file");
configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
configMap.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
configMap.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
configMap.put("ssl.keystore.location", "/path/to/keystore"); configMap.put("ssl.keystore.location", "/path/to/keystore");
configMap.put("ssl.keystore.password", "123456"); configMap.put("ssl.keystore.password", "123456");
configMap.put("ssl.key.password", "123456"); configMap.put("ssl.key.password", "123456");

View File

@ -64,8 +64,6 @@ public class FileOffsetBackingStoreTest {
props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath()); props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath());
props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
props.put(StandaloneConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
props.put(StandaloneConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
config = new StandaloneConfig(props); config = new StandaloneConfig(props);
store.configure(config); store.configure(config);
store.start(); store.start();

View File

@ -89,8 +89,6 @@ public class KafkaConfigBackingStoreTest {
DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093"); DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_CONFIG_STORAGE_PROPS); DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_CONFIG_STORAGE_PROPS);
} }

View File

@ -82,8 +82,6 @@ public class KafkaOffsetBackingStoreTest {
DEFAULT_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic"); DEFAULT_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
DEFAULT_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); DEFAULT_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_PROPS); DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_PROPS);
} }

View File

@ -83,6 +83,13 @@
understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass
in <code>new ConsumerGroupMetadata(consumerGroupId)</code> to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details. in <code>new ConsumerGroupMetadata(consumerGroupId)</code> to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
</li> </li>
<li>
The Connect <code>internal.key.converter</code> and <code>internal.value.converter</code> properties have been completely <a href="https://cwiki.apache.org/confluence/x/2YDOCg">removed</a>.
The use of these Connect worker properties has been deprecated since version 2.0.0.
Workers are now hardcoded to use the JSON converter with <code>schemas.enable</code> set to <code>false</code>. If your cluster has been using
a different internal key or value converter, you can follow the migration steps outlined in <a href="https://cwiki.apache.org/confluence/x/2YDOCg">KIP-738</a>
to safely upgrade your Connect cluster to 3.0.
</li>
<li> The Connect-based MirrorMaker (MM2) includes changes to support <code>IdentityReplicationPolicy</code>, enabling replication without renaming topics. <li> The Connect-based MirrorMaker (MM2) includes changes to support <code>IdentityReplicationPolicy</code>, enabling replication without renaming topics.
The existing <code>DefaultReplicationPolicy</code> is still used by default, but identity replication can be enabled via the The existing <code>DefaultReplicationPolicy</code> is still used by default, but identity replication can be enabled via the
<code>replication.policy</code> configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for <code>replication.policy</code> configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for