From 4e7c789118fed28ee99275e3e84a626fe912c79a Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Mon, 21 Dec 2020 16:11:51 +0800 Subject: [PATCH] MINOR: refactor SelectingIterator by scala iterator (#9755) Reviewers: Ismael Juma --- .../main/scala/kafka/server/KafkaApis.scala | 49 +++++-------------- .../unit/kafka/server/KafkaApisTest.scala | 29 ++++++++++- 2 files changed, 40 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f4758a6bf93..183fd392e14 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -897,7 +897,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (fetchRequest.isFromFollower) { // We've already evaluated against the quota and are good to go. Just need to record it now. unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions) - val responseSize = sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader) + val responseSize = KafkaApis.sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader) quotas.leader.record(responseSize) trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData.size}, " + s"metadata=${unconvertedFetchResponse.sessionId}") @@ -967,42 +967,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - class SelectingIterator(val partitions: util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]], - val quota: ReplicationQuotaManager) - extends util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]] { - val iter = partitions.entrySet().iterator() - - var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null - - override def hasNext: Boolean = { - while ((nextElement == null) && iter.hasNext()) { - val element = iter.next() - if (quota.isThrottled(element.getKey)) { - nextElement = element - } - } - nextElement != null - } - - override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = { - if (!hasNext()) throw new NoSuchElementException() - val element = nextElement - nextElement = null - element - } - - override def remove(): Unit = throw new UnsupportedOperationException() - } - - // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication - // traffic doesn't exceed quota. - private def sizeOfThrottledPartitions(versionId: Short, - unconvertedResponse: FetchResponse[Records], - quota: ReplicationQuotaManager): Int = { - val iter = new SelectingIterator(unconvertedResponse.responseData, quota) - FetchResponse.sizeOf(versionId, iter) - } - def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota = if (fetchRequest.isFromFollower) quotas.leader else UnboundedQuota @@ -3534,3 +3498,14 @@ class KafkaApis(val requestChannel: RequestChannel, } } + +object KafkaApis { + // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication + // traffic doesn't exceed quota. + private[server] def sizeOfThrottledPartitions(versionId: Short, + unconvertedResponse: FetchResponse[Records], + quota: ReplicationQuotaManager): Int = { + FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet + .iterator.asScala.filter(element => quota.isThrottled(element.getKey)).asJava) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 87a870a6ad7..64c4e1783f5 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -23,7 +23,6 @@ import java.util import java.util.Arrays.asList import java.util.concurrent.TimeUnit import java.util.{Collections, Optional, Properties, Random} - import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr} import kafka.cluster.{Broker, Partition} import kafka.controller.KafkaController @@ -71,6 +70,7 @@ import org.easymock.EasyMock._ import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher} import org.junit.Assert._ import org.junit.{After, Test} +import org.mockito.{ArgumentMatchers, Mockito} import scala.annotation.nowarn import scala.collection.{Map, Seq, mutable} @@ -3068,4 +3068,31 @@ class KafkaApisTest { Errors.LOG_DIR_NOT_FOUND -> 1, Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts) } + + @Test + def testSizeOfThrottledPartitions(): Unit = { + def fetchResponse(data: Map[TopicPartition, String]): FetchResponse[Records] = { + val responseData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]( + data.map { case (tp, raw) => + tp -> new FetchResponse.PartitionData(Errors.NONE, + 105, 105, 0, Optional.empty(), Collections.emptyList(), Optional.empty(), + MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8))).asInstanceOf[Records]) + }.toMap.asJava) + new FetchResponse(Errors.NONE, responseData, 100, 100) + } + + val throttledPartition = new TopicPartition("throttledData", 0) + val throttledData = Map(throttledPartition -> "throttledData") + val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION, + fetchResponse(throttledData).responseData.entrySet.iterator) + + val response = fetchResponse(throttledData ++ Map(new TopicPartition("nonThrottledData", 0) -> "nonThrottledData")) + + val quota = Mockito.mock(classOf[ReplicationQuotaManager]) + Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition]))) + .thenAnswer(invocation => throttledPartition == invocation.getArgument(0).asInstanceOf[TopicPartition]) + + assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota)) + } }