KAFKA-14036; Set local time in `ControllerApis` when `handle` returns (#12372)

In `ControllerApis`, we are missing the logic to set the local processing end time after `handle` returns. As a consequence of this, the remote time ends up reported as the local time in the request level metrics. The patch adds the same logic we have in `KafkaApis` to set `apiLocalCompleteTimeNanos`.

Reviewers: José Armando García Sancio <jsancio@gmail.com>
This commit is contained in:
Jason Gustafson 2022-06-30 21:07:21 -07:00 committed by GitHub
parent c19398ee66
commit 6bf5bfc298
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 6 deletions

View File

@ -117,6 +117,11 @@ class ControllerApis(val requestChannel: RequestChannel,
s"with context ${request.context}", t)
requestHelper.handleError(request, t)
}
} finally {
// Only record local completion time if it is unset.
if (request.apiLocalCompleteTimeNanos < 0) {
request.apiLocalCompleteTimeNanos = time.nanoseconds
}
}
}

View File

@ -17,11 +17,6 @@
package kafka.server
import java.net.InetAddress
import java.util
import java.util.Collections.singletonList
import java.util.{Collections, Properties}
import java.util.concurrent.{CompletableFuture, ExecutionException}
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
@ -44,7 +39,7 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
import org.apache.kafka.common.message._
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
@ -64,6 +59,11 @@ import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import java.net.InetAddress
import java.util
import java.util.Collections.singletonList
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
import java.util.{Collections, Properties}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@ -174,6 +174,41 @@ class ControllerApisTest {
)
}
@Test
def testFetchLocalTimeComputedCorrectly(): Unit = {
val localTimeDurationMs = 5
val initialTimeNanos = time.nanoseconds()
val initialTimeMs = time.milliseconds()
when(
raftManager.handleRequest(
any(classOf[RequestHeader]),
any(classOf[ApiMessage]),
any(classOf[Long])
)
).thenAnswer { _ =>
time.sleep(localTimeDurationMs)
new CompletableFuture[ApiMessage]()
}
// Local time should be updated when `ControllerApis.handle` returns
val fetchRequestData = new FetchRequestData()
val request = buildRequest(new FetchRequest(fetchRequestData, ApiKeys.FETCH.latestVersion))
createControllerApis(None, new MockController.Builder().build())
.handle(request, RequestLocal.NoCaching)
verify(raftManager).handleRequest(
ArgumentMatchers.eq(request.header),
ArgumentMatchers.eq(fetchRequestData),
ArgumentMatchers.eq(initialTimeMs)
)
assertEquals(localTimeDurationMs, TimeUnit.MILLISECONDS.convert(
request.apiLocalCompleteTimeNanos - initialTimeNanos,
TimeUnit.NANOSECONDS
))
}
@Test
def testUnauthorizedFetchSnapshot(): Unit = {
assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(