MINOR: KafkaAdminClient Java 8 code cleanup (#5594)

Use lambdas and diamond operator whenever possible.

Reviewers: Ismael Juma <ismael@juma.me.uk>
Viktor Somogyi 2018-09-13 06:13:03 +02:00 committed by Ismael Juma
parent c121f4eb82
commit 398d2ab244
1 changed file with 34 additions and 39 deletions
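For context, the cleanup applies two Java 8 idioms throughout KafkaAdminClient: the diamond operator, which lets the compiler infer generic type arguments at a constructor call, and method references/lambdas in place of anonymous Comparator classes. The snippet below is a minimal, self-contained sketch of both patterns; the class name and the sample data are illustrative only and do not appear in the Kafka code.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class Java8CleanupSketch {
    public static void main(String[] args) {
        // Diamond operator: "new ArrayList<String>()" becomes "new ArrayList<>()",
        // since the type argument is inferred from the declared variable type.
        List<String> topics = new ArrayList<>();
        topics.add("payments");
        topics.add("orders");

        // Pre-Java 8 style, as removed by this commit:
        //     Collections.sort(topics, new Comparator<String>() {
        //         @Override
        //         public int compare(String a, String b) {
        //             return Integer.compare(a.length(), b.length());
        //         }
        //     });
        // Java 8 style: the anonymous class collapses into a method reference.
        topics.sort(Comparator.comparingInt(String::length));
        System.out.println(topics); // prints [orders, payments]
    }
}

The diff also drops a redundant "= null" initialization of a local requestBuilder variable; the compiler's definite-assignment analysis already guarantees the variable is assigned (or the method exits) before it is read.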


@@ -379,7 +379,7 @@ public class KafkaAdminClient extends AdminClient {
String clientId = generateClientId(config);
try {
- metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time);
+ metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
LogContext logContext = createLogContext(clientId);
AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
@@ -897,7 +897,7 @@ public class KafkaAdminClient extends AdminClient {
}
Call call = calls.remove(0);
int timeoutMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
- AbstractRequest.Builder<?> requestBuilder = null;
+ AbstractRequest.Builder<?> requestBuilder;
try {
requestBuilder = call.createRequest(timeoutMs);
} catch (Throwable throwable) {
@@ -1201,7 +1201,7 @@ public class KafkaAdminClient extends AdminClient {
// Since this only requests node information, it's safe to pass true
// for allowAutoTopicCreation (and it simplifies communication with
// older brokers)
- return new MetadataRequest.Builder(Collections.<String>emptyList(), true);
+ return new MetadataRequest.Builder(Collections.emptyList(), true);
}
@Override
@@ -1248,7 +1248,7 @@ public class KafkaAdminClient extends AdminClient {
newTopic.name() + "' cannot be represented in a request."));
topicFutures.put(newTopic.name(), future);
} else if (!topicFutures.containsKey(newTopic.name())) {
- topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
+ topicFutures.put(newTopic.name(), new KafkaFutureImpl<>());
topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
}
}
@@ -1304,7 +1304,7 @@ public class KafkaAdminClient extends AdminClient {
if (!topicsMap.isEmpty()) {
runnable.call(call, now);
}
- return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+ return new CreateTopicsResult(new HashMap<>(topicFutures));
}
@Override
@@ -1319,7 +1319,7 @@ public class KafkaAdminClient extends AdminClient {
topicName + "' cannot be represented in a request."));
topicFutures.put(topicName, future);
} else if (!topicFutures.containsKey(topicName)) {
- topicFutures.put(topicName, new KafkaFutureImpl<Void>());
+ topicFutures.put(topicName, new KafkaFutureImpl<>());
validTopicNames.add(topicName);
}
}
@@ -1375,7 +1375,7 @@ public class KafkaAdminClient extends AdminClient {
if (!validTopicNames.isEmpty()) {
runnable.call(call, now);
}
- return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+ return new DeleteTopicsResult(new HashMap<>(topicFutures));
}
@Override
@@ -1417,12 +1417,12 @@ public class KafkaAdminClient extends AdminClient {
final ArrayList<String> topicNamesList = new ArrayList<>();
for (String topicName : topicNames) {
if (topicNameIsUnrepresentable(topicName)) {
- KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<TopicDescription>();
+ KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException("The given topic name '" +
topicName + "' cannot be represented in a request."));
topicFutures.put(topicName, future);
} else if (!topicFutures.containsKey(topicName)) {
- topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
+ topicFutures.put(topicName, new KafkaFutureImpl<>());
topicNamesList.add(topicName);
}
}
@@ -1467,12 +1467,7 @@ public class KafkaAdminClient extends AdminClient {
Arrays.asList(partitionInfo.inSyncReplicas()));
partitions.add(topicPartitionInfo);
}
- Collections.sort(partitions, new Comparator<TopicPartitionInfo>() {
-     @Override
-     public int compare(TopicPartitionInfo tp1, TopicPartitionInfo tp2) {
-         return Integer.compare(tp1.partition(), tp2.partition());
-     }
- });
+ partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions);
future.complete(topicDescription);
}
@@ -1501,7 +1496,7 @@ public class KafkaAdminClient extends AdminClient {
if (!topicNamesList.isEmpty()) {
runnable.call(call, now);
}
- return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
+ return new DescribeTopicsResult(new HashMap<>(topicFutures));
}
@Override
@@ -1517,7 +1512,7 @@ public class KafkaAdminClient extends AdminClient {
AbstractRequest.Builder createRequest(int timeoutMs) {
// Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
- return new MetadataRequest.Builder(Collections.<String>emptyList(), true);
+ return new MetadataRequest.Builder(Collections.emptyList(), true);
}
@Override
@@ -1627,7 +1622,7 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(futures.values(), throwable);
}
}, now);
- return new CreateAclsResult(new HashMap<AclBinding, KafkaFuture<Void>>(futures));
+ return new CreateAclsResult(new HashMap<>(futures));
}
@Override
@@ -1638,7 +1633,7 @@ public class KafkaAdminClient extends AdminClient {
for (AclBindingFilter filter : filters) {
if (futures.get(filter) == null) {
filterList.add(filter);
- futures.put(filter, new KafkaFutureImpl<FilterResults>());
+ futures.put(filter, new KafkaFutureImpl<>());
}
}
runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.timeoutMs()),
@@ -1679,7 +1674,7 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(futures.values(), throwable);
}
}, now);
- return new DeleteAclsResult(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures));
+ return new DeleteAclsResult(new HashMap<>(futures));
}
@Override
@@ -1899,7 +1894,7 @@ public class KafkaAdminClient extends AdminClient {
final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size());
for (TopicPartitionReplica replica : replicaAssignment.keySet())
- futures.put(replica, new KafkaFutureImpl<Void>());
+ futures.put(replica, new KafkaFutureImpl<>());
Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>();
for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) {
@@ -1908,7 +1903,7 @@ public class KafkaAdminClient extends AdminClient {
int brokerId = replica.brokerId();
TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());
if (!replicaAssignmentByBroker.containsKey(brokerId))
- replicaAssignmentByBroker.put(brokerId, new HashMap<TopicPartition, String>());
+ replicaAssignmentByBroker.put(brokerId, new HashMap<>());
replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir);
}
@@ -1950,7 +1945,7 @@ public class KafkaAdminClient extends AdminClient {
}, now);
}
- return new AlterReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<Void>>(futures));
+ return new AlterReplicaLogDirsResult(new HashMap<>(futures));
}
@Override
@@ -1958,7 +1953,7 @@ public class KafkaAdminClient extends AdminClient {
final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<>(brokers.size());
for (Integer brokerId: brokers) {
- futures.put(brokerId, new KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>());
+ futures.put(brokerId, new KafkaFutureImpl<>());
}
final long now = time.milliseconds();
@@ -1990,7 +1985,7 @@ public class KafkaAdminClient extends AdminClient {
}, now);
}
- return new DescribeLogDirsResult(new HashMap<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>>(futures));
+ return new DescribeLogDirsResult(new HashMap<>(futures));
}
@Override
@@ -1998,14 +1993,14 @@ public class KafkaAdminClient extends AdminClient {
final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());
for (TopicPartitionReplica replica : replicas) {
- futures.put(replica, new KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>());
+ futures.put(replica, new KafkaFutureImpl<>());
}
Map<Integer, Set<TopicPartition>> partitionsByBroker = new HashMap<>();
for (TopicPartitionReplica replica: replicas) {
if (!partitionsByBroker.containsKey(replica.brokerId()))
- partitionsByBroker.put(replica.brokerId(), new HashSet<TopicPartition>());
+ partitionsByBroker.put(replica.brokerId(), new HashSet<>());
partitionsByBroker.get(replica.brokerId()).add(new TopicPartition(replica.topic(), replica.partition()));
}
@@ -2074,7 +2069,7 @@ public class KafkaAdminClient extends AdminClient {
}, now);
}
- return new DescribeReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
+ return new DescribeReplicaLogDirsResult(new HashMap<>(futures));
}
@Override
@@ -2082,7 +2077,7 @@ public class KafkaAdminClient extends AdminClient {
final CreatePartitionsOptions options) {
final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size());
for (String topic : newPartitions.keySet()) {
- futures.put(topic, new KafkaFutureImpl<Void>());
+ futures.put(topic, new KafkaFutureImpl<>());
}
final Map<String, NewPartitions> requestMap = new HashMap<>(newPartitions);
@@ -2121,7 +2116,7 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(futures.values(), throwable);
}
}, now);
- return new CreatePartitionsResult(new HashMap<String, KafkaFuture<Void>>(futures));
+ return new CreatePartitionsResult(new HashMap<>(futures));
}
@Override
@@ -2133,7 +2128,7 @@ public class KafkaAdminClient extends AdminClient {
final Map<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures = new HashMap<>(recordsToDelete.size());
for (TopicPartition topicPartition: recordsToDelete.keySet()) {
- futures.put(topicPartition, new KafkaFutureImpl<DeletedRecords>());
+ futures.put(topicPartition, new KafkaFutureImpl<>());
}
// preparing topics list for asking metadata about them
@@ -2180,7 +2175,7 @@ public class KafkaAdminClient extends AdminClient {
Node node = cluster.leaderFor(entry.getKey());
if (node != null) {
if (!leaders.containsKey(node))
- leaders.put(node, new HashMap<TopicPartition, Long>());
+ leaders.put(node, new HashMap<>());
leaders.get(node).put(entry.getKey(), entry.getValue().beforeOffset());
} else {
KafkaFutureImpl<DeletedRecords> future = futures.get(entry.getKey());
@@ -2231,7 +2226,7 @@ public class KafkaAdminClient extends AdminClient {
}
}, nowMetadata);
- return new DeleteRecordsResult(new HashMap<TopicPartition, KafkaFuture<DeletedRecords>>(futures));
+ return new DeleteRecordsResult(new HashMap<>(futures));
}
@Override
@@ -2373,7 +2368,7 @@ public class KafkaAdminClient extends AdminClient {
groupId + "' cannot be represented in a request."));
futures.put(groupId, future);
} else if (!futures.containsKey(groupId)) {
- futures.put(groupId, new KafkaFutureImpl<ConsumerGroupDescription>());
+ futures.put(groupId, new KafkaFutureImpl<>());
}
}
@@ -2469,7 +2464,7 @@ public class KafkaAdminClient extends AdminClient {
}, startFindCoordinatorMs);
}
- return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
+ return new DescribeConsumerGroupsResult(new HashMap<>(futures));
}
private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
@@ -2518,12 +2513,12 @@ public class KafkaAdminClient extends AdminClient {
private synchronized void tryComplete() {
if (remaining.isEmpty()) {
- ArrayList<Object> results = new ArrayList<Object>(listings.values());
+ ArrayList<Object> results = new ArrayList<>(listings.values());
results.addAll(errors);
future.complete(results);
}
}
};
}
@Override
public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
@@ -2687,7 +2682,7 @@ public class KafkaAdminClient extends AdminClient {
groupId + "' cannot be represented in a request."));
futures.put(groupId, future);
} else if (!futures.containsKey(groupId)) {
- futures.put(groupId, new KafkaFutureImpl<Void>());
+ futures.put(groupId, new KafkaFutureImpl<>());
}
}
@@ -2755,7 +2750,7 @@ public class KafkaAdminClient extends AdminClient {
}, startFindCoordinatorMs);
}
- return new DeleteConsumerGroupsResult(new HashMap<String, KafkaFuture<Void>>(futures));
+ return new DeleteConsumerGroupsResult(new HashMap<>(futures));
}
@Override