From bd1f02b2beca69a0937bb3df6f1e0ebcc3d9bfeb Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 6 Jul 2023 14:56:05 +0200 Subject: [PATCH] MINOR: Move MockTimer to server-common (#13954) This patch rewrites MockTimer in Java and moves it from core to server-common. This continues the work started in https://github.com/apache/kafka/pull/13820. Reviewers: Divij Vaidya --- checkstyle/import-control-server-common.xml | 8 +- .../AbstractCoordinatorConcurrencyTest.scala | 2 +- .../group/GroupCoordinatorTest.scala | 2 +- .../kafka/server/ReplicaManagerTest.scala | 2 +- .../unit/kafka/utils/timer/MockTimer.scala | 72 --------------- .../kafka/server/util/timer/MockTimer.java | 89 +++++++++++++++++++ 6 files changed, 98 insertions(+), 77 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala create mode 100644 server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml index 350f2820968..2e65ec601a3 100644 --- a/checkstyle/import-control-server-common.xml +++ b/checkstyle/import-control-server-common.xml @@ -82,13 +82,17 @@ - - + + + + + + diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 23961be7fb8..255b8dbb866 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -25,12 +25,12 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ import kafka.log.UnifiedLog import kafka.server._ import kafka.utils._ -import kafka.utils.timer.MockTimer import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordConversionStats} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.{MockScheduler, MockTime} import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig} import org.junit.jupiter.api.{AfterEach, BeforeEach} diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index ee11fbd8a3d..787e76d6aef 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -21,7 +21,6 @@ import java.util.{Optional, OptionalInt} import kafka.common.OffsetAndMetadata import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal} import kafka.utils._ -import kafka.utils.timer.MockTimer import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} @@ -37,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity +import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.log.AppendOrigin import org.junit.jupiter.api.Assertions._ diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index bcb7cfb327f..e8cad3e942e 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -32,7 +32,6 @@ import kafka.log._ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile} import kafka.server.epoch.util.MockBlockingSender -import kafka.utils.timer.MockTimer import kafka.utils.{Pool, TestUtils} import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException} @@ -71,6 +70,7 @@ import kafka.log.remote.RemoteLogManager import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction} import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} +import org.apache.kafka.server.util.timer.MockTimer import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.mockito.{ArgumentCaptor, ArgumentMatchers} diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala deleted file mode 100644 index cee7dc097f1..00000000000 --- a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.utils.timer - -import org.apache.kafka.server.util.MockTime -import org.apache.kafka.server.util.timer.{Timer, TimerTask, TimerTaskEntry} - -import scala.collection.mutable - -class MockTimer(val time: MockTime = new MockTime) extends Timer { - - private val taskQueue = mutable.PriorityQueue.empty[TimerTaskEntry](new Ordering[TimerTaskEntry] { - override def compare(x: TimerTaskEntry, y: TimerTaskEntry): Int = java.lang.Long.compare(x.expirationMs, y.expirationMs) - }.reverse) - - def add(timerTask: TimerTask): Unit = { - if (timerTask.delayMs <= 0) - timerTask.run() - else { - taskQueue synchronized { - taskQueue.enqueue(new TimerTaskEntry(timerTask, timerTask.delayMs + time.milliseconds)) - } - } - } - - def advanceClock(timeoutMs: Long): Boolean = { - time.sleep(timeoutMs) - - var executed = false - val now = time.milliseconds - - var hasMore = true - while (hasMore) { - hasMore = false - val head = taskQueue synchronized { - if (taskQueue.nonEmpty && now > taskQueue.head.expirationMs) { - val entry = Some(taskQueue.dequeue()) - hasMore = taskQueue.nonEmpty - entry - } else - None - } - head.foreach { taskEntry => - if (!taskEntry.cancelled) { - val task = taskEntry.timerTask - task.run() - executed = true - } - } - } - executed - } - - def size: Int = taskQueue.size - - override def close(): Unit = {} - -} diff --git a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java new file mode 100644 index 00000000000..460fd56690f --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +import org.apache.kafka.server.util.MockTime; + +import java.util.Comparator; +import java.util.PriorityQueue; + +public class MockTimer implements Timer { + private final MockTime time; + private final PriorityQueue taskQueue = new PriorityQueue<>( + Comparator.comparingLong(entry -> entry.expirationMs) + ); + + public MockTimer() { + this(new MockTime()); + } + + public MockTimer(MockTime time) { + this.time = time; + } + + @Override + public void add(TimerTask timerTask) { + if (timerTask.delayMs <= 0) { + timerTask.run(); + } else { + synchronized (taskQueue) { + taskQueue.add(new TimerTaskEntry(timerTask, timerTask.delayMs + time.milliseconds())); + } + } + } + + @Override + public boolean advanceClock(long timeoutMs) throws InterruptedException { + time.sleep(timeoutMs); + + final long now = time.milliseconds(); + boolean executed = false; + boolean hasMore = true; + + while (hasMore) { + hasMore = false; + TimerTaskEntry taskEntry = null; + + synchronized (taskQueue) { + if (!taskQueue.isEmpty() && now > taskQueue.peek().expirationMs) { + taskEntry = taskQueue.poll(); + hasMore = !taskQueue.isEmpty(); + } + } + + if (taskEntry != null) { + if (!taskEntry.cancelled()) { + taskEntry.timerTask.run(); + executed = true; + } + } + } + + return executed; + } + + public MockTime time() { + return time; + } + + public int size() { + return taskQueue.size(); + } + + @Override + public void close() throws Exception {} +}