KAFKA-16552 Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests (#15719)

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Author: Kuan-Po (Cooper) Tseng, 2024-04-20 20:34:02 +08:00, committed by GitHub
parent 613d4c8578
commit ced79ee12f
8 changed files with 41 additions and 19 deletions
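
The commit replaces LogManager's hard-coded 30-second initial task delay with an internal broker config (defined in ServerLogConfigs below). Assuming ServerLogConfigs.LOG_PREFIX resolves to "log.", the full key is log.initial.task.delay.ms; the default stays at 30 s, and a test can lower it so the retention, flush, checkpoint, and delete tasks start almost immediately. A minimal sketch of such an override (the fixture class and wiring here are illustrative, not part of this commit):

    import java.util.Properties;

    public class FastLogManagerTasksSketch {
        public static void main(String[] args) {
            // Hypothetical test setup: shrink the initial delay of LogManager's
            // background tasks from the 30 s default down to 10 ms.
            Properties brokerProps = new Properties();
            brokerProps.setProperty("log.initial.task.delay.ms", "10");
            // ... hand brokerProps to the broker/KafkaConfig under test.
            System.out.println(brokerProps);
        }
    }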

View File: LogManagerBuilder.java

@@ -22,6 +22,7 @@ import kafka.server.BrokerTopicStats;
 import kafka.server.metadata.ConfigRepository;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
@@ -55,6 +56,7 @@ public class LogManagerBuilder {
     private Time time = Time.SYSTEM;
     private boolean keepPartitionMetadataFile = true;
     private boolean remoteStorageSystemEnable = false;
+    private long initialTaskDelayMs = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT;

     public LogManagerBuilder setLogDirs(List<File> logDirs) {
         this.logDirs = logDirs;
@@ -151,6 +153,11 @@
         return this;
     }

+    public LogManagerBuilder setInitialTaskDelayMs(long initialTaskDelayMs) {
+        this.initialTaskDelayMs = initialTaskDelayMs;
+        return this;
+    }
+
     public LogManager build() {
         if (logDirs == null) throw new RuntimeException("you must set logDirs");
         if (configRepository == null) throw new RuntimeException("you must set configRepository");
@@ -179,6 +186,7 @@
                               logDirFailureChannel,
                               time,
                               keepPartitionMetadataFile,
-                              remoteStorageSystemEnable);
+                              remoteStorageSystemEnable,
+                              initialTaskDelayMs);
     }
 }
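
The new setter keeps the builder fluent and defaults to LOG_INITIAL_TASK_DELAY_MS_DEFAULT. A hypothetical helper showing the call shape; it assumes the caller has already populated the required fields (logDirs, configRepository, and so on) that build() checks for, and that the builder lives in the kafka.server.builders package:

    import kafka.log.LogManager;
    import kafka.server.builders.LogManagerBuilder;

    public class FastStartBuilderSketch {
        // Illustrative only: lower the initial task delay on an otherwise fully
        // configured builder so background tasks start quickly in tests.
        static LogManager buildFastStart(LogManagerBuilder builder) {
            return builder
                    .setInitialTaskDelayMs(100L) // default is 30 * 1000L
                    .build();
        }
    }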

View File: LogManager.scala

@@ -81,14 +81,13 @@ class LogManager(logDirs: Seq[File],
                  logDirFailureChannel: LogDirFailureChannel,
                  time: Time,
                  val keepPartitionMetadataFile: Boolean,
-                 remoteStorageSystemEnable: Boolean) extends Logging {
+                 remoteStorageSystemEnable: Boolean,
+                 val initialTaskDelayMs: Long) extends Logging {

   import LogManager._

   private val metricsGroup = new KafkaMetricsGroup(this.getClass)

-  val InitialTaskDelayMs: Int = 30 * 1000
-
   private val logCreationOrDeletionLock = new Object
   private val currentLogs = new Pool[TopicPartition, UnifiedLog]()
   // Future logs are put in the directory with "-future" suffix. Future log is created when user wants to move replica
@@ -628,24 +627,24 @@
       info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
       scheduler.schedule("kafka-log-retention",
                          () => cleanupLogs(),
-                         InitialTaskDelayMs,
+                         initialTaskDelayMs,
                          retentionCheckMs)
       info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
       scheduler.schedule("kafka-log-flusher",
                          () => flushDirtyLogs(),
-                         InitialTaskDelayMs,
+                         initialTaskDelayMs,
                          flushCheckMs)
       scheduler.schedule("kafka-recovery-point-checkpoint",
                          () => checkpointLogRecoveryOffsets(),
-                         InitialTaskDelayMs,
+                         initialTaskDelayMs,
                          flushRecoveryOffsetCheckpointMs)
       scheduler.schedule("kafka-log-start-offset-checkpoint",
                          () => checkpointLogStartOffsets(),
-                         InitialTaskDelayMs,
+                         initialTaskDelayMs,
                          flushStartOffsetCheckpointMs)
       scheduler.scheduleOnce("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                              () => deleteLogs(),
-                             InitialTaskDelayMs)
+                             initialTaskDelayMs)
     }
     if (cleanerConfig.enableCleaner) {
       _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
@@ -1584,7 +1583,8 @@ object LogManager {
       time = time,
       keepPartitionMetadataFile = keepPartitionMetadataFile,
       interBrokerProtocolVersion = config.interBrokerProtocolVersion,
-      remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem())
+      remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem(),
+      initialTaskDelayMs = config.logInitialTaskDelayMs)
   }

   /**
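
This file is the heart of the change: the five startup tasks (retention, flusher, the two checkpoints, and delete-logs) previously all waited the hard-coded InitialTaskDelayMs of 30 s before their first run, so any test that depends on a first pass paid that delay in real time. A periodic scheduler only fires after its initial delay; an illustrative JDK-only demo of that semantics (not Kafka's Scheduler API):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class InitialDelayDemo {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
            long initialTaskDelayMs = 10; // test-friendly value; the old constant was 30_000
            // The first run happens only after initialTaskDelayMs, then once per period.
            pool.scheduleAtFixedRate(() -> System.out.println("retention pass"),
                    initialTaskDelayMs, 1_000, TimeUnit.MILLISECONDS);
            Thread.sleep(50); // long enough to observe the first run
            pool.shutdown();
        }
    }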

View File: KafkaConfig.scala

@@ -573,6 +573,7 @@
       .define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC)
       .define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC)
       .define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT, LOW, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC)
+      .defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC)

       /** ********* Replication configuration ***********/
       .define(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, INT, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, MEDIUM, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DOC)
@@ -1150,6 +1151,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   def logFlushIntervalMs: java.lang.Long = Option(getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG)).getOrElse(getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG))
   def minInSyncReplicas = getInt(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG)
   def logPreAllocateEnable: java.lang.Boolean = getBoolean(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG)
+  def logInitialTaskDelayMs: java.lang.Long = Option(getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG)).getOrElse(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT)

   // We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
   // is passed, `0.10.0-IV0` may be picked)
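
defineInternal registers the key with full parsing and validation (atLeast(0) rejects negative delays) but keeps it out of the generated public config documentation. A self-contained sketch of the same pattern against ConfigDef directly (standalone, not the broker's actual wiring):

    import java.util.Map;
    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;
    import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;

    public class InternalConfigSketch {
        public static void main(String[] args) {
            ConfigDef def = new ConfigDef()
                    .defineInternal("log.initial.task.delay.ms", ConfigDef.Type.LONG,
                            30 * 1000L, atLeast(0), ConfigDef.Importance.LOW,
                            "Initial task delay for LogManager; testing only.");
            // Internal keys are read like any other config value.
            AbstractConfig cfg = new AbstractConfig(def, Map.of("log.initial.task.delay.ms", "100"));
            System.out.println(cfg.getLong("log.initial.task.delay.ms")); // 100
        }
    }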

View File: LogLoaderTest.scala

@@ -127,7 +127,8 @@ class LogLoaderTest {
       logDirFailureChannel = logDirFailureChannel,
       time = time,
       keepPartitionMetadataFile = config.usesTopicId,
-      remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
+      remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem(),
+      initialTaskDelayMs = config.logInitialTaskDelayMs) {

       override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
                            logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig,

View File: LogManagerTest.scala

@@ -69,12 +69,14 @@ class LogManagerTest {
   var logManager: LogManager = _
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L
+  val initialTaskDelayMs: Long = 10 * 1000

   @BeforeEach
   def setUp(): Unit = {
     logDir = TestUtils.tempDir()
     logManager = createLogManager()
     logManager.startup(Set.empty)
+    assertEquals(initialTaskDelayMs, logManager.initialTaskDelayMs)
   }

   @AfterEach
@@ -413,7 +415,7 @@
     assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, "Check we have the expected number of segments.")

     // this cleanup shouldn't find any expired segments but should delete some to reduce size
-    time.sleep(logManager.InitialTaskDelayMs)
+    time.sleep(logManager.initialTaskDelayMs)
     assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 segments")
     time.sleep(log.config.fileDeleteDelayMs + 1)
@@ -482,7 +484,7 @@
       val set = TestUtils.singletonRecords("test".getBytes())
       log.appendAsLeader(set, leaderEpoch = 0)
     }
-    time.sleep(logManager.InitialTaskDelayMs)
+    time.sleep(logManager.initialTaskDelayMs)
     assertTrue(lastFlush != log.lastFlushTime, "Time based flush should have been triggered")
   }
@@ -604,7 +606,8 @@
       configRepository = configRepository,
       logDirs = logDirs,
       time = this.time,
-      recoveryThreadsPerDataDir = recoveryThreadsPerDataDir)
+      recoveryThreadsPerDataDir = recoveryThreadsPerDataDir,
+      initialTaskDelayMs = initialTaskDelayMs)
   }

   @Test
@@ -637,9 +640,9 @@
                fileInIndex.get.getAbsolutePath)
     }

-    time.sleep(logManager.InitialTaskDelayMs)
+    time.sleep(logManager.initialTaskDelayMs)
     assertTrue(logManager.hasLogsToBeDeleted, "Logs deleted too early")
-    time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs)
+    time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.initialTaskDelayMs)
     assertFalse(logManager.hasLogsToBeDeleted, "Logs not deleted")
   }
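
The test value of 10 s also has to stay below file.delete.delay.ms (60 s by default) so that the second sleep in the last hunk, fileDeleteDelayMs - initialTaskDelayMs, remains positive. A small sketch of that clock arithmetic with the common-utils MockTime (virtual clock only; in the real test a mock scheduler attached to the clock fires the pending tasks as it advances):

    import org.apache.kafka.common.utils.MockTime;

    public class MockClockSketch {
        public static void main(String[] args) {
            MockTime time = new MockTime();       // virtual clock, no real waiting
            long initialTaskDelayMs = 10 * 1000L; // the test override above
            long fileDeleteDelayMs = 60 * 1000L;  // broker default for file.delete.delay.ms
            long start = time.milliseconds();
            time.sleep(initialTaskDelayMs);                     // past the initial task delay
            time.sleep(fileDeleteDelayMs - initialTaskDelayMs); // past the pending deletion
            System.out.println(time.milliseconds() - start);    // 60000
        }
    }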

View File: TestUtils.scala

@@ -1506,7 +1506,8 @@ object TestUtils extends Logging {
                        recoveryThreadsPerDataDir: Int = 4,
                        transactionVerificationEnabled: Boolean = false,
                        log: Option[UnifiedLog] = None,
-                       remoteStorageSystemEnable: Boolean = false): LogManager = {
+                       remoteStorageSystemEnable: Boolean = false,
+                       initialTaskDelayMs: Long = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT): LogManager = {
     val logManager = new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
                                     initialOfflineDirs = Array.empty[File],
                                     configRepository = configRepository,
@@ -1526,7 +1527,8 @@
                                     logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
                                     keepPartitionMetadataFile = true,
                                     interBrokerProtocolVersion = interBrokerProtocolVersion,
-                                    remoteStorageSystemEnable = remoteStorageSystemEnable)
+                                    remoteStorageSystemEnable = remoteStorageSystemEnable,
+                                    initialTaskDelayMs = initialTaskDelayMs)
     if (log.isDefined) {
       val spyLogManager = Mockito.spy(logManager)

View File: CheckpointBench.java

@@ -22,6 +22,7 @@ import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerFeatures;
 import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
+import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.storage.internals.log.LogConfig;
@@ -109,7 +110,7 @@ public class CheckpointBench {
                 JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
         this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
                 new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
-                1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latestTesting(), 4, false, Option.empty(), false);
+                1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latestTesting(), 4, false, Option.empty(), false, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT);
         scheduler.startup();
         final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
         final MetadataCache metadataCache =

View File: ServerLogConfigs.java

@@ -185,4 +185,9 @@ public class ServerLogConfigs {
         "broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
         "with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration " +
         "does not apply to any message format conversion that might be required for replication to followers.";
+
+    public static final String LOG_INITIAL_TASK_DELAY_MS_CONFIG = LOG_PREFIX + "initial.task.delay.ms";
+    public static final long LOG_INITIAL_TASK_DELAY_MS_DEFAULT = 30 * 1000L;
+    public static final String LOG_INITIAL_TASK_DELAY_MS_DOC = "The initial task delay in milliseconds when initializing " +
+        "tasks in LogManager. This should be used for testing only.";
 }