mirror of https://github.com/apache/kafka.git
KAFKA-14595: Move classes from ReassignPartitionsCommand to tools (#14172)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
parent
f137da04fa
commit
1fd58e30cf
|
@ -96,7 +96,7 @@ public interface DecodeJson<T> {
|
|||
};
|
||||
}
|
||||
|
||||
static <E> DecodeJson<List<E>> decodeList(DecodeJson<E> decodeJson) throws JsonMappingException {
|
||||
static <E> DecodeJson<List<E>> decodeList(DecodeJson<E> decodeJson) {
|
||||
return node -> {
|
||||
if (node.isArray()) {
|
||||
List<E> result = new ArrayList<>();
|
||||
|
@ -110,7 +110,7 @@ public interface DecodeJson<T> {
|
|||
};
|
||||
}
|
||||
|
||||
static <V> DecodeJson<Map<String, V>> decodeMap(DecodeJson<V> decodeJson) throws JsonMappingException {
|
||||
static <V> DecodeJson<Map<String, V>> decodeMap(DecodeJson<V> decodeJson) {
|
||||
return node -> {
|
||||
if (node.isObject()) {
|
||||
Map<String, V> result = new HashMap<>();
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools.reassign;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A replica log directory move state where the move is in progress.
|
||||
*/
|
||||
final class ActiveMoveState implements LogDirMoveState {
|
||||
public final String currentLogDir;
|
||||
|
||||
public final String targetLogDir;
|
||||
|
||||
public final String futureLogDir;
|
||||
|
||||
/**
|
||||
* @param currentLogDir The current log directory.
|
||||
* @param futureLogDir The log directory that the replica is moving to.
|
||||
* @param targetLogDir The log directory that we wanted the replica to move to.
|
||||
*/
|
||||
public ActiveMoveState(String currentLogDir, String targetLogDir, String futureLogDir) {
|
||||
this.currentLogDir = currentLogDir;
|
||||
this.targetLogDir = targetLogDir;
|
||||
this.futureLogDir = futureLogDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean done() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
ActiveMoveState that = (ActiveMoveState) o;
|
||||
return Objects.equals(currentLogDir, that.currentLogDir) && Objects.equals(targetLogDir, that.targetLogDir) && Objects.equals(futureLogDir, that.futureLogDir);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(currentLogDir, targetLogDir, futureLogDir);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools.reassign;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A replica log directory move state where there is no move in progress, but we did not
|
||||
* reach the target log directory.
|
||||
*/
|
||||
final class CancelledMoveState implements LogDirMoveState {
|
||||
public final String currentLogDir;
|
||||
|
||||
public final String targetLogDir;
|
||||
|
||||
/**
|
||||
* @param currentLogDir The current log directory.
|
||||
* @param targetLogDir The log directory that we wanted the replica to move to.
|
||||
*/
|
||||
public CancelledMoveState(String currentLogDir, String targetLogDir) {
|
||||
this.currentLogDir = currentLogDir;
|
||||
this.targetLogDir = targetLogDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean done() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
CancelledMoveState that = (CancelledMoveState) o;
|
||||
return Objects.equals(currentLogDir, that.currentLogDir) && Objects.equals(targetLogDir, that.targetLogDir);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(currentLogDir, targetLogDir);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools.reassign;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* The completed replica log directory move state.
|
||||
*/
|
||||
final class CompletedMoveState implements LogDirMoveState {
|
||||
public final String targetLogDir;
|
||||
|
||||
/**
|
||||
* @param targetLogDir The log directory that we wanted the replica to move to.
|
||||
*/
|
||||
public CompletedMoveState(String targetLogDir) {
|
||||
this.targetLogDir = targetLogDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean done() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
CompletedMoveState that = (CompletedMoveState) o;
|
||||
return Objects.equals(targetLogDir, that.targetLogDir);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(targetLogDir);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools.reassign;
|
||||
|
||||
/**
|
||||
* The state of a replica log directory movement.
|
||||
*/
|
||||
interface LogDirMoveState {
|
||||
/**
|
||||
* True if the move is done without errors.
|
||||
*/
|
||||
boolean done();
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools.reassign;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A replica log directory move state where the source replica is missing.
|
||||
*/
|
||||
final class MissingLogDirMoveState implements LogDirMoveState {
|
||||
public final String targetLogDir;
|
||||
|
||||
/**
|
||||
* @param targetLogDir The log directory that we wanted the replica to move to.
|
||||
*/
|
||||
public MissingLogDirMoveState(String targetLogDir) {
|
||||
this.targetLogDir = targetLogDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean done() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
MissingLogDirMoveState that = (MissingLogDirMoveState) o;
|
||||
return Objects.equals(targetLogDir, that.targetLogDir);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(targetLogDir);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools.reassign;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A replica log directory move state where the source log directory is missing.
|
||||
*/
|
||||
final class MissingReplicaMoveState implements LogDirMoveState {
|
||||
public final String targetLogDir;
|
||||
|
||||
/**
|
||||
* @param targetLogDir The log directory that we wanted the replica to move to.
|
||||
*/
|
||||
public MissingReplicaMoveState(String targetLogDir) {
|
||||
this.targetLogDir = targetLogDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean done() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
MissingReplicaMoveState that = (MissingReplicaMoveState) o;
|
||||
return Objects.equals(targetLogDir, that.targetLogDir);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(targetLogDir);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools.reassign;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A partition movement. The source and destination brokers may overlap.
|
||||
*/
|
||||
final class PartitionMove {
|
||||
public final Set<Integer> sources;
|
||||
|
||||
public final Set<Integer> destinations;
|
||||
|
||||
/**
|
||||
* @param sources The source brokers.
|
||||
* @param destinations The destination brokers.
|
||||
*/
|
||||
public PartitionMove(Set<Integer> sources, Set<Integer> destinations) {
|
||||
this.sources = sources;
|
||||
this.destinations = destinations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
PartitionMove that = (PartitionMove) o;
|
||||
return Objects.equals(sources, that.sources) && Objects.equals(destinations, that.destinations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(sources, destinations);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools.reassign;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* The state of a partition reassignment. The current replicas and target replicas
|
||||
* may overlap.
|
||||
*/
|
||||
final class PartitionReassignmentState {
|
||||
public final List<Integer> currentReplicas;
|
||||
|
||||
public final List<Integer> targetReplicas;
|
||||
|
||||
public final boolean done;
|
||||
|
||||
/**
|
||||
* @param currentReplicas The current replicas.
|
||||
* @param targetReplicas The target replicas.
|
||||
* @param done True if the reassignment is done.
|
||||
*/
|
||||
public PartitionReassignmentState(List<Integer> currentReplicas, List<Integer> targetReplicas, boolean done) {
|
||||
this.currentReplicas = currentReplicas;
|
||||
this.targetReplicas = targetReplicas;
|
||||
this.done = done;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
PartitionReassignmentState state = (PartitionReassignmentState) o;
|
||||
return done == state.done && Objects.equals(currentReplicas, state.currentReplicas) && Objects.equals(targetReplicas, state.targetReplicas);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(currentReplicas, targetReplicas, done);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,176 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools.reassign;
|
||||
|
||||
import joptsimple.OptionSpec;
|
||||
import org.apache.kafka.server.util.CommandDefaultOptions;
|
||||
|
||||
public class ReassignPartitionsCommandOptions extends CommandDefaultOptions {
|
||||
// Actions
|
||||
final OptionSpec<?> verifyOpt;
|
||||
final OptionSpec<?> generateOpt;
|
||||
final OptionSpec<?> executeOpt;
|
||||
final OptionSpec<?> cancelOpt;
|
||||
final OptionSpec<?> listOpt;
|
||||
|
||||
// Arguments
|
||||
final OptionSpec<String> bootstrapServerOpt;
|
||||
final OptionSpec<String> commandConfigOpt;
|
||||
final OptionSpec<String> reassignmentJsonFileOpt;
|
||||
final OptionSpec<String> topicsToMoveJsonFileOpt;
|
||||
final OptionSpec<String> brokerListOpt;
|
||||
final OptionSpec<?> disableRackAware;
|
||||
final OptionSpec<Long> interBrokerThrottleOpt;
|
||||
final OptionSpec<Long> replicaAlterLogDirsThrottleOpt;
|
||||
final OptionSpec<Long> timeoutOpt;
|
||||
final OptionSpec<?> additionalOpt;
|
||||
final OptionSpec<?> preserveThrottlesOpt;
|
||||
|
||||
public ReassignPartitionsCommandOptions(String[] args) {
|
||||
super(args);
|
||||
|
||||
verifyOpt = parser.accepts("verify", "Verify if the reassignment completed as specified by the " +
|
||||
"--reassignment-json-file option. If there is a throttle engaged for the replicas specified, and the rebalance has completed, the throttle will be removed");
|
||||
generateOpt = parser.accepts("generate", "Generate a candidate partition reassignment configuration." +
|
||||
" Note that this only generates a candidate assignment, it does not execute it.");
|
||||
executeOpt = parser.accepts("execute", "Kick off the reassignment as specified by the --reassignment-json-file option.");
|
||||
cancelOpt = parser.accepts("cancel", "Cancel an active reassignment.");
|
||||
listOpt = parser.accepts("list", "List all active partition reassignments.");
|
||||
|
||||
// Arguments
|
||||
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping.")
|
||||
.withRequiredArg()
|
||||
.describedAs("Server(s) to use for bootstrapping")
|
||||
.ofType(String.class);
|
||||
|
||||
commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
|
||||
.withRequiredArg()
|
||||
.describedAs("Admin client property file")
|
||||
.ofType(String.class);
|
||||
|
||||
reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", "The JSON file with the partition reassignment configuration" +
|
||||
"The format to use is - \n" +
|
||||
"{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3],\n\t \"log_dirs\": [\"dir1\",\"dir2\",\"dir3\"] }],\n\"version\":1\n}\n" +
|
||||
"Note that \"log_dirs\" is optional. When it is specified, its length must equal the length of the replicas list. The value in this list " +
|
||||
"can be either \"any\" or the absolution path of the log directory on the broker. If absolute log directory path is specified, the replica will be moved to the specified log directory on the broker.")
|
||||
.withRequiredArg()
|
||||
.describedAs("manual assignment json file path")
|
||||
.ofType(String.class);
|
||||
topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "Generate a reassignment configuration to move the partitions" +
|
||||
" of the specified topics to the list of brokers specified by the --broker-list option. The format to use is - \n" +
|
||||
"{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}")
|
||||
.withRequiredArg()
|
||||
.describedAs("topics to reassign json file path")
|
||||
.ofType(String.class);
|
||||
brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" +
|
||||
" in the form \"0,1,2\". This is required if --topics-to-move-json-file is used to generate reassignment configuration")
|
||||
.withRequiredArg()
|
||||
.describedAs("brokerlist")
|
||||
.ofType(String.class);
|
||||
disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment");
|
||||
interBrokerThrottleOpt = parser.accepts("throttle", "The movement of partitions between brokers will be throttled to this value (bytes/sec). " +
|
||||
"This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " +
|
||||
"along with the --additional flag. The throttle rate should be at least 1 KB/s.")
|
||||
.withRequiredArg()
|
||||
.describedAs("throttle")
|
||||
.ofType(Long.class)
|
||||
.defaultsTo(-1L);
|
||||
replicaAlterLogDirsThrottleOpt = parser.accepts("replica-alter-log-dirs-throttle",
|
||||
"The movement of replicas between log directories on the same broker will be throttled to this value (bytes/sec). " +
|
||||
"This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " +
|
||||
"along with the --additional flag. The throttle rate should be at least 1 KB/s.")
|
||||
.withRequiredArg()
|
||||
.describedAs("replicaAlterLogDirsThrottle")
|
||||
.ofType(Long.class)
|
||||
.defaultsTo(-1L);
|
||||
timeoutOpt = parser.accepts("timeout", "The maximum time in ms to wait for log directory replica assignment to begin.")
|
||||
.withRequiredArg()
|
||||
.describedAs("timeout")
|
||||
.ofType(Long.class)
|
||||
.defaultsTo(10000L);
|
||||
additionalOpt = parser.accepts("additional", "Execute this reassignment in addition to any " +
|
||||
"other ongoing ones. This option can also be used to change the throttle of an ongoing reassignment.");
|
||||
preserveThrottlesOpt = parser.accepts("preserve-throttles", "Do not modify broker or topic throttles.");
|
||||
|
||||
options = parser.parse(args);
|
||||
}
|
||||
|
||||
public OptionSpec<?> verifyOpt() {
|
||||
return verifyOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<?> generateOpt() {
|
||||
return generateOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<?> executeOpt() {
|
||||
return executeOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<?> cancelOpt() {
|
||||
return cancelOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<?> listOpt() {
|
||||
return listOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<String> bootstrapServerOpt() {
|
||||
return bootstrapServerOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<String> commandConfigOpt() {
|
||||
return commandConfigOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<String> reassignmentJsonFileOpt() {
|
||||
return reassignmentJsonFileOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<String> topicsToMoveJsonFileOpt() {
|
||||
return topicsToMoveJsonFileOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<String> brokerListOpt() {
|
||||
return brokerListOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<?> disableRackAware() {
|
||||
return disableRackAware;
|
||||
}
|
||||
|
||||
public OptionSpec<Long> interBrokerThrottleOpt() {
|
||||
return interBrokerThrottleOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<Long> replicaAlterLogDirsThrottleOpt() {
|
||||
return replicaAlterLogDirsThrottleOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<Long> timeoutOpt() {
|
||||
return timeoutOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<?> additionalOpt() {
|
||||
return additionalOpt;
|
||||
}
|
||||
|
||||
public OptionSpec<?> preserveThrottlesOpt() {
|
||||
return preserveThrottlesOpt;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.tools.reassign;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.TopicPartitionReplica;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A result returned from verifyAssignment.
|
||||
*/
|
||||
public final class VerifyAssignmentResult {
|
||||
public final Map<TopicPartition, PartitionReassignmentState> partStates;
|
||||
public final boolean partsOngoing;
|
||||
public final Map<TopicPartitionReplica, LogDirMoveState> moveStates;
|
||||
public final boolean movesOngoing;
|
||||
|
||||
public VerifyAssignmentResult(Map<TopicPartition, PartitionReassignmentState> partStates) {
|
||||
this(partStates, false, Collections.emptyMap(), false);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param partStates A map from partitions to reassignment states.
|
||||
* @param partsOngoing True if there are any ongoing partition reassignments.
|
||||
* @param moveStates A map from log directories to movement states.
|
||||
* @param movesOngoing True if there are any ongoing moves that we know about.
|
||||
*/
|
||||
public VerifyAssignmentResult(
|
||||
Map<TopicPartition, PartitionReassignmentState> partStates,
|
||||
boolean partsOngoing,
|
||||
Map<org.apache.kafka.common.TopicPartitionReplica, LogDirMoveState> moveStates,
|
||||
boolean movesOngoing
|
||||
) {
|
||||
this.partStates = partStates;
|
||||
this.partsOngoing = partsOngoing;
|
||||
this.moveStates = moveStates;
|
||||
this.movesOngoing = movesOngoing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
VerifyAssignmentResult that = (VerifyAssignmentResult) o;
|
||||
return partsOngoing == that.partsOngoing && movesOngoing == that.movesOngoing && Objects.equals(partStates, that.partStates) && Objects.equals(moveStates, that.moveStates);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(partStates, partsOngoing, moveStates, movesOngoing);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue