MINOR: Improve code style (#15107)

Reviewers: Divij Vaidya <diviv@amazon.com>
DL1231 authored 2024-01-03 18:56:20 +08:00, committed by GitHub
parent 4defb7c408
commit 60c445bdd5
5 changed files with 20 additions and 24 deletions

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@@ -1116,12 +1116,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
      */
     private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
-        // add topic to metadata topic list if it is not there already and reset expiry
         Cluster cluster = metadata.fetch();
         if (cluster.invalidTopics().contains(topic))
             throw new InvalidTopicException(topic);
+        // add topic to metadata topic list if it is not there already and reset expiry
         metadata.add(topic, nowMs);
         Integer partitionsCount = cluster.partitionCountForTopic(topic);

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

@@ -944,7 +944,6 @@ public class RecordAccumulator {
                 // the rest of the work by processing outside the lock
                 // close() is particularly expensive
                 batch.close();
                 size += batch.records().sizeInBytes();
                 ready.add(batch);
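The comment retained above states the guiding principle for this code path: hold the lock only long enough to detach a batch, and run the expensive close() after releasing it. A minimal, self-contained sketch of that pattern follows, using hypothetical names (DrainSketch, drainOne) rather than anything from RecordAccumulator:

    import java.util.concurrent.locks.ReentrantLock
    import scala.collection.mutable

    object DrainSketch {
      private val lock = new ReentrantLock()
      private val queue = mutable.Queue("batch-1", "batch-2")

      def drainOne(): Option[String] = {
        lock.lock()
        // Keep the critical section small: only the dequeue runs under the lock.
        val taken =
          try if (queue.nonEmpty) Some(queue.dequeue()) else None
          finally lock.unlock()
        // The expensive step (the analogue of batch.close()) runs after the
        // lock is released, so other threads are not blocked while it completes.
        taken.map(_.toUpperCase)
      }
    }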

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

@@ -217,11 +217,8 @@ public class Sender implements Runnable {
     private void addToInflightBatches(List<ProducerBatch> batches) {
         for (ProducerBatch batch : batches) {
-            List<ProducerBatch> inflightBatchList = inFlightBatches.get(batch.topicPartition);
-            if (inflightBatchList == null) {
-                inflightBatchList = new ArrayList<>();
-                inFlightBatches.put(batch.topicPartition, inflightBatchList);
-            }
+            List<ProducerBatch> inflightBatchList = inFlightBatches.computeIfAbsent(batch.topicPartition,
+                k -> new ArrayList<>());
             inflightBatchList.add(batch);
         }
     }
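The replacement above collapses the lookup, null check, and insertion into a single Map.computeIfAbsent call, which also avoids the second map traversal that the get-then-put sequence performs. A self-contained sketch of the idiom with hypothetical names, not the producer's types:

    import java.util

    object GroupingSketch {
      def main(args: Array[String]): Unit = {
        val batchesByPartition = new util.HashMap[String, util.List[String]]()
        for (batch <- Seq("b1", "b2", "b3")) {
          // One call replaces get / null-check / put; the ArrayList is created
          // lazily the first time a partition key is seen.
          val bucket = batchesByPartition.computeIfAbsent("topic-0", _ => new util.ArrayList[String]())
          bucket.add(batch)
        }
        println(batchesByPartition) // {topic-0=[b1, b2, b3]}
      }
    }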

core/src/main/scala/kafka/server/KafkaApis.scala

@@ -1304,7 +1304,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val knownTopicNames = topicIds.flatMap(metadataCache.getTopicName)
     val unknownTopicIdsTopicMetadata = unknownTopicIds.map(topicId =>
-      metadataResponseTopic(Errors.UNKNOWN_TOPIC_ID, null, topicId, false, util.Collections.emptyList())).toSeq
+      metadataResponseTopic(Errors.UNKNOWN_TOPIC_ID, null, topicId, isInternal = false, util.Collections.emptyList())).toSeq
     val topics = if (metadataRequest.isAllTopics)
       metadataCache.getAllTopics()
@@ -1342,11 +1342,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       else if (useTopicId) {
         // Topic IDs are not considered sensitive information, so returning TOPIC_AUTHORIZATION_FAILED is OK
         unauthorizedForDescribeTopics.map(topic =>
-          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, null, metadataCache.getTopicId(topic), false, util.Collections.emptyList()))
+          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, null, metadataCache.getTopicId(topic), isInternal = false, util.Collections.emptyList()))
       } else {
         // We should not return topicId when on unauthorized error, so we return zero uuid.
         unauthorizedForDescribeTopics.map(topic =>
-          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, false, util.Collections.emptyList()))
+          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, isInternal = false, util.Collections.emptyList()))
       }
     // In version 0, we returned an error when brokers with replicas were unavailable,
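Both replacements in this hunk pass the flag as isInternal = false instead of a bare false. Scala's named arguments make a positional boolean self-documenting at the call site; a small sketch using a hypothetical method, not the actual metadataResponseTopic signature:

    object NamedArgSketch extends App {
      // Hypothetical helper, only to illustrate the style.
      def topicMetadata(name: String, isInternal: Boolean): String =
        s"$name (internal=$isInternal)"

      println(topicMetadata("payments", false))              // reader must consult the declaration
      println(topicMetadata("payments", isInternal = false)) // intent is visible at the call site
    }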
@@ -1985,7 +1985,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
     val authorizedTopics = if (hasClusterAuthorization) {
-      allowedTopicNames.toSet
+      allowedTopicNames
     } else {
       authHelper.filterByAuthorized(request.context, CREATE, TOPIC, allowedTopicNames)(identity)
     }
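The .toSet dropped above appears to be redundant: allowedTopicNames is evidently already a set, since both branches of the if must produce the same type and filterByAuthorized yields one, so the conversion added noise without changing the value. A sketch with hypothetical values:

    object RedundantToSetSketch extends App {
      val allowed: Set[String] = Set("orders", "payments")
      // On a non-sorted immutable Set, toSet returns the same instance
      // (true at least on Scala 2.13), so the call is pure noise:
      println(allowed.toSet eq allowed) // true
    }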
@@ -2501,24 +2501,24 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(IBP_0_11_0_IV0)
     val addPartitionsToTxnRequest =
-      if (request.context.apiVersion() < 4) 
-        request.body[AddPartitionsToTxnRequest].normalizeRequest() 
-      else 
+      if (request.context.apiVersion() < 4)
+        request.body[AddPartitionsToTxnRequest].normalizeRequest()
+      else
         request.body[AddPartitionsToTxnRequest]
     val version = addPartitionsToTxnRequest.version
     val responses = new AddPartitionsToTxnResultCollection()
     val partitionsByTransaction = addPartitionsToTxnRequest.partitionsByTransaction()
     // Newer versions of the request should only come from other brokers.
     if (version >= 4) authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    // V4 requests introduced batches of transactions. We need all transactions to be handled before sending the 
+    // V4 requests introduced batches of transactions. We need all transactions to be handled before sending the
     // response so there are a few differences in handling errors and sending responses.
     def createResponse(requestThrottleMs: Int): AbstractResponse = {
       if (version < 4) {
         // There will only be one response in data. Add it to the response data object.
         val data = new AddPartitionsToTxnResponseData()
-        responses.forEach { result => 
+        responses.forEach { result =>
           data.setResultsByTopicV3AndBelow(result.topicResults())
           data.setThrottleTimeMs(requestThrottleMs)
         }
@@ -2539,7 +2539,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
-      txns.forEach { transaction => 
+      txns.forEach { transaction =>
         val transactionalId = transaction.transactionalId
         if (transactionalId == null)
@@ -2556,10 +2556,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedPartitions = mutable.Set[TopicPartition]()
       // Only request versions less than 4 need write authorization since they come from clients.
-      val authorizedTopics = 
-        if (version < 4) 
-          authHelper.filterByAuthorized(request.context, WRITE, TOPIC, partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) 
-        else 
+      val authorizedTopics =
+        if (version < 4)
+          authHelper.filterByAuthorized(request.context, WRITE, TOPIC, partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
+        else
           partitionsToAdd.map(_.topic).toSet
       for (topicPartition <- partitionsToAdd) {
         if (!authorizedTopics.contains(topicPartition.topic))
@@ -3510,7 +3510,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
       case RaftSupport(_, metadataCache) =>
         val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
           new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
     }
   }

core/src/main/scala/kafka/zk/KafkaZkClient.scala

@@ -468,7 +468,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
    */
-  def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties) = {
+  def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
     def set(configData: Array[Byte]): SetDataResponse = {
       val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
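The KafkaZkClient change above adds an explicit : Unit result type to a side-effecting public method. When the annotation is omitted, Scala infers the type of the method's last expression, which can silently promote an implementation detail into API. A sketch with a hypothetical class:

    import scala.collection.mutable

    class RegistrySketch {
      private val entries = mutable.Map.empty[String, String]

      // Inferred result type is Option[String], because mutable.Map.put
      // returns the previous value; callers can start relying on it by accident.
      def putInferred(key: String, value: String) = entries.put(key, value)

      // The annotation pins the contract: the method is called for its side
      // effect, and a later refactor cannot silently change the return type.
      def putExplicit(key: String, value: String): Unit = entries.put(key, value)
    }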