From 9242723e4cdfdbdcbd76a66b9965c25c2fbf6abf Mon Sep 17 00:00:00 2001 From: Xuan-Zhang Gong Date: Wed, 11 Sep 2024 14:34:05 +0800 Subject: [PATCH] KAFKA-17435 remove use.incremental.alter.configs (#17027) Reviewers: Chia-Ping Tsai --- .../connect/mirror/MirrorSourceConfig.java | 24 ---- .../connect/mirror/MirrorSourceConnector.java | 90 ++++---------- .../mirror/MirrorSourceConnectorTest.java | 112 ------------------ docs/upgrade.html | 4 +- 4 files changed, 23 insertions(+), 207 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java index 18951c58428..974db4057b8 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java @@ -74,19 +74,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX; private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync."; public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60; - @Deprecated - public static final String USE_INCREMENTAL_ALTER_CONFIGS = "use.incremental.alter.configs"; - private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " + - "The valid values are 'requested', 'required' and 'never'. " + - "By default, set to 'requested', which means the IncrementalAlterConfigs API is being used for syncing topic configurations " + - "and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. " + - "If explicitly set to 'required', the IncrementalAlterConfigs API is used without the fallback logic and +" + - "if it receives an error from an incompatible broker, the connector will fail." + - "If explicitly set to 'never', the AlterConfig is always used." + - "This setting will be removed and the behaviour of 'required' will be used in Kafka 4.0, therefore users should ensure that target broker is at least 2.3.0"; - public static final String REQUEST_INCREMENTAL_ALTER_CONFIGS = "requested"; - public static final String REQUIRE_INCREMENTAL_ALTER_CONFIGS = "required"; - public static final String NEVER_USE_INCREMENTAL_ALTER_CONFIGS = "never"; public static final String SYNC_TOPIC_ACLS_ENABLED = SYNC_TOPIC_ACLS + ENABLED_SUFFIX; private static final String SYNC_TOPIC_ACLS_ENABLED_DOC = "Whether to periodically configure remote topic ACLs to match their corresponding upstream topics."; @@ -183,10 +170,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { } } - String useIncrementalAlterConfigs() { - return getString(USE_INCREMENTAL_ALTER_CONFIGS); - } - Duration syncTopicAclsInterval() { if (getBoolean(SYNC_TOPIC_ACLS_ENABLED)) { return Duration.ofSeconds(getLong(SYNC_TOPIC_ACLS_INTERVAL_SECONDS)); @@ -298,13 +281,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT, ConfigDef.Importance.LOW, SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC) - .define( - USE_INCREMENTAL_ALTER_CONFIGS, - ConfigDef.Type.STRING, - REQUEST_INCREMENTAL_ALTER_CONFIGS, - in(REQUEST_INCREMENTAL_ALTER_CONFIGS, REQUIRE_INCREMENTAL_ALTER_CONFIGS, NEVER_USE_INCREMENTAL_ALTER_CONFIGS), - ConfigDef.Importance.LOW, - USE_INCREMENTAL_ALTER_CONFIG_DOC) .define( SYNC_TOPIC_ACLS_ENABLED, ConfigDef.Type.BOOLEAN, diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index 425e4b7749e..14929bd1750 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -66,7 +66,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -106,7 +105,6 @@ public class MirrorSourceConnector extends SourceConnector { private int replicationFactor; private Admin sourceAdminClient; private Admin targetAdminClient; - private volatile boolean useIncrementalAlterConfigs; public MirrorSourceConnector() { // nop @@ -127,18 +125,6 @@ public class MirrorSourceConnector extends SourceConnector { this.configPropertyFilter = configPropertyFilter; } - // visible for testing the deprecated setting "use.incremental.alter.configs" - // this constructor should be removed when the deprecated setting is removed in Kafka 4.0 - MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy, - MirrorSourceConfig config, ConfigPropertyFilter configPropertyFilter, Admin targetAdmin) { - this.sourceAndTarget = sourceAndTarget; - this.replicationPolicy = replicationPolicy; - this.configPropertyFilter = configPropertyFilter; - this.config = config; - this.useIncrementalAlterConfigs = !config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS); - this.targetAdminClient = targetAdmin; - } - // visible for testing MirrorSourceConnector(Admin sourceAdminClient, Admin targetAdminClient, MirrorSourceConfig config) { this.sourceAdminClient = sourceAdminClient; @@ -161,7 +147,6 @@ public class MirrorSourceConnector extends SourceConnector { replicationFactor = config.replicationFactor(); sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("replication-source-admin")); targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("replication-target-admin")); - useIncrementalAlterConfigs = !config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS); scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout()); scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic"); @@ -441,15 +426,10 @@ public class MirrorSourceConnector extends SourceConnector { // visible for testing void syncTopicConfigs() throws InterruptedException, ExecutionException { - boolean incremental = useIncrementalAlterConfigs; Map sourceConfigs = describeTopicConfigs(topicsBeingReplicated()); Map targetConfigs = sourceConfigs.entrySet().stream() - .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue(), incremental))); - if (incremental) { - incrementalAlterConfigs(targetConfigs); - } else { - deprecatedAlterConfigs(targetConfigs); - } + .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue(), true))); + incrementalAlterConfigs(targetConfigs); } private void createOffsetSyncsTopic() { @@ -607,26 +587,6 @@ public class MirrorSourceConnector extends SourceConnector { .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value)); } - // visible for testing - // use deprecated alterConfigs API for broker compatibility back to 0.11.0 - @SuppressWarnings("deprecation") - void deprecatedAlterConfigs(Map topicConfigs) throws ExecutionException, InterruptedException { - Map configs = topicConfigs.entrySet().stream() - .collect(Collectors.toMap(x -> - new ConfigResource(ConfigResource.Type.TOPIC, x.getKey()), Entry::getValue)); - log.trace("Syncing configs for {} topics.", configs.size()); - adminCall( - () -> { - targetAdminClient.alterConfigs(configs).values().forEach((k, v) -> v.whenComplete((x, e) -> { - if (e != null) { - log.warn("Could not alter configuration of topic {}.", k.name(), e); - } - })); - return null; - }, - () -> String.format("alter topic configs %s on %s cluster", topicConfigs, config.targetClusterAlias()) - ); - } // visible for testing void incrementalAlterConfigs(Map topicConfigs) throws ExecutionException, InterruptedException { @@ -644,34 +604,24 @@ public class MirrorSourceConnector extends SourceConnector { configOps.put(configResource, ops); } log.trace("Syncing configs for {} topics.", configOps.size()); - AtomicReference encounteredError = new AtomicReference<>(false); - adminCall( - () -> { - targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> { - if (e != null) { - if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS) - && e instanceof UnsupportedVersionException && !encounteredError.get()) { - //Fallback logic - log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. " - + "Therefore using deprecated AlterConfigs API for syncing configs for topic {}", - sourceAndTarget.target(), k.name(), e); - encounteredError.set(true); - useIncrementalAlterConfigs = false; - } else if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS) - && e instanceof UnsupportedVersionException && !encounteredError.get()) { - log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e); - encounteredError.set(true); - context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '" - + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e)); - } else { - log.warn("Could not alter configuration of topic {}.", k.name(), e); - } - } - })); - return null; - }, - () -> String.format("incremental alter topic configs %s on %s cluster", topicConfigs, config.targetClusterAlias()) - ); + adminCall(() -> { + targetAdminClient.incrementalAlterConfigs(configOps).values() + .forEach((k, v) -> v.whenComplete((x, e) -> { + if (e instanceof UnsupportedVersionException) { + log.error("Failed to sync configs for topic {} on cluster {} with " + + "IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e); + context.raiseError(new ConnectException("the target cluster '" + + sourceAndTarget.target() + "' is not compatible with " + + "IncrementalAlterConfigs " + + "API", e)); + } else { + log.warn("Could not alter configuration of topic {}.", k.name(), e); + } + })); + return null; + }, + () -> String.format("incremental alter topic configs %s on %s cluster", topicConfigs, + config.targetClusterAlias())); } private void updateTopicAcls(List bindings) throws ExecutionException, InterruptedException { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index 8b0eebab663..fae73092a7f 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.DescribeAclsResult; @@ -33,7 +32,6 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; @@ -59,7 +57,6 @@ import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.kafka.clients.admin.AdminClientTestUtils.alterConfigsResult; import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX; import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.SOURCE_PREFIX; @@ -76,14 +73,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -368,113 +363,6 @@ public class MirrorSourceConnectorTest { verify(connector).createNewTopics(any(), any()); } - @Test - @Deprecated - public void testIncrementalAlterConfigsRequested() throws Exception { - Map props = makeProps(); - props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS); - MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); - - Admin admin = mock(Admin.class); - MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), - new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); - final String topic = "testtopic"; - List entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); - Config config = new Config(entries); - doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); - doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); - doNothing().when(connector).deprecatedAlterConfigs(any()); - connector.syncTopicConfigs(); - Map topicConfigs = Collections.singletonMap("source." + topic, config); - verify(connector).incrementalAlterConfigs(topicConfigs); - - // the next time we sync topic configurations, expect to use the deprecated API - connector.syncTopicConfigs(); - verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs); - } - - @Test - @Deprecated - public void testIncrementalAlterConfigsRequired() throws Exception { - Map props = makeProps(); - props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS); - MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); - - Admin admin = mock(Admin.class); - MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), - new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); - final String topic = "testtopic"; - List entries = new ArrayList<>(); - ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", "value-1"); - ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, - Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""); - entries.add(entryWithNonDefaultValue); - entries.add(entryWithDefaultValue); - Config config = new Config(entries); - doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); - - doAnswer(invocation -> { - Map> configOps = invocation.getArgument(0); - assertNotNull(configOps); - assertEquals(1, configOps.size()); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "source." + topic); - Collection ops = new ArrayList<>(); - ops.add(new AlterConfigOp(entryWithNonDefaultValue, AlterConfigOp.OpType.SET)); - ops.add(new AlterConfigOp(entryWithDefaultValue, AlterConfigOp.OpType.DELETE)); - - assertEquals(ops, configOps.get(configResource)); - - return alterConfigsResult(configResource); - }).when(admin).incrementalAlterConfigs(any()); - - connector.syncTopicConfigs(); - Map topicConfigs = Collections.singletonMap("source." + topic, config); - verify(connector).incrementalAlterConfigs(topicConfigs); - } - - @Test - @Deprecated - public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception { - Map props = makeProps(); - props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS); - MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); - - Admin admin = mock(Admin.class); - ConnectorContext connectorContext = mock(ConnectorContext.class); - MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), - new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); - connector.initialize(connectorContext); - final String topic = "testtopic"; - List entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); - Config config = new Config(entries); - doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); - doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); - - connector.syncTopicConfigs(); - verify(connectorContext).raiseError(isA(ConnectException.class)); - } - - - @Test - @Deprecated - public void testIncrementalAlterConfigsNeverUsed() throws Exception { - Map props = makeProps(); - props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS); - MirrorSourceConfig connectorConfigs = new MirrorSourceConfig(props); - - MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), - new DefaultReplicationPolicy(), connectorConfigs, new DefaultConfigPropertyFilter(), null)); - final String topic = "testtopic"; - List entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); - Config config = new Config(entries); - doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); - doNothing().when(connector).deprecatedAlterConfigs(any()); - connector.syncTopicConfigs(); - Map topicConfigs = Collections.singletonMap("source." + topic, config); - verify(connector).deprecatedAlterConfigs(topicConfigs); - verify(connector, never()).incrementalAlterConfigs(any()); - } @Test public void testMirrorSourceConnectorTaskConfig() { diff --git a/docs/upgrade.html b/docs/upgrade.html index 9e0699cbabc..e3b0dd10282 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -22,7 +22,7 @@

Upgrading to 4.0.0 from any version 0.8.x through 3.9.x

Notable changes in 4.0.0
    -
  • A number of deprecated classes, methods and tools have been removed from the clients, connect, core and tools modules:
  • +
  • A number of deprecated classes, methods,configurations and tools have been removed from the clients, connect, core and tools modules:
    • The original MirrorMaker (MM1) and related classes have been removed. Please use the Connect-based MirrorMaker (MM2), as described in the @@ -33,6 +33,8 @@ org.apache.kafka.tools.api.RecordReader interface to build custom readers for the kafka-console-producer tool.
    • +
    • Remove use.incremental.alter.configs. The modified behavior is identical to the previous required configuration, + therefore users should ensure that target broker is at least 2.3.0
    • The kafka.tools.DefaultMessageFormatter class has been removed. Please use the org.apache.kafka.tools.consumer.DefaultMessageFormatter class instead.