KAFKA-17435 remove use.incremental.alter.configs (#17027)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Xuan-Zhang Gong 2024-09-11 14:34:05 +08:00 committed by GitHub
parent 11d8069fcd
commit 9242723e4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 23 additions and 207 deletions

View File

@ -74,19 +74,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX;
private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync.";
public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
@Deprecated
public static final String USE_INCREMENTAL_ALTER_CONFIGS = "use.incremental.alter.configs";
private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " +
"The valid values are 'requested', 'required' and 'never'. " +
"By default, set to 'requested', which means the IncrementalAlterConfigs API is being used for syncing topic configurations " +
"and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. " +
"If explicitly set to 'required', the IncrementalAlterConfigs API is used without the fallback logic and +" +
"if it receives an error from an incompatible broker, the connector will fail." +
"If explicitly set to 'never', the AlterConfig is always used." +
"This setting will be removed and the behaviour of 'required' will be used in Kafka 4.0, therefore users should ensure that target broker is at least 2.3.0";
public static final String REQUEST_INCREMENTAL_ALTER_CONFIGS = "requested";
public static final String REQUIRE_INCREMENTAL_ALTER_CONFIGS = "required";
public static final String NEVER_USE_INCREMENTAL_ALTER_CONFIGS = "never";
public static final String SYNC_TOPIC_ACLS_ENABLED = SYNC_TOPIC_ACLS + ENABLED_SUFFIX;
private static final String SYNC_TOPIC_ACLS_ENABLED_DOC = "Whether to periodically configure remote topic ACLs to match their corresponding upstream topics.";
@ -183,10 +170,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
}
}
String useIncrementalAlterConfigs() {
return getString(USE_INCREMENTAL_ALTER_CONFIGS);
}
Duration syncTopicAclsInterval() {
if (getBoolean(SYNC_TOPIC_ACLS_ENABLED)) {
return Duration.ofSeconds(getLong(SYNC_TOPIC_ACLS_INTERVAL_SECONDS));
@ -298,13 +281,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC)
.define(
USE_INCREMENTAL_ALTER_CONFIGS,
ConfigDef.Type.STRING,
REQUEST_INCREMENTAL_ALTER_CONFIGS,
in(REQUEST_INCREMENTAL_ALTER_CONFIGS, REQUIRE_INCREMENTAL_ALTER_CONFIGS, NEVER_USE_INCREMENTAL_ALTER_CONFIGS),
ConfigDef.Importance.LOW,
USE_INCREMENTAL_ALTER_CONFIG_DOC)
.define(
SYNC_TOPIC_ACLS_ENABLED,
ConfigDef.Type.BOOLEAN,

View File

@ -66,7 +66,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -106,7 +105,6 @@ public class MirrorSourceConnector extends SourceConnector {
private int replicationFactor;
private Admin sourceAdminClient;
private Admin targetAdminClient;
private volatile boolean useIncrementalAlterConfigs;
public MirrorSourceConnector() {
// nop
@ -127,18 +125,6 @@ public class MirrorSourceConnector extends SourceConnector {
this.configPropertyFilter = configPropertyFilter;
}
// visible for testing the deprecated setting "use.incremental.alter.configs"
// this constructor should be removed when the deprecated setting is removed in Kafka 4.0
MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy,
MirrorSourceConfig config, ConfigPropertyFilter configPropertyFilter, Admin targetAdmin) {
this.sourceAndTarget = sourceAndTarget;
this.replicationPolicy = replicationPolicy;
this.configPropertyFilter = configPropertyFilter;
this.config = config;
this.useIncrementalAlterConfigs = !config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
this.targetAdminClient = targetAdmin;
}
// visible for testing
MirrorSourceConnector(Admin sourceAdminClient, Admin targetAdminClient, MirrorSourceConfig config) {
this.sourceAdminClient = sourceAdminClient;
@ -161,7 +147,6 @@ public class MirrorSourceConnector extends SourceConnector {
replicationFactor = config.replicationFactor();
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("replication-source-admin"));
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("replication-target-admin"));
useIncrementalAlterConfigs = !config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
@ -441,15 +426,10 @@ public class MirrorSourceConnector extends SourceConnector {
// visible for testing
void syncTopicConfigs()
throws InterruptedException, ExecutionException {
boolean incremental = useIncrementalAlterConfigs;
Map<String, Config> sourceConfigs = describeTopicConfigs(topicsBeingReplicated());
Map<String, Config> targetConfigs = sourceConfigs.entrySet().stream()
.collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue(), incremental)));
if (incremental) {
incrementalAlterConfigs(targetConfigs);
} else {
deprecatedAlterConfigs(targetConfigs);
}
.collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue(), true)));
incrementalAlterConfigs(targetConfigs);
}
private void createOffsetSyncsTopic() {
@ -607,26 +587,6 @@ public class MirrorSourceConnector extends SourceConnector {
.collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
}
// visible for testing
// use deprecated alterConfigs API for broker compatibility back to 0.11.0
@SuppressWarnings("deprecation")
void deprecatedAlterConfigs(Map<String, Config> topicConfigs) throws ExecutionException, InterruptedException {
Map<ConfigResource, Config> configs = topicConfigs.entrySet().stream()
.collect(Collectors.toMap(x ->
new ConfigResource(ConfigResource.Type.TOPIC, x.getKey()), Entry::getValue));
log.trace("Syncing configs for {} topics.", configs.size());
adminCall(
() -> {
targetAdminClient.alterConfigs(configs).values().forEach((k, v) -> v.whenComplete((x, e) -> {
if (e != null) {
log.warn("Could not alter configuration of topic {}.", k.name(), e);
}
}));
return null;
},
() -> String.format("alter topic configs %s on %s cluster", topicConfigs, config.targetClusterAlias())
);
}
// visible for testing
void incrementalAlterConfigs(Map<String, Config> topicConfigs) throws ExecutionException, InterruptedException {
@ -644,34 +604,24 @@ public class MirrorSourceConnector extends SourceConnector {
configOps.put(configResource, ops);
}
log.trace("Syncing configs for {} topics.", configOps.size());
AtomicReference<Boolean> encounteredError = new AtomicReference<>(false);
adminCall(
() -> {
targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
if (e != null) {
if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS)
&& e instanceof UnsupportedVersionException && !encounteredError.get()) {
//Fallback logic
log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. "
+ "Therefore using deprecated AlterConfigs API for syncing configs for topic {}",
sourceAndTarget.target(), k.name(), e);
encounteredError.set(true);
useIncrementalAlterConfigs = false;
} else if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS)
&& e instanceof UnsupportedVersionException && !encounteredError.get()) {
log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
encounteredError.set(true);
context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '"
+ sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));
} else {
log.warn("Could not alter configuration of topic {}.", k.name(), e);
}
}
}));
return null;
},
() -> String.format("incremental alter topic configs %s on %s cluster", topicConfigs, config.targetClusterAlias())
);
adminCall(() -> {
targetAdminClient.incrementalAlterConfigs(configOps).values()
.forEach((k, v) -> v.whenComplete((x, e) -> {
if (e instanceof UnsupportedVersionException) {
log.error("Failed to sync configs for topic {} on cluster {} with " +
"IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
context.raiseError(new ConnectException("the target cluster '"
+ sourceAndTarget.target() + "' is not compatible with " +
"IncrementalAlterConfigs " +
"API", e));
} else {
log.warn("Could not alter configuration of topic {}.", k.name(), e);
}
}));
return null;
},
() -> String.format("incremental alter topic configs %s on %s cluster", topicConfigs,
config.targetClusterAlias()));
}
private void updateTopicAcls(List<AclBinding> bindings) throws ExecutionException, InterruptedException {

View File

@ -17,7 +17,6 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeAclsResult;
@ -33,7 +32,6 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
@ -59,7 +57,6 @@ import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.clients.admin.AdminClientTestUtils.alterConfigsResult;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX;
import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.SOURCE_PREFIX;
@ -76,14 +73,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -368,113 +363,6 @@ public class MirrorSourceConnectorTest {
verify(connector).createNewTopics(any(), any());
}
@Test
@Deprecated
public void testIncrementalAlterConfigsRequested() throws Exception {
Map<String, String> props = makeProps();
props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS);
MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
Admin admin = mock(Admin.class);
MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin));
final String topic = "testtopic";
List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
Config config = new Config(entries);
doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
doNothing().when(connector).deprecatedAlterConfigs(any());
connector.syncTopicConfigs();
Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
verify(connector).incrementalAlterConfigs(topicConfigs);
// the next time we sync topic configurations, expect to use the deprecated API
connector.syncTopicConfigs();
verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs);
}
@Test
@Deprecated
public void testIncrementalAlterConfigsRequired() throws Exception {
Map<String, String> props = makeProps();
props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
Admin admin = mock(Admin.class);
MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin));
final String topic = "testtopic";
List<ConfigEntry> entries = new ArrayList<>();
ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", "value-1");
ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
Collections.emptyList(), ConfigEntry.ConfigType.STRING, "");
entries.add(entryWithNonDefaultValue);
entries.add(entryWithDefaultValue);
Config config = new Config(entries);
doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
doAnswer(invocation -> {
Map<ConfigResource, Collection<AlterConfigOp>> configOps = invocation.getArgument(0);
assertNotNull(configOps);
assertEquals(1, configOps.size());
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "source." + topic);
Collection<AlterConfigOp> ops = new ArrayList<>();
ops.add(new AlterConfigOp(entryWithNonDefaultValue, AlterConfigOp.OpType.SET));
ops.add(new AlterConfigOp(entryWithDefaultValue, AlterConfigOp.OpType.DELETE));
assertEquals(ops, configOps.get(configResource));
return alterConfigsResult(configResource);
}).when(admin).incrementalAlterConfigs(any());
connector.syncTopicConfigs();
Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
verify(connector).incrementalAlterConfigs(topicConfigs);
}
@Test
@Deprecated
public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception {
Map<String, String> props = makeProps();
props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
Admin admin = mock(Admin.class);
ConnectorContext connectorContext = mock(ConnectorContext.class);
MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin));
connector.initialize(connectorContext);
final String topic = "testtopic";
List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
Config config = new Config(entries);
doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
connector.syncTopicConfigs();
verify(connectorContext).raiseError(isA(ConnectException.class));
}
@Test
@Deprecated
public void testIncrementalAlterConfigsNeverUsed() throws Exception {
Map<String, String> props = makeProps();
props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
MirrorSourceConfig connectorConfigs = new MirrorSourceConfig(props);
MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
new DefaultReplicationPolicy(), connectorConfigs, new DefaultConfigPropertyFilter(), null));
final String topic = "testtopic";
List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
Config config = new Config(entries);
doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
doNothing().when(connector).deprecatedAlterConfigs(any());
connector.syncTopicConfigs();
Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
verify(connector).deprecatedAlterConfigs(topicConfigs);
verify(connector, never()).incrementalAlterConfigs(any());
}
@Test
public void testMirrorSourceConnectorTaskConfig() {

View File

@ -22,7 +22,7 @@
<h4><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading to 4.0.0 from any version 0.8.x through 3.9.x</a></h4>
<h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4.0.0</a></h5>
<ul>
<li>A number of deprecated classes, methods and tools have been removed from the <code>clients</code>, <code>connect</code>, <code>core</code> and <code>tools</code> modules:</li>
<li>A number of deprecated classes, methods,configurations and tools have been removed from the <code>clients</code>, <code>connect</code>, <code>core</code> and <code>tools</code> modules:</li>
<ul>
<li> The original MirrorMaker (MM1) and related classes have been removed. Please use the Connect-based
MirrorMaker (MM2), as described in the
@ -33,6 +33,8 @@
<a href="/{{version}}/javadoc/org/apache/kafka/tools/api/RecordReader.html"><code>org.apache.kafka.tools.api.RecordReader</code></a>
interface to build custom readers for the <code>kafka-console-producer</code> tool.
</li>
<li> Remove <code>use.incremental.alter.configs</code>. The modified behavior is identical to the previous <code>required</code> configuration,
therefore users should ensure that target broker is at least 2.3.0</li>
<li>
The <code>kafka.tools.DefaultMessageFormatter</code> class has been removed. Please use the <code>org.apache.kafka.tools.consumer.DefaultMessageFormatter</code> class instead.
</li>