mirror of https://github.com/apache/kafka.git
MINOR: Use primitive data types in tools (#20038)
Use primitive data types instead of wrappers in the tools module. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
a5a54dc32b
commit
959da32f97
|
@ -93,7 +93,7 @@ public class DelegationTokenCommand {
|
|||
|
||||
public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
|
||||
List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
|
||||
Long maxLifeTimeMs = opts.maxLifeTime();
|
||||
long maxLifeTimeMs = opts.maxLifeTime();
|
||||
|
||||
System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
|
||||
CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxLifetimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
|
||||
|
@ -141,7 +141,7 @@ public class DelegationTokenCommand {
|
|||
|
||||
public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
|
||||
String hmac = opts.hmac();
|
||||
Long renewTimePeriodMs = opts.renewTimePeriod();
|
||||
long renewTimePeriodMs = opts.renewTimePeriod();
|
||||
|
||||
System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
|
||||
RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
|
||||
|
@ -153,7 +153,7 @@ public class DelegationTokenCommand {
|
|||
|
||||
public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
|
||||
String hmac = opts.hmac();
|
||||
Long expiryTimePeriodMs = opts.expiryTimePeriod();
|
||||
long expiryTimePeriodMs = opts.expiryTimePeriod();
|
||||
|
||||
System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
|
||||
ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
|
||||
|
|
|
@ -234,7 +234,7 @@ public class GroupsCommand {
|
|||
checkArgs();
|
||||
}
|
||||
|
||||
public Boolean has(OptionSpec<?> builder) {
|
||||
public boolean has(OptionSpec<?> builder) {
|
||||
return options.has(builder);
|
||||
}
|
||||
|
||||
|
|
|
@ -189,7 +189,7 @@ public abstract class TopicCommand {
|
|||
|
||||
}
|
||||
|
||||
private static Integer getReplicationFactor(TopicPartitionInfo tpi, PartitionReassignment reassignment) {
|
||||
private static int getReplicationFactor(TopicPartitionInfo tpi, PartitionReassignment reassignment) {
|
||||
return isReassignmentInProgress(tpi, reassignment) ?
|
||||
reassignment.replicas().size() - reassignment.addingReplicas().size() :
|
||||
tpi.replicas().size();
|
||||
|
@ -204,7 +204,7 @@ public abstract class TopicCommand {
|
|||
* If set to true, the command will throw an exception if the topic with the
|
||||
* requested name does not exist.
|
||||
*/
|
||||
private static void ensureTopicExists(List<String> foundTopics, Optional<String> requestedTopic, Boolean requireTopicExists) {
|
||||
private static void ensureTopicExists(List<String> foundTopics, Optional<String> requestedTopic, boolean requireTopicExists) {
|
||||
// If no topic name was mentioned, do not need to throw exception.
|
||||
if (requestedTopic.isPresent() && !requestedTopic.get().isEmpty() && requireTopicExists && foundTopics.isEmpty()) {
|
||||
// If given topic doesn't exist then throw exception
|
||||
|
@ -212,7 +212,7 @@ public abstract class TopicCommand {
|
|||
}
|
||||
}
|
||||
|
||||
private static List<String> doGetTopics(List<String> allTopics, Optional<String> topicIncludeList, Boolean excludeInternalTopics) {
|
||||
private static List<String> doGetTopics(List<String> allTopics, Optional<String> topicIncludeList, boolean excludeInternalTopics) {
|
||||
if (topicIncludeList.isPresent()) {
|
||||
IncludeList topicsFilter = new IncludeList(topicIncludeList.get());
|
||||
return allTopics.stream()
|
||||
|
@ -234,7 +234,7 @@ public abstract class TopicCommand {
|
|||
* If set to true, the command will throw an exception if the topic with the
|
||||
* requested id does not exist.
|
||||
*/
|
||||
private static void ensureTopicIdExists(List<Uuid> foundTopicIds, Uuid requestedTopicId, Boolean requireTopicIdExists) {
|
||||
private static void ensureTopicIdExists(List<Uuid> foundTopicIds, Uuid requestedTopicId, boolean requireTopicIdExists) {
|
||||
// If no topic id was mentioned, do not need to throw exception.
|
||||
if (requestedTopicId != null && requireTopicIdExists && foundTopicIds.isEmpty()) {
|
||||
// If given topicId doesn't exist then throw exception
|
||||
|
@ -260,11 +260,11 @@ public abstract class TopicCommand {
|
|||
configsToAdd = parseTopicConfigsToBeAdded(options);
|
||||
}
|
||||
|
||||
public Boolean hasReplicaAssignment() {
|
||||
public boolean hasReplicaAssignment() {
|
||||
return !replicaAssignment.isEmpty();
|
||||
}
|
||||
|
||||
public Boolean ifTopicDoesntExist() {
|
||||
public boolean ifTopicDoesntExist() {
|
||||
return opts.ifNotExists();
|
||||
}
|
||||
}
|
||||
|
@ -272,12 +272,12 @@ public abstract class TopicCommand {
|
|||
static class TopicDescription {
|
||||
private final String topic;
|
||||
private final Uuid topicId;
|
||||
private final Integer numPartitions;
|
||||
private final Integer replicationFactor;
|
||||
private final int numPartitions;
|
||||
private final int replicationFactor;
|
||||
private final Config config;
|
||||
private final Boolean markedForDeletion;
|
||||
private final boolean markedForDeletion;
|
||||
|
||||
public TopicDescription(String topic, Uuid topicId, Integer numPartitions, Integer replicationFactor, Config config, Boolean markedForDeletion) {
|
||||
public TopicDescription(String topic, Uuid topicId, int numPartitions, int replicationFactor, Config config, boolean markedForDeletion) {
|
||||
this.topic = topic;
|
||||
this.topicId = topicId;
|
||||
this.numPartitions = numPartitions;
|
||||
|
@ -306,13 +306,13 @@ public abstract class TopicCommand {
|
|||
private final String topic;
|
||||
private final TopicPartitionInfo info;
|
||||
private final Config config;
|
||||
private final Boolean markedForDeletion;
|
||||
private final boolean markedForDeletion;
|
||||
private final PartitionReassignment reassignment;
|
||||
|
||||
PartitionDescription(String topic,
|
||||
TopicPartitionInfo info,
|
||||
Config config,
|
||||
Boolean markedForDeletion,
|
||||
boolean markedForDeletion,
|
||||
PartitionReassignment reassignment) {
|
||||
this.topic = topic;
|
||||
this.info = info;
|
||||
|
@ -321,11 +321,11 @@ public abstract class TopicCommand {
|
|||
this.reassignment = reassignment;
|
||||
}
|
||||
|
||||
public Integer minIsrCount() {
|
||||
public int minIsrCount() {
|
||||
return Integer.parseInt(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
|
||||
}
|
||||
|
||||
public Boolean isUnderReplicated() {
|
||||
public boolean isUnderReplicated() {
|
||||
return getReplicationFactor(info, reassignment) - info.isr().size() > 0;
|
||||
}
|
||||
|
||||
|
@ -333,15 +333,15 @@ public abstract class TopicCommand {
|
|||
return info.leader() != null;
|
||||
}
|
||||
|
||||
public Boolean isUnderMinIsr() {
|
||||
public boolean isUnderMinIsr() {
|
||||
return !hasLeader() || info.isr().size() < minIsrCount();
|
||||
}
|
||||
|
||||
public Boolean isAtMinIsrPartitions() {
|
||||
public boolean isAtMinIsrPartitions() {
|
||||
return minIsrCount() == info.isr().size();
|
||||
}
|
||||
|
||||
public Boolean hasUnavailablePartitions(Set<Integer> liveBrokers) {
|
||||
public boolean hasUnavailablePartitions(Set<Integer> liveBrokers) {
|
||||
return !hasLeader() || !liveBrokers.contains(info.leader().id());
|
||||
}
|
||||
|
||||
|
@ -519,7 +519,7 @@ public abstract class TopicCommand {
|
|||
String topicName) {
|
||||
if (topic.hasReplicaAssignment()) {
|
||||
try {
|
||||
Integer startPartitionId = topicsInfo.get(topicName).get().partitions().size();
|
||||
int startPartitionId = topicsInfo.get(topicName).get().partitions().size();
|
||||
Map<Integer, List<Integer>> replicaMap = topic.replicaAssignment.entrySet().stream()
|
||||
.skip(startPartitionId)
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
@ -552,7 +552,7 @@ public abstract class TopicCommand {
|
|||
// If topicId is provided and not zero, will use topicId regardless of topic name
|
||||
Optional<Uuid> inputTopicId = opts.topicId()
|
||||
.map(Uuid::fromString).filter(uuid -> !uuid.equals(Uuid.ZERO_UUID));
|
||||
Boolean useTopicId = inputTopicId.isPresent();
|
||||
boolean useTopicId = inputTopicId.isPresent();
|
||||
|
||||
List<Uuid> topicIds;
|
||||
List<String> topics;
|
||||
|
@ -826,7 +826,7 @@ public abstract class TopicCommand {
|
|||
checkArgs();
|
||||
}
|
||||
|
||||
public Boolean has(OptionSpec<?> builder) {
|
||||
public boolean has(OptionSpec<?> builder) {
|
||||
return options.has(builder);
|
||||
}
|
||||
|
||||
|
@ -850,23 +850,23 @@ public abstract class TopicCommand {
|
|||
return options.has(option) ? Optional.of(options.valuesOf(option)) : Optional.of(defaultValue);
|
||||
}
|
||||
|
||||
public Boolean hasCreateOption() {
|
||||
public boolean hasCreateOption() {
|
||||
return has(createOpt);
|
||||
}
|
||||
|
||||
public Boolean hasAlterOption() {
|
||||
public boolean hasAlterOption() {
|
||||
return has(alterOpt);
|
||||
}
|
||||
|
||||
public Boolean hasListOption() {
|
||||
public boolean hasListOption() {
|
||||
return has(listOpt);
|
||||
}
|
||||
|
||||
public Boolean hasDescribeOption() {
|
||||
public boolean hasDescribeOption() {
|
||||
return has(describeOpt);
|
||||
}
|
||||
|
||||
public Boolean hasDeleteOption() {
|
||||
public boolean hasDeleteOption() {
|
||||
return has(deleteOpt);
|
||||
}
|
||||
|
||||
|
@ -905,35 +905,35 @@ public abstract class TopicCommand {
|
|||
return Optional.empty();
|
||||
}
|
||||
|
||||
public Boolean reportUnderReplicatedPartitions() {
|
||||
public boolean reportUnderReplicatedPartitions() {
|
||||
return has(reportUnderReplicatedPartitionsOpt);
|
||||
}
|
||||
|
||||
public Boolean reportUnavailablePartitions() {
|
||||
public boolean reportUnavailablePartitions() {
|
||||
return has(reportUnavailablePartitionsOpt);
|
||||
}
|
||||
|
||||
public Boolean reportUnderMinIsrPartitions() {
|
||||
public boolean reportUnderMinIsrPartitions() {
|
||||
return has(reportUnderMinIsrPartitionsOpt);
|
||||
}
|
||||
|
||||
public Boolean reportAtMinIsrPartitions() {
|
||||
public boolean reportAtMinIsrPartitions() {
|
||||
return has(reportAtMinIsrPartitionsOpt);
|
||||
}
|
||||
|
||||
public Boolean reportOverriddenConfigs() {
|
||||
public boolean reportOverriddenConfigs() {
|
||||
return has(topicsWithOverridesOpt);
|
||||
}
|
||||
|
||||
public Boolean ifExists() {
|
||||
public boolean ifExists() {
|
||||
return has(ifExistsOpt);
|
||||
}
|
||||
|
||||
public Boolean ifNotExists() {
|
||||
public boolean ifNotExists() {
|
||||
return has(ifNotExistsOpt);
|
||||
}
|
||||
|
||||
public Boolean excludeInternalTopics() {
|
||||
public boolean excludeInternalTopics() {
|
||||
return has(excludeInternalTopicOpt);
|
||||
}
|
||||
|
||||
|
|
|
@ -206,7 +206,7 @@ public class ReassignPartitionsCommand {
|
|||
*/
|
||||
static VerifyAssignmentResult verifyAssignment(Admin adminClient,
|
||||
String jsonString,
|
||||
Boolean preserveThrottles
|
||||
boolean preserveThrottles
|
||||
) throws ExecutionException, InterruptedException, JsonProcessingException {
|
||||
Entry<List<Entry<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>> t0 = parsePartitionReassignmentData(jsonString);
|
||||
|
||||
|
@ -246,7 +246,7 @@ public class ReassignPartitionsCommand {
|
|||
* in the JSON file.)
|
||||
*/
|
||||
private static Entry<Map<TopicPartition, PartitionReassignmentState>, Boolean> verifyPartitionAssignments(Admin adminClient,
|
||||
List<Entry<TopicPartition, List<Integer>>> targets
|
||||
List<Entry<TopicPartition, List<Integer>>> targets
|
||||
) throws ExecutionException, InterruptedException {
|
||||
Entry<Map<TopicPartition, PartitionReassignmentState>, Boolean> t0 = findPartitionReassignmentStates(adminClient, targets);
|
||||
System.out.println(partitionReassignmentStatesToString(t0.getKey()));
|
||||
|
@ -307,8 +307,8 @@ public class ReassignPartitionsCommand {
|
|||
* partition, plus whether there are any ongoing reassignments.
|
||||
*/
|
||||
static Entry<Map<TopicPartition, PartitionReassignmentState>, Boolean> findPartitionReassignmentStates(Admin adminClient,
|
||||
List<Entry<TopicPartition,
|
||||
List<Integer>>> targetReassignments
|
||||
List<Entry<TopicPartition,
|
||||
List<Integer>>> targetReassignments
|
||||
) throws ExecutionException, InterruptedException {
|
||||
Map<TopicPartition, PartitionReassignment> currentReassignments = adminClient.
|
||||
listPartitionReassignments().reassignments().get();
|
||||
|
@ -402,7 +402,7 @@ public class ReassignPartitionsCommand {
|
|||
* returns all ongoing replica reassignments.)
|
||||
*/
|
||||
private static Entry<Map<TopicPartitionReplica, LogDirMoveState>, Boolean> verifyReplicaMoves(Admin adminClient,
|
||||
Map<TopicPartitionReplica, String> targetReassignments
|
||||
Map<TopicPartitionReplica, String> targetReassignments
|
||||
) throws ExecutionException, InterruptedException {
|
||||
Map<TopicPartitionReplica, LogDirMoveState> moveStates = findLogDirMoveStates(adminClient, targetReassignments);
|
||||
System.out.println(replicaMoveStatesToString(moveStates));
|
||||
|
@ -556,9 +556,9 @@ public class ReassignPartitionsCommand {
|
|||
* current assignment.
|
||||
*/
|
||||
public static Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List<Integer>>> generateAssignment(Admin adminClient,
|
||||
String reassignmentJson,
|
||||
String brokerListString,
|
||||
Boolean enableRackAwareness
|
||||
String reassignmentJson,
|
||||
String brokerListString,
|
||||
boolean enableRackAwareness
|
||||
) throws ExecutionException, InterruptedException, JsonProcessingException {
|
||||
Entry<List<Integer>, List<String>> t0 = parseGenerateAssignmentArgs(reassignmentJson, brokerListString);
|
||||
|
||||
|
@ -723,7 +723,7 @@ public class ReassignPartitionsCommand {
|
|||
* @return A tuple of brokers to reassign, topics to reassign
|
||||
*/
|
||||
static Entry<List<Integer>, List<String>> parseGenerateAssignmentArgs(String reassignmentJson,
|
||||
String brokerList) throws JsonMappingException {
|
||||
String brokerList) throws JsonMappingException {
|
||||
List<Integer> brokerListToReassign = Stream.of(brokerList.split(",")).map(Integer::parseInt).collect(Collectors.toList());
|
||||
Set<Integer> duplicateReassignments = ToolsUtils.duplicates(brokerListToReassign);
|
||||
if (!duplicateReassignments.isEmpty())
|
||||
|
@ -751,13 +751,13 @@ public class ReassignPartitionsCommand {
|
|||
* @param time The Time object to use.
|
||||
*/
|
||||
public static void executeAssignment(Admin adminClient,
|
||||
Boolean additional,
|
||||
String reassignmentJson,
|
||||
Long interBrokerThrottle,
|
||||
Long logDirThrottle,
|
||||
Long timeoutMs,
|
||||
Time time,
|
||||
boolean disallowReplicationFactorChange
|
||||
boolean additional,
|
||||
String reassignmentJson,
|
||||
long interBrokerThrottle,
|
||||
long logDirThrottle,
|
||||
long timeoutMs,
|
||||
Time time,
|
||||
boolean disallowReplicationFactorChange
|
||||
) throws ExecutionException, InterruptedException, JsonProcessingException, TerseException {
|
||||
Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartitionReplica, String>> t0 = parseExecuteAssignmentArgs(reassignmentJson);
|
||||
|
||||
|
@ -824,7 +824,7 @@ public class ReassignPartitionsCommand {
|
|||
*/
|
||||
private static void executeMoves(Admin adminClient,
|
||||
Map<TopicPartitionReplica, String> proposedReplicas,
|
||||
Long timeoutMs,
|
||||
long timeoutMs,
|
||||
Time time
|
||||
) throws InterruptedException, TerseException {
|
||||
long startTimeMs = time.milliseconds();
|
||||
|
@ -1143,7 +1143,7 @@ public class ReassignPartitionsCommand {
|
|||
private static void modifyReassignmentThrottle(
|
||||
Admin admin,
|
||||
Map<String, Map<Integer, PartitionMove>> moveMap,
|
||||
Long interBrokerThrottle
|
||||
long interBrokerThrottle
|
||||
) throws ExecutionException, InterruptedException {
|
||||
Map<String, String> leaderThrottles = calculateLeaderThrottles(moveMap);
|
||||
Map<String, String> followerThrottles = calculateFollowerThrottles(moveMap);
|
||||
|
@ -1256,10 +1256,10 @@ public class ReassignPartitionsCommand {
|
|||
* and the replica movements that were cancelled.
|
||||
*/
|
||||
static Entry<Set<TopicPartition>, Set<TopicPartitionReplica>> cancelAssignment(Admin adminClient,
|
||||
String jsonString,
|
||||
Boolean preserveThrottles,
|
||||
Long timeoutMs,
|
||||
Time time
|
||||
String jsonString,
|
||||
boolean preserveThrottles,
|
||||
long timeoutMs,
|
||||
Time time
|
||||
) throws ExecutionException, InterruptedException, JsonProcessingException, TerseException {
|
||||
Entry<List<Entry<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>> t0 = parsePartitionReassignmentData(jsonString);
|
||||
|
||||
|
@ -1306,7 +1306,7 @@ public class ReassignPartitionsCommand {
|
|||
}
|
||||
|
||||
public static String formatAsReassignmentJson(Map<TopicPartition, List<Integer>> partitionsToBeReassigned,
|
||||
Map<TopicPartitionReplica, String> replicaLogDirAssignment) throws JsonProcessingException {
|
||||
Map<TopicPartitionReplica, String> replicaLogDirAssignment) throws JsonProcessingException {
|
||||
List<Map<String, Object>> partitions = new ArrayList<>();
|
||||
partitionsToBeReassigned.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitions).forEach(tp -> {
|
||||
List<Integer> replicas = partitionsToBeReassigned.get(tp);
|
||||
|
|
|
@ -1427,12 +1427,12 @@ public class TopicCommandTest {
|
|||
|
||||
private void checkReplicaDistribution(Map<Integer, List<Integer>> assignment,
|
||||
Map<Integer, String> brokerRackMapping,
|
||||
Integer numBrokers,
|
||||
Integer numPartitions,
|
||||
Integer replicationFactor,
|
||||
Boolean verifyRackAware,
|
||||
Boolean verifyLeaderDistribution,
|
||||
Boolean verifyReplicasDistribution) {
|
||||
int numBrokers,
|
||||
int numPartitions,
|
||||
int replicationFactor,
|
||||
boolean verifyRackAware,
|
||||
boolean verifyLeaderDistribution,
|
||||
boolean verifyReplicasDistribution) {
|
||||
// always verify that no broker will be assigned for more than one replica
|
||||
assignment.forEach((partition, assignedNodes) -> assertEquals(new HashSet<>(assignedNodes).size(), assignedNodes.size(),
|
||||
"More than one replica is assigned to same broker for the same partition"));
|
||||
|
|
|
@ -674,7 +674,7 @@ public class ReassignPartitionsCommandTest {
|
|||
|
||||
private VerifyAssignmentResult runVerifyAssignment(Admin admin,
|
||||
String jsonString,
|
||||
Boolean preserveThrottles) {
|
||||
boolean preserveThrottles) {
|
||||
try {
|
||||
return verifyAssignment(admin, jsonString, preserveThrottles);
|
||||
} catch (ExecutionException | InterruptedException | JsonProcessingException e) {
|
||||
|
@ -684,7 +684,7 @@ public class ReassignPartitionsCommandTest {
|
|||
|
||||
private void waitForVerifyAssignment(Admin admin,
|
||||
String jsonString,
|
||||
Boolean preserveThrottles,
|
||||
boolean preserveThrottles,
|
||||
VerifyAssignmentResult expectedResult) throws InterruptedException {
|
||||
final VerifyAssignmentResult[] latestResult = {null};
|
||||
TestUtils.waitForCondition(
|
||||
|
@ -696,7 +696,7 @@ public class ReassignPartitionsCommandTest {
|
|||
);
|
||||
}
|
||||
|
||||
private void runExecuteAssignment(Boolean additional,
|
||||
private void runExecuteAssignment(boolean additional,
|
||||
String reassignmentJson,
|
||||
Long interBrokerThrottle,
|
||||
Long replicaAlterLogDirsThrottle) throws RuntimeException {
|
||||
|
@ -710,8 +710,8 @@ public class ReassignPartitionsCommandTest {
|
|||
|
||||
private Map.Entry<Set<TopicPartition>, Set<TopicPartitionReplica>> runCancelAssignment(
|
||||
String jsonString,
|
||||
Boolean preserveThrottles,
|
||||
Boolean useBootstrapServer
|
||||
boolean preserveThrottles,
|
||||
boolean useBootstrapServer
|
||||
) {
|
||||
Map<String, Object> config;
|
||||
if (useBootstrapServer) {
|
||||
|
|
Loading…
Reference in New Issue