KAFKA-19182 Move SchedulerTest to server module (#19608)
CI / build (push) Waiting to run Details

This PR moves SchedulerTest to server module and rewrite it with java.

Please also check updated import control config!

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Uladzislau Blok 2025-05-08 18:02:38 +02:00 committed by GitHub
parent 9e785cee8d
commit 0076b65f99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 271 additions and 234 deletions

View File

@ -911,6 +911,7 @@ project(':server') {
testImplementation project(':test-common:test-common-internal-api') testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-runtime') testImplementation project(':test-common:test-common-runtime')
testImplementation project(':storage:storage-api').sourceSets.test.output testImplementation project(':storage:storage-api').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
testRuntimeOnly runtimeTestLibs testRuntimeOnly runtimeTestLibs
} }

View File

@ -100,6 +100,10 @@
<subpackage name="share"> <subpackage name="share">
<allow pkg="org.apache.kafka.storage.log.metrics" /> <allow pkg="org.apache.kafka.storage.log.metrics" />
</subpackage> </subpackage>
<subpackage name="util">
<allow pkg="org.apache.kafka.storage.log.metrics" />
<allow pkg="org.apache.kafka.storage.internals.epoch" />
</subpackage>
</subpackage> </subpackage>
<subpackage name="security"> <subpackage name="security">

View File

@ -1,234 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.{Optional, Properties}
import java.util.concurrent.atomic._
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import kafka.utils.TestUtils.retry
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
class SchedulerTest {
val scheduler = new KafkaScheduler(1)
val mockTime = new MockTime
val counter1 = new AtomicInteger(0)
val counter2 = new AtomicInteger(0)
@BeforeEach
def setup(): Unit = {
scheduler.startup()
}
@AfterEach
def teardown(): Unit = {
scheduler.shutdown()
}
@Test
def testMockSchedulerNonPeriodicTask(): Unit = {
mockTime.scheduler.scheduleOnce("test1", () => counter1.getAndIncrement(), 1)
mockTime.scheduler.scheduleOnce("test2", () => counter2.getAndIncrement(), 100)
assertEquals(0, counter1.get, "Counter1 should not be incremented prior to task running.")
assertEquals(0, counter2.get, "Counter2 should not be incremented prior to task running.")
mockTime.sleep(1)
assertEquals(1, counter1.get, "Counter1 should be incremented")
assertEquals(0, counter2.get, "Counter2 should not be incremented")
mockTime.sleep(100000)
assertEquals(1, counter1.get, "More sleeping should not result in more incrementing on counter1.")
assertEquals(1, counter2.get, "Counter2 should now be incremented.")
}
@Test
def testMockSchedulerPeriodicTask(): Unit = {
mockTime.scheduler.schedule("test1", () => counter1.getAndIncrement(), 1, 1)
mockTime.scheduler.schedule("test2", () => counter2.getAndIncrement(), 100, 100)
assertEquals(0, counter1.get, "Counter1 should not be incremented prior to task running.")
assertEquals(0, counter2.get, "Counter2 should not be incremented prior to task running.")
mockTime.sleep(1)
assertEquals(1, counter1.get, "Counter1 should be incremented")
assertEquals(0, counter2.get, "Counter2 should not be incremented")
mockTime.sleep(100)
assertEquals(101, counter1.get, "Counter1 should be incremented 101 times")
assertEquals(1, counter2.get, "Counter2 should not be incremented once")
}
@Test
def testReentrantTaskInMockScheduler(): Unit = {
mockTime.scheduler.scheduleOnce("test1", () => mockTime.scheduler.scheduleOnce("test2", () => counter2.getAndIncrement(), 0), 1)
mockTime.sleep(1)
assertEquals(1, counter2.get)
}
@Test
def testNonPeriodicTask(): Unit = {
scheduler.scheduleOnce("test", () => counter1.getAndIncrement())
retry(30000) {
assertEquals(counter1.get, 1)
}
Thread.sleep(5)
assertEquals(1, counter1.get, "Should only run once")
}
@Test
def testNonPeriodicTaskWhenPeriodIsZero(): Unit = {
scheduler.schedule("test", () => counter1.getAndIncrement(), 0, 0)
retry(30000) {
assertEquals(counter1.get, 1)
}
Thread.sleep(5)
assertEquals(1, counter1.get, "Should only run once")
}
@Test
def testPeriodicTask(): Unit = {
scheduler.schedule("test", () => counter1.getAndIncrement(), 0, 5)
retry(30000) {
assertTrue(counter1.get >= 20, "Should count to 20")
}
}
@Test
def testRestart(): Unit = {
// schedule a task to increment a counter
mockTime.scheduler.scheduleOnce("test1", () => counter1.getAndIncrement(), 1)
mockTime.sleep(1)
assertEquals(1, counter1.get())
// restart the scheduler
mockTime.scheduler.shutdown()
mockTime.scheduler.startup()
// schedule another task to increment the counter
mockTime.scheduler.scheduleOnce("test1", () => counter1.getAndIncrement(), 1)
mockTime.sleep(1)
assertEquals(2, counter1.get())
}
@Test
def testUnscheduleProducerTask(): Unit = {
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val logConfig = new LogConfig(new Properties())
val brokerTopicStats = new BrokerTopicStats
val maxTransactionTimeoutMs = 5 * 60 * 1000
val maxProducerIdExpirationMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT
val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
val producerStateManagerConfig = new ProducerStateManagerConfig(maxProducerIdExpirationMs, false)
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
maxTransactionTimeoutMs, producerStateManagerConfig, mockTime)
val offsets = new LogLoader(
logDir,
topicPartition,
logConfig,
scheduler,
mockTime,
logDirFailureChannel,
true,
segments,
0L,
0L,
leaderEpochCache,
producerStateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, scheduler, mockTime, topicPartition, logDirFailureChannel)
val log = new UnifiedLog(offsets.logStartOffset,
localLog,
brokerTopicStats,
producerIdExpirationCheckIntervalMs,
leaderEpochCache,
producerStateManager,
Optional.empty,
false,
LogOffsetsListener.NO_OP_OFFSETS_LISTENER)
assertTrue(scheduler.taskRunning(log.producerExpireCheck))
log.close()
assertFalse(scheduler.taskRunning(log.producerExpireCheck))
}
/**
* Verify that scheduler lock is not held when invoking task method, allowing new tasks to be scheduled
* when another is being executed. This is required to avoid deadlocks when:
* a) Thread1 executes a task which attempts to acquire LockA
* b) Thread2 holding LockA attempts to schedule a new task
*/
@Timeout(15)
@Test
def testMockSchedulerLocking(): Unit = {
val initLatch = new CountDownLatch(1)
val completionLatch = new CountDownLatch(2)
val taskLatches = List(new CountDownLatch(1), new CountDownLatch(1))
def scheduledTask(taskLatch: CountDownLatch): Unit = {
initLatch.countDown()
assertTrue(taskLatch.await(30, TimeUnit.SECONDS), "Timed out waiting for latch")
completionLatch.countDown()
}
mockTime.scheduler.scheduleOnce("test1", () => scheduledTask(taskLatches.head), 1)
val tickExecutor = Executors.newSingleThreadScheduledExecutor()
try {
tickExecutor.scheduleWithFixedDelay(() => mockTime.sleep(1), 0, 1, TimeUnit.MILLISECONDS)
// wait for first task to execute and then schedule the next task while the first one is running
assertTrue(initLatch.await(10, TimeUnit.SECONDS))
mockTime.scheduler.scheduleOnce("test2", () => scheduledTask(taskLatches(1)), 1)
taskLatches.foreach(_.countDown())
assertTrue(completionLatch.await(10, TimeUnit.SECONDS), "Tasks did not complete")
} finally {
tickExecutor.shutdownNow()
}
}
@Test
def testPendingTaskSize(): Unit = {
val latch1 = new CountDownLatch(1)
val latch2 = new CountDownLatch(2)
val task1 = new Runnable {
override def run(): Unit = {
latch1.await()
}
}
scheduler.scheduleOnce("task1", task1, 0)
scheduler.scheduleOnce("task2", () => latch2.countDown(), 5)
scheduler.scheduleOnce("task3", () => latch2.countDown(), 5)
retry(30000) {
assertEquals(2, scheduler.pendingTaskSize())
}
latch1.countDown()
latch2.await()
retry(30000) {
assertEquals(0, scheduler.pendingTaskSize())
}
scheduler.shutdown()
assertEquals(0, scheduler.pendingTaskSize())
}
}

View File

@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.LoadedLogOffsets;
import org.apache.kafka.storage.internals.log.LocalLog;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogLoader;
import org.apache.kafka.storage.internals.log.LogOffsetsListener;
import org.apache.kafka.storage.internals.log.LogSegments;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SchedulerTest {
private final KafkaScheduler scheduler = new KafkaScheduler(1);
private final MockTime mockTime = new MockTime();
private final AtomicInteger counter1 = new AtomicInteger(0);
private final AtomicInteger counter2 = new AtomicInteger(0);
@BeforeEach
void setup() {
counter1.set(0);
counter2.set(0);
scheduler.startup();
}
@AfterEach
void teardown() throws InterruptedException {
scheduler.shutdown();
}
@Test
void testMockSchedulerNonPeriodicTask() {
mockTime.scheduler.scheduleOnce("test1", counter1::getAndIncrement, 1);
mockTime.scheduler.scheduleOnce("test2", counter2::getAndIncrement, 100);
assertEquals(0, counter1.get(), "Counter1 should not be incremented prior to task running.");
assertEquals(0, counter2.get(), "Counter2 should not be incremented prior to task running.");
mockTime.sleep(1);
assertEquals(1, counter1.get(), "Counter1 should be incremented");
assertEquals(0, counter2.get(), "Counter2 should not be incremented");
mockTime.sleep(100000);
assertEquals(1, counter1.get(), "More sleeping should not result in more incrementing on counter1.");
assertEquals(1, counter2.get(), "Counter2 should now be incremented.");
}
@Test
void testMockSchedulerPeriodicTask() {
mockTime.scheduler.schedule("test1", counter1::getAndIncrement, 1, 1);
mockTime.scheduler.schedule("test2", counter2::getAndIncrement, 100, 100);
assertEquals(0, counter1.get(), "Counter1 should not be incremented prior to task running.");
assertEquals(0, counter2.get(), "Counter2 should not be incremented prior to task running.");
mockTime.sleep(1);
assertEquals(1, counter1.get(), "Counter1 should be incremented");
assertEquals(0, counter2.get(), "Counter2 should not be incremented");
mockTime.sleep(100);
assertEquals(101, counter1.get(), "Counter1 should be incremented 101 times");
assertEquals(1, counter2.get(), "Counter2 should not be incremented once");
}
@Test
void testReentrantTaskInMockScheduler() {
mockTime.scheduler.scheduleOnce("test1", () -> mockTime.scheduler.scheduleOnce("test2", counter2::getAndIncrement, 0), 1);
mockTime.sleep(1);
assertEquals(1, counter2.get());
}
@Test
void testNonPeriodicTask() throws InterruptedException {
scheduler.scheduleOnce("test", counter1::getAndIncrement);
TestUtils.waitForCondition(() -> counter1.get() == 1, "Scheduled task was not executed");
Thread.sleep(5);
assertEquals(1, counter1.get(), "Should only run once");
}
@Test
void testNonPeriodicTaskWhenPeriodIsZero() throws InterruptedException {
scheduler.schedule("test", counter1::getAndIncrement, 0, 0);
TestUtils.waitForCondition(() -> counter1.get() == 1, "Scheduled task was not executed");
Thread.sleep(5);
assertEquals(1, counter1.get(), "Should only run once");
}
@Test
void testPeriodicTask() throws InterruptedException {
scheduler.schedule("test", counter1::getAndIncrement, 0, 5);
TestUtils.waitForCondition(() -> counter1.get() >= 20, "Should count to 20");
}
@Test
void testRestart() throws InterruptedException {
// schedule a task to increment a counter
mockTime.scheduler.scheduleOnce("test1", counter1::getAndIncrement, 1);
mockTime.sleep(1);
assertEquals(1, counter1.get());
// restart the scheduler
mockTime.scheduler.shutdown();
mockTime.scheduler.startup();
// schedule another task to increment the counter
mockTime.scheduler.scheduleOnce("test1", counter1::getAndIncrement, 1);
mockTime.sleep(1);
assertEquals(2, counter1.get());
}
@Test
void testUnscheduleProducerTask() throws IOException {
File tmpDir = TestUtils.tempDirectory();
File logDir = TestUtils.randomPartitionLogDir(tmpDir);
LogConfig logConfig = new LogConfig(new Properties());
BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
int maxTransactionTimeoutMs = 5 * 60 * 1000;
int maxProducerIdExpirationMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT;
int producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT;
TopicPartition topicPartition = UnifiedLog.parseTopicPartitionName(logDir);
LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10);
LogSegments segments = new LogSegments(topicPartition);
LeaderEpochFileCache leaderEpochCache = UnifiedLog.createLeaderEpochCache(logDir, topicPartition,
logDirFailureChannel, Optional.empty(), mockTime.scheduler);
ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(maxProducerIdExpirationMs, false);
ProducerStateManager producerStateManager = new ProducerStateManager(topicPartition, logDir,
maxTransactionTimeoutMs, producerStateManagerConfig, mockTime);
LoadedLogOffsets offsets = new LogLoader(
logDir,
topicPartition,
logConfig,
scheduler,
mockTime,
logDirFailureChannel,
true,
segments,
0L,
0L,
leaderEpochCache,
producerStateManager,
new ConcurrentHashMap<>(), false).load();
LocalLog localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, scheduler, mockTime, topicPartition, logDirFailureChannel);
UnifiedLog log = new UnifiedLog(offsets.logStartOffset,
localLog,
brokerTopicStats,
producerIdExpirationCheckIntervalMs,
leaderEpochCache,
producerStateManager,
Optional.empty(),
false,
LogOffsetsListener.NO_OP_OFFSETS_LISTENER);
assertTrue(scheduler.taskRunning(log.producerExpireCheck()));
log.close();
assertFalse(scheduler.taskRunning(log.producerExpireCheck()));
}
/**
* Verify that scheduler lock is not held when invoking task method, allowing new tasks to be scheduled
* when another is being executed. This is required to avoid deadlocks when:
* <ul>
* <li>Thread1 executes a task which attempts to acquire LockA</li>
* <li>Thread2 holding LockA attempts to schedule a new task</li>
* </ul>
*/
@Timeout(15)
@Test
void testMockSchedulerLocking() throws InterruptedException {
CountDownLatch initLatch = new CountDownLatch(1);
CountDownLatch completionLatch = new CountDownLatch(2);
List<CountDownLatch> taskLatches = List.of(new CountDownLatch(1), new CountDownLatch(1));
InterruptedConsumer<CountDownLatch> scheduledTask = taskLatch -> {
initLatch.countDown();
assertTrue(taskLatch.await(30, TimeUnit.SECONDS), "Timed out waiting for latch");
completionLatch.countDown();
};
mockTime.scheduler.scheduleOnce("test1", interruptedRunnableWrapper(() -> scheduledTask.accept(taskLatches.get(0))), 1);
ScheduledExecutorService tickExecutor = Executors.newSingleThreadScheduledExecutor();
try {
tickExecutor.scheduleWithFixedDelay(() -> mockTime.sleep(1), 0, 1, TimeUnit.MILLISECONDS);
// wait for first task to execute and then schedule the next task while the first one is running
assertTrue(initLatch.await(10, TimeUnit.SECONDS));
mockTime.scheduler.scheduleOnce("test2", interruptedRunnableWrapper(() -> scheduledTask.accept(taskLatches.get(1))), 1);
taskLatches.forEach(CountDownLatch::countDown);
assertTrue(completionLatch.await(10, TimeUnit.SECONDS), "Tasks did not complete");
} finally {
tickExecutor.shutdownNow();
}
}
@Test
void testPendingTaskSize() throws InterruptedException {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(2);
scheduler.scheduleOnce("task1", interruptedRunnableWrapper(latch1::await), 0);
scheduler.scheduleOnce("task2", latch2::countDown, 5);
scheduler.scheduleOnce("task3", latch2::countDown, 5);
TestUtils.waitForCondition(() -> scheduler.pendingTaskSize() <= 2, "Scheduled task was not executed");
latch1.countDown();
latch2.await();
TestUtils.waitForCondition(() -> scheduler.pendingTaskSize() == 0, "Scheduled task was not executed");
scheduler.shutdown();
assertEquals(0, scheduler.pendingTaskSize());
}
@FunctionalInterface
private interface InterruptedConsumer<T> {
void accept(T t) throws InterruptedException;
}
@FunctionalInterface
private interface InterruptedRunnable {
void run() throws InterruptedException;
}
private static Runnable interruptedRunnableWrapper(InterruptedRunnable runnable) {
return () -> {
try {
runnable.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
}
}