MINOR: Move MockTimer to server-common (#13954)

This patch rewrites MockTimer in Java and moves it from core to server-common. This continues the work started in https://github.com/apache/kafka/pull/13820.

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
David Jacot 2023-07-06 14:56:05 +02:00 committed by GitHub
parent 4a61b48d3d
commit bd1f02b2be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 98 additions and 77 deletions

View File

@ -82,13 +82,17 @@
<allow pkg="org.apache.kafka.server.authorizer" /> <allow pkg="org.apache.kafka.server.authorizer" />
</subpackage> </subpackage>
<!-- InterBrokerSendThread uses some clients classes that are not part of the public -->
<!-- API but are still relatively common -->
<subpackage name="util"> <subpackage name="util">
<!-- InterBrokerSendThread uses some clients classes that are not part of the public -->
<!-- API but are still relatively common -->
<allow class="org.apache.kafka.clients.ClientRequest" /> <allow class="org.apache.kafka.clients.ClientRequest" />
<allow class="org.apache.kafka.clients.ClientResponse" /> <allow class="org.apache.kafka.clients.ClientResponse" />
<allow class="org.apache.kafka.clients.KafkaClient" /> <allow class="org.apache.kafka.clients.KafkaClient" />
<allow class="org.apache.kafka.clients.RequestCompletionHandler" /> <allow class="org.apache.kafka.clients.RequestCompletionHandler" />
<subpackage name="timer">
<allow class="org.apache.kafka.server.util.MockTime" />
</subpackage>
</subpackage> </subpackage>
</subpackage> </subpackage>

View File

@ -25,12 +25,12 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.log.UnifiedLog import kafka.log.UnifiedLog
import kafka.server._ import kafka.server._
import kafka.utils._ import kafka.utils._
import kafka.utils.timer.MockTimer
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordConversionStats} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordConversionStats}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime} import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig} import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig}
import org.junit.jupiter.api.{AfterEach, BeforeEach} import org.junit.jupiter.api.{AfterEach, BeforeEach}

View File

@ -21,7 +21,6 @@ import java.util.{Optional, OptionalInt}
import kafka.common.OffsetAndMetadata import kafka.common.OffsetAndMetadata
import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal} import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils._ import kafka.utils._
import kafka.utils.timer.MockTimer
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
@ -37,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.AppendOrigin import org.apache.kafka.storage.internals.log.AppendOrigin
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._

View File

@ -32,7 +32,6 @@ import kafka.log._
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile} import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
import kafka.server.epoch.util.MockBlockingSender import kafka.server.epoch.util.MockBlockingSender
import kafka.utils.timer.MockTimer
import kafka.utils.{Pool, TestUtils} import kafka.utils.{Pool, TestUtils}
import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException} import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException}
@ -71,6 +70,7 @@ import kafka.log.remote.RemoteLogManager
import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction} import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction}
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
import org.apache.kafka.server.util.timer.MockTimer
import org.mockito.invocation.InvocationOnMock import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer import org.mockito.stubbing.Answer
import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.{ArgumentCaptor, ArgumentMatchers}

View File

@ -1,72 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils.timer
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.server.util.timer.{Timer, TimerTask, TimerTaskEntry}
import scala.collection.mutable
class MockTimer(val time: MockTime = new MockTime) extends Timer {
private val taskQueue = mutable.PriorityQueue.empty[TimerTaskEntry](new Ordering[TimerTaskEntry] {
override def compare(x: TimerTaskEntry, y: TimerTaskEntry): Int = java.lang.Long.compare(x.expirationMs, y.expirationMs)
}.reverse)
def add(timerTask: TimerTask): Unit = {
if (timerTask.delayMs <= 0)
timerTask.run()
else {
taskQueue synchronized {
taskQueue.enqueue(new TimerTaskEntry(timerTask, timerTask.delayMs + time.milliseconds))
}
}
}
def advanceClock(timeoutMs: Long): Boolean = {
time.sleep(timeoutMs)
var executed = false
val now = time.milliseconds
var hasMore = true
while (hasMore) {
hasMore = false
val head = taskQueue synchronized {
if (taskQueue.nonEmpty && now > taskQueue.head.expirationMs) {
val entry = Some(taskQueue.dequeue())
hasMore = taskQueue.nonEmpty
entry
} else
None
}
head.foreach { taskEntry =>
if (!taskEntry.cancelled) {
val task = taskEntry.timerTask
task.run()
executed = true
}
}
}
executed
}
def size: Int = taskQueue.size
override def close(): Unit = {}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util.timer;
import org.apache.kafka.server.util.MockTime;
import java.util.Comparator;
import java.util.PriorityQueue;
public class MockTimer implements Timer {
private final MockTime time;
private final PriorityQueue<TimerTaskEntry> taskQueue = new PriorityQueue<>(
Comparator.comparingLong(entry -> entry.expirationMs)
);
public MockTimer() {
this(new MockTime());
}
public MockTimer(MockTime time) {
this.time = time;
}
@Override
public void add(TimerTask timerTask) {
if (timerTask.delayMs <= 0) {
timerTask.run();
} else {
synchronized (taskQueue) {
taskQueue.add(new TimerTaskEntry(timerTask, timerTask.delayMs + time.milliseconds()));
}
}
}
@Override
public boolean advanceClock(long timeoutMs) throws InterruptedException {
time.sleep(timeoutMs);
final long now = time.milliseconds();
boolean executed = false;
boolean hasMore = true;
while (hasMore) {
hasMore = false;
TimerTaskEntry taskEntry = null;
synchronized (taskQueue) {
if (!taskQueue.isEmpty() && now > taskQueue.peek().expirationMs) {
taskEntry = taskQueue.poll();
hasMore = !taskQueue.isEmpty();
}
}
if (taskEntry != null) {
if (!taskEntry.cancelled()) {
taskEntry.timerTask.run();
executed = true;
}
}
}
return executed;
}
public MockTime time() {
return time;
}
public int size() {
return taskQueue.size();
}
@Override
public void close() throws Exception {}
}