diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 85ae7d70b6d..74bc4dd4067 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -117,6 +117,11 @@ class ControllerApis(val requestChannel: RequestChannel, s"with context ${request.context}", t) requestHelper.handleError(request, t) } + } finally { + // Only record local completion time if it is unset. + if (request.apiLocalCompleteTimeNanos < 0) { + request.apiLocalCompleteTimeNanos = time.nanoseconds + } } } diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 0c6f979874f..86a0c854705 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -17,11 +17,6 @@ package kafka.server -import java.net.InetAddress -import java.util -import java.util.Collections.singletonList -import java.util.{Collections, Properties} -import java.util.concurrent.{CompletableFuture, ExecutionException} import kafka.network.RequestChannel import kafka.raft.RaftManager import kafka.server.QuotaFactory.QuotaManagers @@ -44,7 +39,7 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection} import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse -import org.apache.kafka.common.message.{CreateTopicsRequestData, _} +import org.apache.kafka.common.message._ import org.apache.kafka.common.network.{ClientInformation, ListenerName} import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} @@ -64,6 +59,11 @@ import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.{ArgumentCaptor, ArgumentMatchers} +import java.net.InetAddress +import java.util +import java.util.Collections.singletonList +import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit} +import java.util.{Collections, Properties} import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag @@ -174,6 +174,41 @@ class ControllerApisTest { ) } + @Test + def testFetchLocalTimeComputedCorrectly(): Unit = { + val localTimeDurationMs = 5 + val initialTimeNanos = time.nanoseconds() + val initialTimeMs = time.milliseconds() + + when( + raftManager.handleRequest( + any(classOf[RequestHeader]), + any(classOf[ApiMessage]), + any(classOf[Long]) + ) + ).thenAnswer { _ => + time.sleep(localTimeDurationMs) + new CompletableFuture[ApiMessage]() + } + + // Local time should be updated when `ControllerApis.handle` returns + val fetchRequestData = new FetchRequestData() + val request = buildRequest(new FetchRequest(fetchRequestData, ApiKeys.FETCH.latestVersion)) + createControllerApis(None, new MockController.Builder().build()) + .handle(request, RequestLocal.NoCaching) + + verify(raftManager).handleRequest( + ArgumentMatchers.eq(request.header), + ArgumentMatchers.eq(fetchRequestData), + ArgumentMatchers.eq(initialTimeMs) + ) + + assertEquals(localTimeDurationMs, TimeUnit.MILLISECONDS.convert( + request.apiLocalCompleteTimeNanos - initialTimeNanos, + TimeUnit.NANOSECONDS + )) + } + @Test def testUnauthorizedFetchSnapshot(): Unit = { assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(