From cbd72cc216ec0a2f3fb427a859fefded79f8059c Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Wed, 5 Mar 2025 19:23:12 +0800 Subject: [PATCH] KAFKA-14121: AlterPartitionReassignments API should allow callers to specify the option of preserving the replication factor (#18983) Reviewers: Christo Lolov , Chia-Ping Tsai , TengYao Chi --- .../org/apache/kafka/clients/admin/Admin.java | 7 + .../AlterPartitionReassignmentsOptions.java | 21 +++ .../kafka/clients/admin/KafkaAdminClient.java | 1 + .../AlterPartitionReassignmentsRequest.java | 6 + .../AlterPartitionReassignmentsRequest.json | 5 +- .../AlterPartitionReassignmentsResponse.json | 5 +- .../api/PlaintextAdminIntegrationTest.scala | 19 ++- .../controller/ReplicationControlManager.java | 42 +++++- .../ReplicationControlManagerTest.java | 129 ++++++++++++++++++ .../reassign/ReassignPartitionsCommand.java | 26 ++-- .../ReassignPartitionsCommandOptions.java | 2 + .../tools/other/ReplicationQuotasTestRig.java | 2 +- .../ReassignPartitionsCommandTest.java | 20 ++- .../reassign/ReassignPartitionsUnitTest.java | 10 +- 14 files changed, 270 insertions(+), 25 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index dcffef60d09..a0bb85899f8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1083,6 +1083,13 @@ public interface Admin extends AutoCloseable { * if the request timed out before the controller could record the new assignments. *
  • {@link org.apache.kafka.common.errors.InvalidReplicaAssignmentException} * If the specified assignment was not valid.
  • + *
  • {@link org.apache.kafka.common.errors.InvalidReplicationFactorException} + * If the replication factor was changed in an invalid way. + * Only thrown when {@link AlterPartitionReassignmentsOptions#allowReplicationFactorChange()} is set to false and + * the request is attempting to alter reassignments (not cancel them).
  • + *
  • {@link org.apache.kafka.common.errors.UnsupportedVersionException} + * If {@link AlterPartitionReassignmentsOptions#allowReplicationFactorChange()} was set to a non-default value + * and the server does not support the option (e.g., due to an old Kafka version).
  • *
  • {@link org.apache.kafka.common.errors.NoReassignmentInProgressException} * If there was an attempt to cancel a reassignment for a partition which was not being reassigned.
  • * diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java index 166e90404c3..74c9f3dcdec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java @@ -23,4 +23,25 @@ import java.util.Map; * Options for {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)} */ public class AlterPartitionReassignmentsOptions extends AbstractOptions { + + private boolean allowReplicationFactorChange = true; + + /** + * Set the option indicating if the alter partition reassignments call should be + * allowed to alter the replication factor of a partition. + * In cases where it is not allowed, any replication factor change will result in an exception thrown by the API. + */ + public AlterPartitionReassignmentsOptions allowReplicationFactorChange(boolean allow) { + this.allowReplicationFactorChange = allow; + return this; + } + + /** + * A boolean indicating if the alter partition reassignments should be + * allowed to alter the replication factor of a partition. + * In cases where it is not allowed, any replication factor change will result in an exception thrown by the API. 
+ */ + public boolean allowReplicationFactorChange() { + return this.allowReplicationFactorChange; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 725e48c3656..8432ec754a0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3944,6 +3944,7 @@ public class KafkaAdminClient extends AdminClient { data.topics().add(reassignableTopic); } data.setTimeoutMs(timeoutMs); + data.setAllowReplicationFactorChange(options.allowReplicationFactorChange()); return new AlterPartitionReassignmentsRequest.Builder(data); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java index 2d289cc1497..8032207943a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; @@ -42,6 +43,11 @@ public class AlterPartitionReassignmentsRequest extends AbstractRequest { @Override public AlterPartitionReassignmentsRequest build(short version) { + if (!data.allowReplicationFactorChange() && version < 1) { + throw new UnsupportedVersionException("The broker does not support the AllowReplicationFactorChange " + + "option for the AlterPartitionReassignments API. 
Consider re-sending the request without the " + + "option or updating the server version"); + } return new AlterPartitionReassignmentsRequest(data, version); } diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json index f3047feb0a3..d0ccc1c088e 100644 --- a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json +++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json @@ -18,11 +18,14 @@ "type": "request", "listeners": ["broker", "controller"], "name": "AlterPartitionReassignmentsRequest", - "validVersions": "0", + // Version 1 adds the ability to allow/disallow changing the replication factor as part of the request. + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000", "about": "The time in ms to wait for the request to complete." }, + { "name": "AllowReplicationFactorChange", "type": "bool", "versions": "1+", "default": "true", + "about": "The option indicating whether changing the replication factor of any given partition as part of this request is a valid move." 
}, { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+", "about": "The topics to reassign.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json index 0b8f60b0bab..36ce87968ec 100644 --- a/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json +++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json @@ -17,11 +17,14 @@ "apiKey": 45, "type": "response", "name": "AlterPartitionReassignmentsResponse", - "validVersions": "0", + // Version 1 adds the ability to allow/disallow changing the replication factor as part of the request. + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "AllowReplicationFactorChange", "type": "bool", "versions": "1+", "default": "true", "ignorable": true, + "about": "The option indicating whether changing the replication factor of any given partition as part of the request was allowed." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top-level error code, or 0 if there was no error." 
}, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index b57f4cd722c..18c5c2bbd05 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -3452,13 +3452,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val tp1 = new TopicPartition(topic, 0) val tp2 = new TopicPartition(topic, 1) val tp3 = new TopicPartition(topic, 2) - createTopic(topic, numPartitions = 4) - + createTopic(topic, numPartitions = 4, replicationFactor = 2) val validAssignment = Optional.of(new NewPartitionReassignment( (0 until brokerCount).map(_.asInstanceOf[Integer]).asJava )) + val alterOptions = new AlterPartitionReassignmentsOptions + alterOptions.allowReplicationFactorChange(false) + val alterReplicaNumberTo1 = Optional.of(new NewPartitionReassignment(List(1.asInstanceOf[Integer]).asJava)) + val alterReplicaNumberTo2 = Optional.of(new NewPartitionReassignment((0 until brokerCount - 1).map(_.asInstanceOf[Integer]).asJava)) + val alterReplicaNumberTo3 = Optional.of(new NewPartitionReassignment((0 until brokerCount).map(_.asInstanceOf[Integer]).asJava)) + val alterReplicaResults = client.alterPartitionReassignments(Map( + tp1 -> alterReplicaNumberTo1, + tp2 -> alterReplicaNumberTo2, + tp3 -> alterReplicaNumberTo3, + ).asJava, alterOptions).values() + assertDoesNotThrow(() => alterReplicaResults.get(tp2).get()) + assertEquals("The replication factor is changed from 2 to 1", + assertFutureThrows(classOf[InvalidReplicationFactorException], alterReplicaResults.get(tp1)).getMessage) + assertEquals("The replication factor is changed from 2 to 3", + assertFutureThrows(classOf[InvalidReplicationFactorException], 
alterReplicaResults.get(tp3)).getMessage) + val nonExistentTp1 = new TopicPartition("topicA", 0) val nonExistentTp2 = new TopicPartition(topic, 4) val nonExistentPartitionsResult = client.alterPartitionReassignments(Map( diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index b5d4993f90b..d4c613003c3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -2058,8 +2058,10 @@ public class ReplicationControlManager { ControllerResult alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) { List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); + boolean allowRFChange = request.allowReplicationFactorChange(); AlterPartitionReassignmentsResponseData result = - new AlterPartitionReassignmentsResponseData().setErrorMessage(null); + new AlterPartitionReassignmentsResponseData().setErrorMessage(null) + .setAllowReplicationFactorChange(allowRFChange); int successfulAlterations = 0, totalAlterations = 0; for (ReassignableTopic topic : request.topics()) { ReassignableTopicResponse topicResponse = new ReassignableTopicResponse(). 
@@ -2067,7 +2069,7 @@ public class ReplicationControlManager { for (ReassignablePartition partition : topic.partitions()) { ApiError error = ApiError.NONE; try { - alterPartitionReassignment(topic.name(), partition, records); + alterPartitionReassignment(topic.name(), partition, records, allowRFChange); successfulAlterations++; } catch (Throwable e) { log.info("Unable to alter partition reassignment for " + @@ -2090,7 +2092,8 @@ public class ReplicationControlManager { void alterPartitionReassignment(String topicName, ReassignablePartition target, - List records) { + List records, + boolean allowRFChange) { Uuid topicId = topicsByName.get(topicName); if (topicId == null) { throw new UnknownTopicOrPartitionException("Unable to find a topic " + @@ -2111,7 +2114,7 @@ public class ReplicationControlManager { if (target.replicas() == null) { record = cancelPartitionReassignment(topicName, tp, part); } else { - record = changePartitionReassignment(tp, part, target); + record = changePartitionReassignment(tp, part, target, allowRFChange); } record.ifPresent(records::add); } @@ -2175,18 +2178,23 @@ public class ReplicationControlManager { * @param tp The topic id and partition id. * @param part The existing partition info. * @param target The target partition info. + * @param allowRFChange Validate if partition replication factor can change. KIP-860 * * @return The ChangePartitionRecord for the new partition assignment, * or empty if no change is needed. */ Optional changePartitionReassignment(TopicIdPartition tp, PartitionRegistration part, - ReassignablePartition target) { + ReassignablePartition target, + boolean allowRFChange) { // Check that the requested partition assignment is valid. 
PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas), part::directory); PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas(), clusterDescriber); validateManualPartitionAssignment(targetAssignment, OptionalInt.empty()); + if (!allowRFChange) { + validatePartitionReplicationFactorUnchanged(part, target); + } List currentReplicas = Replicas.toList(part.replicas); PartitionReassignmentReplicas reassignment = @@ -2406,6 +2414,30 @@ public class ReplicationControlManager { newPartInfo.elr); } + private void validatePartitionReplicationFactorUnchanged(PartitionRegistration part, + ReassignablePartition target) { + int currentReassignmentSetSize; + if (isReassignmentInProgress(part)) { + Set set = new HashSet<>(); + for (int r : part.replicas) { + set.add(r); + } + for (int r : part.addingReplicas) { + set.add(r); + } + for (int r : part.removingReplicas) { + set.remove(r); + } + currentReassignmentSetSize = set.size(); + } else { + currentReassignmentSetSize = part.replicas.length; + } + if (currentReassignmentSetSize != target.replicas().size()) { + throw new InvalidReplicationFactorException("The replication factor is changed from " + + currentReassignmentSetSize + " to " + target.replicas().size()); + } + } + private static final class IneligibleReplica { private final int replicaId; private final String reason; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index ef58e2bc7f5..0516b134c78 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -1922,6 +1922,135 @@ public class ReplicationControlManagerTest { assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE)); } + @ParameterizedTest + 
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) + public void testAlterPartitionDisallowReplicationFactorChange(short version) { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3); + ctx.unfenceBrokers(0, 1, 2, 3); + ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}, new int[] {0, 1, 2}, new int[] {0, 1, 2}}); + + ControllerResult alterResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(singletonList( + new ReassignableTopic().setName("foo").setPartitions(asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(asList(1, 2, 3)), + new ReassignablePartition().setPartitionIndex(1). + setReplicas(asList(0, 1)), + new ReassignablePartition().setPartitionIndex(2). + setReplicas(asList(0, 1, 2, 3)))))). + setAllowReplicationFactorChange(false)); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList( + new ReassignableTopicResponse().setName("foo").setPartitions(asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(1). + setErrorCode(INVALID_REPLICATION_FACTOR.code()). + setErrorMessage("The replication factor is changed from 3 to 2"), + new ReassignablePartitionResponse().setPartitionIndex(2). + setErrorCode(INVALID_REPLICATION_FACTOR.code()). + setErrorMessage("The replication factor is changed from 3 to 4"))))), + alterResult.response()); + ctx.replay(alterResult.records()); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). 
+ setTopics(singletonList(new OngoingTopicReassignment(). + setName("foo").setPartitions(singletonList( + new OngoingPartitionReassignment().setPartitionIndex(0). + setRemovingReplicas(singletonList(0)). + setAddingReplicas(singletonList(3)). + setReplicas(asList(1, 2, 3, 0)))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(singletonList( + new ListPartitionReassignmentsTopics().setName("foo"). + setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE)); + + // test alter replica factor not allow to change when partition reassignment is ongoing + ControllerResult alterReassigningResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(singletonList( + new ReassignableTopic().setName("foo").setPartitions(singletonList( + new ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 1)))))). + setAllowReplicationFactorChange(false)); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList( + new ReassignableTopicResponse().setName("foo").setPartitions(singletonList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorCode(INVALID_REPLICATION_FACTOR.code()). + setErrorMessage("The replication factor is changed from 3 to 2"))))), + alterReassigningResult.response()); + + ControllerResult alterReassigningResult2 = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(singletonList( + new ReassignableTopic().setName("foo").setPartitions(singletonList( + new ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 2, 3)))))). + setAllowReplicationFactorChange(false)); + assertEquals(new AlterPartitionReassignmentsResponseData(). 
+ setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList( + new ReassignableTopicResponse().setName("foo").setPartitions(singletonList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorMessage(null))))), + alterReassigningResult2.response()); + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) + public void testDisallowReplicationFactorChangeNoEffectWhenCancelAlterPartition(short version) { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3); + ctx.unfenceBrokers(0, 1, 2, 3); + ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId(); + + ControllerResult alterResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(singletonList( + new ReassignableTopic().setName("foo").setPartitions(singletonList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(asList(1, 2, 3))))))); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setResponses(singletonList( + new ReassignableTopicResponse().setName("foo").setPartitions(singletonList( + new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), + alterResult.response()); + ctx.replay(alterResult.records()); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). + setTopics(singletonList(new OngoingTopicReassignment(). + setName("foo").setPartitions(singletonList( + new OngoingPartitionReassignment().setPartitionIndex(0). + setRemovingReplicas(singletonList(0)). + setAddingReplicas(singletonList(3)). 
+ setReplicas(asList(1, 2, 3, 0)))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(singletonList( + new ListPartitionReassignmentsTopics().setName("foo"). + setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE)); + + // test replica factor change check takes no effect when partition reassignment is ongoing + ControllerResult cancelResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(singletonList( + new ReassignableTopic().setName("foo").setPartitions(singletonList( + new ReassignablePartition().setPartitionIndex(0).setReplicas(null))))). + setAllowReplicationFactorChange(false)); + assertEquals(new AlterPartitionReassignmentsResponseData().setAllowReplicationFactorChange(false).setErrorMessage(null). + setResponses(singletonList( + new ReassignableTopicResponse().setName("foo").setPartitions(singletonList( + new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), + cancelResult.response()); + ctx.replay(cancelResult.records()); + assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE)); + } + @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) public void testAlterPartitionShouldRejectFencedBrokers(short version) { diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 137092da32c..33bf23d13d1 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -20,6 +20,7 @@ import org.apache.kafka.admin.BrokerMetadata; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions; 
import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult; import org.apache.kafka.clients.admin.NewPartitionReassignment; @@ -182,7 +183,8 @@ public class ReassignPartitionsCommand { opts.options.valueOf(opts.interBrokerThrottleOpt), opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt), opts.options.valueOf(opts.timeoutOpt), - Time.SYSTEM); + Time.SYSTEM, + opts.options.has(opts.disallowReplicationFactorChangeOpt)); } else if (opts.options.has(opts.cancelOpt)) { cancelAssignment(adminClient, Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)), @@ -761,7 +763,8 @@ public class ReassignPartitionsCommand { Long interBrokerThrottle, Long logDirThrottle, Long timeoutMs, - Time time + Time time, + boolean disallowReplicationFactorChange ) throws ExecutionException, InterruptedException, JsonProcessingException, TerseException { Entry>, Map> t0 = parseExecuteAssignmentArgs(reassignmentJson); @@ -796,7 +799,7 @@ public class ReassignPartitionsCommand { } // Execute the partition reassignments. - Map errors = alterPartitionReassignments(adminClient, proposedParts); + Map errors = alterPartitionReassignments(adminClient, proposedParts, disallowReplicationFactorChange); if (!errors.isEmpty()) { throw new TerseException( String.format("Error reassigning partition(s):%n%s", @@ -941,15 +944,19 @@ public class ReassignPartitionsCommand { /** * Execute the given partition reassignments. * - * @param adminClient The admin client object to use. - * @param reassignments A map from topic names to target replica assignments. - * @return A map from partition objects to error strings. + * @param adminClient The admin client object to use. + * @param reassignments A map from topic names to target replica assignments. + * @param disallowReplicationFactorChange Disallow replication factor change or not. + * @return A map from partition objects to error strings. 
*/ static Map alterPartitionReassignments(Admin adminClient, - Map> reassignments) throws InterruptedException { + Map> reassignments, + boolean disallowReplicationFactorChange) throws InterruptedException { Map> args = new HashMap<>(); reassignments.forEach((part, replicas) -> args.put(part, Optional.of(new NewPartitionReassignment(replicas)))); - Map> results = adminClient.alterPartitionReassignments(args).values(); + AlterPartitionReassignmentsOptions options = new AlterPartitionReassignmentsOptions(); + options.allowReplicationFactorChange(!disallowReplicationFactorChange); + Map> results = adminClient.alterPartitionReassignments(args, options).values(); Map errors = new HashMap<>(); for (Entry> e : results.entrySet()) { try { @@ -1485,7 +1492,8 @@ public class ReassignPartitionsCommand { opts.commandConfigOpt, opts.interBrokerThrottleOpt, opts.replicaAlterLogDirsThrottleOpt, - opts.timeoutOpt + opts.timeoutOpt, + opts.disallowReplicationFactorChangeOpt )); permittedArgs.put(opts.cancelOpt, Arrays.asList( isBootstrapServer ? 
opts.bootstrapServerOpt : opts.bootstrapControllerOpt, diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java index 2d31c5a902a..39541288712 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java @@ -42,6 +42,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions { final OptionSpec timeoutOpt; final OptionSpec additionalOpt; final OptionSpec preserveThrottlesOpt; + final OptionSpec disallowReplicationFactorChangeOpt; public ReassignPartitionsCommandOptions(String[] args) { super(args); @@ -115,6 +116,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions { additionalOpt = parser.accepts("additional", "Execute this reassignment in addition to any " + "other ongoing ones. 
This option can also be used to change the throttle of an ongoing reassignment."); preserveThrottlesOpt = parser.accepts("preserve-throttles", "Do not modify broker or topic throttles."); + disallowReplicationFactorChangeOpt = parser.accepts("disallow-replication-factor-change", "Denies the ability to change a partition's replication factor as part of this reassignment through adding validation against it."); options = parser.parse(args); } diff --git a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java index 0dedf567c49..8c981e4eb44 100644 --- a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java +++ b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java @@ -249,7 +249,7 @@ public class ReplicationQuotasTestRig { ReassignPartitionsCommand.executeAssignment(adminClient, false, ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Collections.emptyMap()), - config.throttle, -1L, 10000L, Time.SYSTEM); + config.throttle, -1L, 10000L, Time.SYSTEM, false); //Await completion waitForReassignmentToComplete(); diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index d7f6f937d97..a96bccd36ed 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -93,6 +93,7 @@ import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAs import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static 
org.junit.jupiter.api.Assertions.assertTrue; @ClusterTestDefaults(brokers = 5, disksPerBroker = 3, serverProperties = { @@ -432,6 +433,23 @@ public class ReassignPartitionsCommandTest { } } + @ClusterTest + public void testDisallowReplicationFactorChange() { + createTopics(); + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]}," + + "{\"topic\":\"foo\",\"partition\":1,\"replicas\":[0,1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\"]}," + + "{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3],\"log_dirs\":[\"any\"]}" + + "]}"; + try (Admin admin = clusterInstance.admin()) { + assertEquals("Error reassigning partition(s):\n" + + "bar-0: The replication factor is changed from 3 to 1\n" + + "foo-0: The replication factor is changed from 3 to 2\n" + + "foo-1: The replication factor is changed from 3 to 4", + assertThrows(TerseException.class, () -> executeAssignment(admin, false, assignment, -1L, -1L, 10000L, Time.SYSTEM, true)).getMessage()); + } + } + private void createTopics() { try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { Map> fooReplicasAssignments = new HashMap<>(); @@ -654,7 +672,7 @@ public class ReassignPartitionsCommandTest { Long replicaAlterLogDirsThrottle) throws RuntimeException { try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { executeAssignment(admin, additional, reassignmentJson, - interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, Time.SYSTEM); + interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, Time.SYSTEM, false); } catch (ExecutionException | InterruptedException | JsonProcessingException | TerseException e) { throw new RuntimeException(e); } diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java index d383b56e755..793eba842d2 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java @@ -161,7 +161,7 @@ public class ReassignPartitionsUnitTest { reassignments.put(new TopicPartition("foo", 0), asList(0, 1, 3)); reassignments.put(new TopicPartition("quux", 0), asList(1, 2, 3)); - Map reassignmentResult = alterPartitionReassignments(adminClient, reassignments); + Map reassignmentResult = alterPartitionReassignments(adminClient, reassignments, false); assertEquals(1, reassignmentResult.size()); assertEquals(UnknownTopicOrPartitionException.class, reassignmentResult.get(new TopicPartition("quux", 0)).getClass()); @@ -606,7 +606,7 @@ public class ReassignPartitionsUnitTest { "{\"version\":1,\"partitions\":" + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]}," + "{\"topic\":\"quux\",\"partition\":0,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + - "]}", -1L, -1L, 10000L, Time.SYSTEM), "Expected reassignment with non-existent topic to fail").getCause().getMessage()); + "]}", -1L, -1L, 10000L, Time.SYSTEM, false), "Expected reassignment with non-existent topic to fail").getCause().getMessage()); } } @@ -619,7 +619,7 @@ public class ReassignPartitionsUnitTest { "{\"version\":1,\"partitions\":" + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]}," + "{\"topic\":\"foo\",\"partition\":1,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + - "]}", -1L, -1L, 10000L, Time.SYSTEM), "Expected reassignment with non-existent broker id to fail").getMessage()); + "]}", -1L, -1L, 10000L, Time.SYSTEM, false), "Expected reassignment with non-existent broker id to fail").getMessage()); } } @@ -670,7 +670,7 @@ public class ReassignPartitionsUnitTest { 
reassignments.put(new TopicPartition("foo", 0), asList(0, 1, 4, 2)); reassignments.put(new TopicPartition("bar", 0), asList(2, 3)); - Map reassignmentResult = alterPartitionReassignments(adminClient, reassignments); + Map reassignmentResult = alterPartitionReassignments(adminClient, reassignments, false); assertTrue(reassignmentResult.isEmpty()); assertEquals(String.join(System.lineSeparator(), @@ -762,7 +762,7 @@ public class ReassignPartitionsUnitTest { try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) { addTopics(adminClient); assertStartsWith("Unexpected character", - assertThrows(AdminOperationException.class, () -> executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, Time.SYSTEM)).getMessage()); + assertThrows(AdminOperationException.class, () -> executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, Time.SYSTEM, false)).getMessage()); } } }