KAFKA-14121: AlterPartitionReassignments API should allow callers to specify the option of preserving the replication factor (#18983)

Reviewers: Christo Lolov <lolovc@amazon.com>, Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi <kitingiao@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2025-03-05 19:23:12 +08:00 committed by GitHub
parent c3a9b0fc84
commit cbd72cc216
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 270 additions and 25 deletions

View File

@ -1083,6 +1083,13 @@ public interface Admin extends AutoCloseable {
* if the request timed out before the controller could record the new assignments.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidReplicaAssignmentException}
* If the specified assignment was not valid.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
* If the replication factor was changed in an invalid way.
* Only thrown when {@link AlterPartitionReassignmentsOptions#allowReplicationFactorChange()} is set to false and
* the request is attempting to alter reassignments (not cancel)</li>
* <li>{@link org.apache.kafka.common.errors.UnsupportedVersionException}
* If {@link AlterPartitionReassignmentsOptions#allowReplicationFactorChange()} was changed outside the default
* and the server does not support the option (e.g due to an old Kafka version).</li>
* <li>{@link org.apache.kafka.common.errors.NoReassignmentInProgressException}
* If there was an attempt to cancel a reassignment for a partition which was not being reassigned.</li>
* </ul>

View File

@ -23,4 +23,25 @@ import java.util.Map;
* Options for {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}
*/
public class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> {
private boolean allowReplicationFactorChange = true;
/**
* Set the option indicating if the alter partition reassignments call should be
* allowed to alter the replication factor of a partition.
* In cases where it is not allowed, any replication factor change will result in an exception thrown by the API.
*/
public AlterPartitionReassignmentsOptions allowReplicationFactorChange(boolean allow) {
this.allowReplicationFactorChange = allow;
return this;
}
/**
* A boolean indicating if the alter partition reassignments should be
* allowed to alter the replication factor of a partition.
* In cases where it is not allowed, any replication factor change will result in an exception thrown by the API.
*/
public boolean allowReplicationFactorChange() {
return this.allowReplicationFactorChange;
}
}

View File

@ -3944,6 +3944,7 @@ public class KafkaAdminClient extends AdminClient {
data.topics().add(reassignableTopic);
}
data.setTimeoutMs(timeoutMs);
data.setAllowReplicationFactorChange(options.allowReplicationFactorChange());
return new AlterPartitionReassignmentsRequest.Builder(data);
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
@ -42,6 +43,11 @@ public class AlterPartitionReassignmentsRequest extends AbstractRequest {
@Override
public AlterPartitionReassignmentsRequest build(short version) {
if (!data.allowReplicationFactorChange() && version < 1) {
throw new UnsupportedVersionException("The broker does not support the AllowReplicationFactorChange " +
"option for the AlterPartitionReassignments API. Consider re-sending the request without the " +
"option or updating the server version");
}
return new AlterPartitionReassignmentsRequest(data, version);
}

View File

@ -18,11 +18,14 @@
"type": "request",
"listeners": ["broker", "controller"],
"name": "AlterPartitionReassignmentsRequest",
"validVersions": "0",
// Version 1 adds the ability to allow/disallow changing the replication factor as part of the request.
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "The time in ms to wait for the request to complete." },
{ "name": "AllowReplicationFactorChange", "type": "bool", "versions": "1+", "default": "true",
"about": "The option indicating whether changing the replication factor of any given partition as part of this request is a valid move." },
{ "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
"about": "The topics to reassign.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",

View File

@ -17,11 +17,14 @@
"apiKey": 45,
"type": "response",
"name": "AlterPartitionReassignmentsResponse",
"validVersions": "0",
// Version 1 adds the ability to allow/disallow changing the replication factor as part of the request.
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "AllowReplicationFactorChange", "type": "bool", "versions": "1+", "default": "true", "ignorable": true,
"about": "The option indicating whether changing the replication factor of any given partition as part of the request was allowed." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",

View File

@ -3452,13 +3452,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val tp1 = new TopicPartition(topic, 0)
val tp2 = new TopicPartition(topic, 1)
val tp3 = new TopicPartition(topic, 2)
createTopic(topic, numPartitions = 4)
createTopic(topic, numPartitions = 4, replicationFactor = 2)
val validAssignment = Optional.of(new NewPartitionReassignment(
(0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
))
val alterOptions = new AlterPartitionReassignmentsOptions
alterOptions.allowReplicationFactorChange(false)
val alterReplicaNumberTo1 = Optional.of(new NewPartitionReassignment(List(1.asInstanceOf[Integer]).asJava))
val alterReplicaNumberTo2 = Optional.of(new NewPartitionReassignment((0 until brokerCount - 1).map(_.asInstanceOf[Integer]).asJava))
val alterReplicaNumberTo3 = Optional.of(new NewPartitionReassignment((0 until brokerCount).map(_.asInstanceOf[Integer]).asJava))
val alterReplicaResults = client.alterPartitionReassignments(Map(
tp1 -> alterReplicaNumberTo1,
tp2 -> alterReplicaNumberTo2,
tp3 -> alterReplicaNumberTo3,
).asJava, alterOptions).values()
assertDoesNotThrow(() => alterReplicaResults.get(tp2).get())
assertEquals("The replication factor is changed from 2 to 1",
assertFutureThrows(classOf[InvalidReplicationFactorException], alterReplicaResults.get(tp1)).getMessage)
assertEquals("The replication factor is changed from 2 to 3",
assertFutureThrows(classOf[InvalidReplicationFactorException], alterReplicaResults.get(tp3)).getMessage)
val nonExistentTp1 = new TopicPartition("topicA", 0)
val nonExistentTp2 = new TopicPartition(topic, 4)
val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(

View File

@ -2058,8 +2058,10 @@ public class ReplicationControlManager {
ControllerResult<AlterPartitionReassignmentsResponseData>
alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
boolean allowRFChange = request.allowReplicationFactorChange();
AlterPartitionReassignmentsResponseData result =
new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
new AlterPartitionReassignmentsResponseData().setErrorMessage(null)
.setAllowReplicationFactorChange(allowRFChange);
int successfulAlterations = 0, totalAlterations = 0;
for (ReassignableTopic topic : request.topics()) {
ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().
@ -2067,7 +2069,7 @@ public class ReplicationControlManager {
for (ReassignablePartition partition : topic.partitions()) {
ApiError error = ApiError.NONE;
try {
alterPartitionReassignment(topic.name(), partition, records);
alterPartitionReassignment(topic.name(), partition, records, allowRFChange);
successfulAlterations++;
} catch (Throwable e) {
log.info("Unable to alter partition reassignment for " +
@ -2090,7 +2092,8 @@ public class ReplicationControlManager {
void alterPartitionReassignment(String topicName,
ReassignablePartition target,
List<ApiMessageAndVersion> records) {
List<ApiMessageAndVersion> records,
boolean allowRFChange) {
Uuid topicId = topicsByName.get(topicName);
if (topicId == null) {
throw new UnknownTopicOrPartitionException("Unable to find a topic " +
@ -2111,7 +2114,7 @@ public class ReplicationControlManager {
if (target.replicas() == null) {
record = cancelPartitionReassignment(topicName, tp, part);
} else {
record = changePartitionReassignment(tp, part, target);
record = changePartitionReassignment(tp, part, target, allowRFChange);
}
record.ifPresent(records::add);
}
@ -2175,18 +2178,23 @@ public class ReplicationControlManager {
* @param tp The topic id and partition id.
* @param part The existing partition info.
* @param target The target partition info.
* @param allowRFChange Validate if partition replication factor can change. KIP-860
*
* @return The ChangePartitionRecord for the new partition assignment,
* or empty if no change is needed.
*/
Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
PartitionRegistration part,
ReassignablePartition target) {
ReassignablePartition target,
boolean allowRFChange) {
// Check that the requested partition assignment is valid.
PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas), part::directory);
PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas(), clusterDescriber);
validateManualPartitionAssignment(targetAssignment, OptionalInt.empty());
if (!allowRFChange) {
validatePartitionReplicationFactorUnchanged(part, target);
}
List<Integer> currentReplicas = Replicas.toList(part.replicas);
PartitionReassignmentReplicas reassignment =
@ -2406,6 +2414,30 @@ public class ReplicationControlManager {
newPartInfo.elr);
}
private void validatePartitionReplicationFactorUnchanged(PartitionRegistration part,
ReassignablePartition target) {
int currentReassignmentSetSize;
if (isReassignmentInProgress(part)) {
Set<Integer> set = new HashSet<>();
for (int r : part.replicas) {
set.add(r);
}
for (int r : part.addingReplicas) {
set.add(r);
}
for (int r : part.removingReplicas) {
set.remove(r);
}
currentReassignmentSetSize = set.size();
} else {
currentReassignmentSetSize = part.replicas.length;
}
if (currentReassignmentSetSize != target.replicas().size()) {
throw new InvalidReplicationFactorException("The replication factor is changed from " +
currentReassignmentSetSize + " to " + target.replicas().size());
}
}
private static final class IneligibleReplica {
private final int replicaId;
private final String reason;

View File

@ -1922,6 +1922,135 @@ public class ReplicationControlManagerTest {
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionDisallowReplicationFactorChange(short version) {
MetadataVersion metadataVersion = MetadataVersion.latestTesting();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
.setMetadataVersion(metadataVersion)
.build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3);
ctx.unfenceBrokers(0, 1, 2, 3);
ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}, new int[] {0, 1, 2}, new int[] {0, 1, 2}});
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(asList(1, 2, 3)),
new ReassignablePartition().setPartitionIndex(1).
setReplicas(asList(0, 1)),
new ReassignablePartition().setPartitionIndex(2).
setReplicas(asList(0, 1, 2, 3)))))).
setAllowReplicationFactorChange(false));
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(1).
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
setErrorMessage("The replication factor is changed from 3 to 2"),
new ReassignablePartitionResponse().setPartitionIndex(2).
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
setErrorMessage("The replication factor is changed from 3 to 4"))))),
alterResult.response());
ctx.replay(alterResult.records());
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
setTopics(singletonList(new OngoingTopicReassignment().
setName("foo").setPartitions(singletonList(
new OngoingPartitionReassignment().setPartitionIndex(0).
setRemovingReplicas(singletonList(0)).
setAddingReplicas(singletonList(3)).
setReplicas(asList(1, 2, 3, 0))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(singletonList(
new ListPartitionReassignmentsTopics().setName("foo").
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
// test alter replica factor not allow to change when partition reassignment is ongoing
ControllerResult<AlterPartitionReassignmentsResponseData> alterReassigningResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
new ReassignableTopic().setName("foo").setPartitions(singletonList(
new ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 1)))))).
setAllowReplicationFactorChange(false));
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
new ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
setErrorMessage("The replication factor is changed from 3 to 2"))))),
alterReassigningResult.response());
ControllerResult<AlterPartitionReassignmentsResponseData> alterReassigningResult2 =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
new ReassignableTopic().setName("foo").setPartitions(singletonList(
new ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 2, 3)))))).
setAllowReplicationFactorChange(false));
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
new ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null))))),
alterReassigningResult2.response());
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testDisallowReplicationFactorChangeNoEffectWhenCancelAlterPartition(short version) {
MetadataVersion metadataVersion = MetadataVersion.latestTesting();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
.setMetadataVersion(metadataVersion)
.build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3);
ctx.unfenceBrokers(0, 1, 2, 3);
ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId();
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
new ReassignableTopic().setName("foo").setPartitions(singletonList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(asList(1, 2, 3)))))));
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setResponses(singletonList(
new ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
alterResult.response());
ctx.replay(alterResult.records());
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
setTopics(singletonList(new OngoingTopicReassignment().
setName("foo").setPartitions(singletonList(
new OngoingPartitionReassignment().setPartitionIndex(0).
setRemovingReplicas(singletonList(0)).
setAddingReplicas(singletonList(3)).
setReplicas(asList(1, 2, 3, 0))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(singletonList(
new ListPartitionReassignmentsTopics().setName("foo").
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
// test replica factor change check takes no effect when partition reassignment is ongoing
ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
new ReassignableTopic().setName("foo").setPartitions(singletonList(
new ReassignablePartition().setPartitionIndex(0).setReplicas(null))))).
setAllowReplicationFactorChange(false));
assertEquals(new AlterPartitionReassignmentsResponseData().setAllowReplicationFactorChange(false).setErrorMessage(null).
setResponses(singletonList(
new ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
cancelResult.response());
ctx.replay(cancelResult.records());
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectFencedBrokers(short version) {

View File

@ -20,6 +20,7 @@ import org.apache.kafka.admin.BrokerMetadata;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
@ -182,7 +183,8 @@ public class ReassignPartitionsCommand {
opts.options.valueOf(opts.interBrokerThrottleOpt),
opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt),
opts.options.valueOf(opts.timeoutOpt),
Time.SYSTEM);
Time.SYSTEM,
opts.options.has(opts.disallowReplicationFactorChangeOpt));
} else if (opts.options.has(opts.cancelOpt)) {
cancelAssignment(adminClient,
Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
@ -761,7 +763,8 @@ public class ReassignPartitionsCommand {
Long interBrokerThrottle,
Long logDirThrottle,
Long timeoutMs,
Time time
Time time,
boolean disallowReplicationFactorChange
) throws ExecutionException, InterruptedException, JsonProcessingException, TerseException {
Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartitionReplica, String>> t0 = parseExecuteAssignmentArgs(reassignmentJson);
@ -796,7 +799,7 @@ public class ReassignPartitionsCommand {
}
// Execute the partition reassignments.
Map<TopicPartition, Throwable> errors = alterPartitionReassignments(adminClient, proposedParts);
Map<TopicPartition, Throwable> errors = alterPartitionReassignments(adminClient, proposedParts, disallowReplicationFactorChange);
if (!errors.isEmpty()) {
throw new TerseException(
String.format("Error reassigning partition(s):%n%s",
@ -943,13 +946,17 @@ public class ReassignPartitionsCommand {
*
* @param adminClient The admin client object to use.
* @param reassignments A map from topic names to target replica assignments.
* @param disallowReplicationFactorChange Disallow replication factor change or not.
* @return A map from partition objects to error strings.
*/
static Map<TopicPartition, Throwable> alterPartitionReassignments(Admin adminClient,
Map<TopicPartition, List<Integer>> reassignments) throws InterruptedException {
Map<TopicPartition, List<Integer>> reassignments,
boolean disallowReplicationFactorChange) throws InterruptedException {
Map<TopicPartition, Optional<NewPartitionReassignment>> args = new HashMap<>();
reassignments.forEach((part, replicas) -> args.put(part, Optional.of(new NewPartitionReassignment(replicas))));
Map<TopicPartition, KafkaFuture<Void>> results = adminClient.alterPartitionReassignments(args).values();
AlterPartitionReassignmentsOptions options = new AlterPartitionReassignmentsOptions();
options.allowReplicationFactorChange(!disallowReplicationFactorChange);
Map<TopicPartition, KafkaFuture<Void>> results = adminClient.alterPartitionReassignments(args, options).values();
Map<TopicPartition, Throwable> errors = new HashMap<>();
for (Entry<TopicPartition, KafkaFuture<Void>> e : results.entrySet()) {
try {
@ -1485,7 +1492,8 @@ public class ReassignPartitionsCommand {
opts.commandConfigOpt,
opts.interBrokerThrottleOpt,
opts.replicaAlterLogDirsThrottleOpt,
opts.timeoutOpt
opts.timeoutOpt,
opts.disallowReplicationFactorChangeOpt
));
permittedArgs.put(opts.cancelOpt, Arrays.asList(
isBootstrapServer ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt,

View File

@ -42,6 +42,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions {
final OptionSpec<Long> timeoutOpt;
final OptionSpec<?> additionalOpt;
final OptionSpec<?> preserveThrottlesOpt;
final OptionSpec<?> disallowReplicationFactorChangeOpt;
public ReassignPartitionsCommandOptions(String[] args) {
super(args);
@ -115,6 +116,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions {
additionalOpt = parser.accepts("additional", "Execute this reassignment in addition to any " +
"other ongoing ones. This option can also be used to change the throttle of an ongoing reassignment.");
preserveThrottlesOpt = parser.accepts("preserve-throttles", "Do not modify broker or topic throttles.");
disallowReplicationFactorChangeOpt = parser.accepts("disallow-replication-factor-change", "Denies the ability to change a partition's replication factor as part of this reassignment through adding validation against it.");
options = parser.parse(args);
}

View File

@ -249,7 +249,7 @@ public class ReplicationQuotasTestRig {
ReassignPartitionsCommand.executeAssignment(adminClient, false,
ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Collections.emptyMap()),
config.throttle, -1L, 10000L, Time.SYSTEM);
config.throttle, -1L, 10000L, Time.SYSTEM, false);
//Await completion
waitForReassignmentToComplete();

View File

@ -93,6 +93,7 @@ import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAs
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(brokers = 5, disksPerBroker = 3, serverProperties = {
@ -432,6 +433,23 @@ public class ReassignPartitionsCommandTest {
}
}
@ClusterTest
public void testDisallowReplicationFactorChange() {
createTopics();
String assignment = "{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]}," +
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[0,1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\"]}," +
"{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3],\"log_dirs\":[\"any\"]}" +
"]}";
try (Admin admin = clusterInstance.admin()) {
assertEquals("Error reassigning partition(s):\n" +
"bar-0: The replication factor is changed from 3 to 1\n" +
"foo-0: The replication factor is changed from 3 to 2\n" +
"foo-1: The replication factor is changed from 3 to 4",
assertThrows(TerseException.class, () -> executeAssignment(admin, false, assignment, -1L, -1L, 10000L, Time.SYSTEM, true)).getMessage());
}
}
private void createTopics() {
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
Map<Integer, List<Integer>> fooReplicasAssignments = new HashMap<>();
@ -654,7 +672,7 @@ public class ReassignPartitionsCommandTest {
Long replicaAlterLogDirsThrottle) throws RuntimeException {
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
executeAssignment(admin, additional, reassignmentJson,
interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, Time.SYSTEM);
interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, Time.SYSTEM, false);
} catch (ExecutionException | InterruptedException | JsonProcessingException | TerseException e) {
throw new RuntimeException(e);
}

View File

@ -161,7 +161,7 @@ public class ReassignPartitionsUnitTest {
reassignments.put(new TopicPartition("foo", 0), asList(0, 1, 3));
reassignments.put(new TopicPartition("quux", 0), asList(1, 2, 3));
Map<TopicPartition, Throwable> reassignmentResult = alterPartitionReassignments(adminClient, reassignments);
Map<TopicPartition, Throwable> reassignmentResult = alterPartitionReassignments(adminClient, reassignments, false);
assertEquals(1, reassignmentResult.size());
assertEquals(UnknownTopicOrPartitionException.class, reassignmentResult.get(new TopicPartition("quux", 0)).getClass());
@ -606,7 +606,7 @@ public class ReassignPartitionsUnitTest {
"{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]}," +
"{\"topic\":\"quux\",\"partition\":0,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
"]}", -1L, -1L, 10000L, Time.SYSTEM), "Expected reassignment with non-existent topic to fail").getCause().getMessage());
"]}", -1L, -1L, 10000L, Time.SYSTEM, false), "Expected reassignment with non-existent topic to fail").getCause().getMessage());
}
}
@ -619,7 +619,7 @@ public class ReassignPartitionsUnitTest {
"{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]}," +
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
"]}", -1L, -1L, 10000L, Time.SYSTEM), "Expected reassignment with non-existent broker id to fail").getMessage());
"]}", -1L, -1L, 10000L, Time.SYSTEM, false), "Expected reassignment with non-existent broker id to fail").getMessage());
}
}
@ -670,7 +670,7 @@ public class ReassignPartitionsUnitTest {
reassignments.put(new TopicPartition("foo", 0), asList(0, 1, 4, 2));
reassignments.put(new TopicPartition("bar", 0), asList(2, 3));
Map<TopicPartition, Throwable> reassignmentResult = alterPartitionReassignments(adminClient, reassignments);
Map<TopicPartition, Throwable> reassignmentResult = alterPartitionReassignments(adminClient, reassignments, false);
assertTrue(reassignmentResult.isEmpty());
assertEquals(String.join(System.lineSeparator(),
@ -762,7 +762,7 @@ public class ReassignPartitionsUnitTest {
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
addTopics(adminClient);
assertStartsWith("Unexpected character",
assertThrows(AdminOperationException.class, () -> executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, Time.SYSTEM)).getMessage());
assertThrows(AdminOperationException.class, () -> executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, Time.SYSTEM, false)).getMessage());
}
}
}