MINOR: refactor SelectingIterator by scala iterator (#9755)

Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Chia-Ping Tsai 2020-12-21 16:11:51 +08:00 committed by GitHub
parent 87260a33b0
commit 4e7c789118
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 38 deletions

View File

@ -897,7 +897,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
val responseSize = sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader)
val responseSize = KafkaApis.sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader)
quotas.leader.record(responseSize)
trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData.size}, " +
s"metadata=${unconvertedFetchResponse.sessionId}")
@ -967,42 +967,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
class SelectingIterator(val partitions: util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]],
val quota: ReplicationQuotaManager)
extends util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]] {
val iter = partitions.entrySet().iterator()
var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
override def hasNext: Boolean = {
while ((nextElement == null) && iter.hasNext()) {
val element = iter.next()
if (quota.isThrottled(element.getKey)) {
nextElement = element
}
}
nextElement != null
}
override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = {
if (!hasNext()) throw new NoSuchElementException()
val element = nextElement
nextElement = null
element
}
override def remove(): Unit = throw new UnsupportedOperationException()
}
// Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
// traffic doesn't exceed quota.
private def sizeOfThrottledPartitions(versionId: Short,
unconvertedResponse: FetchResponse[Records],
quota: ReplicationQuotaManager): Int = {
val iter = new SelectingIterator(unconvertedResponse.responseData, quota)
FetchResponse.sizeOf(versionId, iter)
}
def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota =
if (fetchRequest.isFromFollower) quotas.leader else UnboundedQuota
@ -3534,3 +3498,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
object KafkaApis {
// Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
// traffic doesn't exceed quota.
private[server] def sizeOfThrottledPartitions(versionId: Short,
unconvertedResponse: FetchResponse[Records],
quota: ReplicationQuotaManager): Int = {
FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet
.iterator.asScala.filter(element => quota.isThrottled(element.getKey)).asJava)
}
}

View File

@ -23,7 +23,6 @@ import java.util
import java.util.Arrays.asList
import java.util.concurrent.TimeUnit
import java.util.{Collections, Optional, Properties, Random}
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, Partition}
import kafka.controller.KafkaController
@ -71,6 +70,7 @@ import org.easymock.EasyMock._
import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher}
import org.junit.Assert._
import org.junit.{After, Test}
import org.mockito.{ArgumentMatchers, Mockito}
import scala.annotation.nowarn
import scala.collection.{Map, Seq, mutable}
@ -3068,4 +3068,31 @@ class KafkaApisTest {
Errors.LOG_DIR_NOT_FOUND -> 1,
Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts)
}
@Test
def testSizeOfThrottledPartitions(): Unit = {
def fetchResponse(data: Map[TopicPartition, String]): FetchResponse[Records] = {
val responseData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]](
data.map { case (tp, raw) =>
tp -> new FetchResponse.PartitionData(Errors.NONE,
105, 105, 0, Optional.empty(), Collections.emptyList(), Optional.empty(),
MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8))).asInstanceOf[Records])
}.toMap.asJava)
new FetchResponse(Errors.NONE, responseData, 100, 100)
}
val throttledPartition = new TopicPartition("throttledData", 0)
val throttledData = Map(throttledPartition -> "throttledData")
val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
fetchResponse(throttledData).responseData.entrySet.iterator)
val response = fetchResponse(throttledData ++ Map(new TopicPartition("nonThrottledData", 0) -> "nonThrottledData"))
val quota = Mockito.mock(classOf[ReplicationQuotaManager])
Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition])))
.thenAnswer(invocation => throttledPartition == invocation.getArgument(0).asInstanceOf[TopicPartition])
assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota))
}
}