diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index d0b6524d9e1..1922b9c9384 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -85,6 +85,7 @@
         
         
         
+        
         
         
         
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 5a2fb1abe06..8143999419d 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -17,12 +17,11 @@
 
 package kafka.cluster
 
-import kafka.log.UnifiedLog
 import kafka.server.MetadataCache
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.storage.internals.log.LogOffsetMetadata
+import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, UnifiedLog}
 
 import java.util.concurrent.atomic.AtomicReference
 
@@ -75,7 +74,7 @@ case class ReplicaState(
 object ReplicaState {
   val Empty: ReplicaState = ReplicaState(
     logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
-    logStartOffset = UnifiedLog.UnknownOffset,
+    logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
     lastFetchLeaderLogEndOffset = 0L,
     lastFetchTimeMs = 0L,
     lastCaughtUpTimeMs = 0L,
@@ -157,9 +156,9 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat
 
       if (isNewLeader) {
         ReplicaState(
-          logStartOffset = UnifiedLog.UnknownOffset,
+          logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
           logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
-          lastFetchLeaderLogEndOffset = UnifiedLog.UnknownOffset,
+          lastFetchLeaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
           lastFetchTimeMs = 0L,
           lastCaughtUpTimeMs = lastCaughtUpTimeMs,
           brokerEpoch = Option.empty
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4f5aaa41375..3f6b464420c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleaningAbortedException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleaningAbortedException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.storage.internals.utils.Throttler
 
 import scala.jdk.CollectionConverters._
@@ -655,7 +655,7 @@ private[log] class Cleaner(val id: Int,
                                  legacyDeleteHorizonMs: Long,
                                  upperBoundOffsetOfCleaningRound: Long): Unit = {
     // create a new segment with a suffix appended to the name of the log and indexes
-    val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
+    val cleaned = JUnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
     transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
 
     try {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 72cf277be27..60dd708a548 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
 import java.util.{Collections, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{FileLock, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog, RemoteIndexCache}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
@@ -323,7 +323,7 @@ class LogManager(logDirs: Seq[File],
                            topicConfigOverrides: Map[String, LogConfig],
                            numRemainingSegments: ConcurrentMap[String, Integer],
                            isStray: UnifiedLog => Boolean): UnifiedLog = {
-    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
+    val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
     val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)
     val logRecoveryPoint = recoveryPoints.getOrDefault(topicPartition, 0L)
     val logStartOffset = logStartOffsets.getOrDefault(topicPartition, 0L)
@@ -345,9 +345,9 @@ class LogManager(logDirs: Seq[File],
       numRemainingSegments = numRemainingSegments,
       remoteStorageSystemEnable = remoteStorageSystemEnable)
 
-    if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
+    if (logDir.getName.endsWith(JUnifiedLog.DELETE_DIR_SUFFIX)) {
       addLogToBeDeleted(log)
-    } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
+    } else if (logDir.getName.endsWith(JUnifiedLog.STRAY_DIR_SUFFIX)) {
       addStrayLog(topicPartition, log)
       warn(s"Loaded stray log: $logDir")
     } else if (isStray(log)) {
@@ -355,7 +355,7 @@ class LogManager(logDirs: Seq[File],
       // Broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
       // and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories.
       // So upon a restart in which the offline directory is back online we need to clean up the old replica directory.
-      log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
+      log.renameDir(JUnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
       addStrayLog(log.topicPartition, log)
       warn(s"Log in ${logDir.getAbsolutePath} marked stray and renamed to ${log.dir.getAbsolutePath}")
     } else {
@@ -461,7 +461,7 @@ class LogManager(logDirs: Seq[File],
             // Ignore remote-log-index-cache directory as that is index cache maintained by tiered storage subsystem
             // but not any topic-partition dir.
             !logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
-            UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
+            JUnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
         numTotalLogs += logsToLoad.length
         numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
         loadLogsCompletedFlags.put(logDirAbsolutePath, logsToLoad.isEmpty)
@@ -1037,9 +1037,9 @@ class LogManager(logDirs: Seq[File],
 
         val logDirName = {
           if (isFuture)
-            UnifiedLog.logFutureDirName(topicPartition)
+            JUnifiedLog.logFutureDirName(topicPartition)
           else
-            UnifiedLog.logDirName(topicPartition)
+            JUnifiedLog.logDirName(topicPartition)
         }
 
         val logDir = logDirs
@@ -1213,7 +1213,7 @@ class LogManager(logDirs: Seq[File],
   def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
     val topicPartition = destLog.topicPartition
 
-    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
+    destLog.renameDir(JUnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
     // the metrics tags still contain "future", so we have to remove it.
     // we will add metrics back after sourceLog remove the metrics
     destLog.removeLogMetrics()
@@ -1234,7 +1234,7 @@ class LogManager(logDirs: Seq[File],
 
     try {
       sourceLog.foreach { srcLog =>
-        srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
+        srcLog.renameDir(JUnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
         // Now that replica in source log directory has been successfully renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be deleted.
         srcLog.close()
@@ -1285,10 +1285,10 @@ class LogManager(logDirs: Seq[File],
         }
         if (isStray) {
           // Move aside stray partitions, don't delete them
-          removedLog.renameDir(UnifiedLog.logStrayDirName(topicPartition), shouldReinitialize = false)
+          removedLog.renameDir(JUnifiedLog.logStrayDirName(topicPartition), shouldReinitialize = false)
           warn(s"Log for partition ${removedLog.topicPartition} is marked as stray and renamed to ${removedLog.dir.getAbsolutePath}")
         } else {
-          removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = false)
+          removedLog.renameDir(JUnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = false)
           addLogToBeDeleted(removedLog)
           info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
         }
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index f5d6b867201..e0d2a216094 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -20,7 +20,6 @@ package kafka.log
 import kafka.log.remote.RemoteLogManager
 import kafka.utils._
 import org.apache.kafka.common.errors._
-import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.DescribeProducersResponseData
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
@@ -30,16 +29,14 @@ import org.apache.kafka.common.requests.ProduceResponse.RecordError
 import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.{OffsetAndEpoch, RequestLocal}
-import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException}
 import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
+import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetResultHolder, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
-import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogMetricNames, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetResultHolder, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.{File, IOException}
 import java.lang.{Long => JLong}
@@ -107,7 +104,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
-  private val validatorMetricsRecorder = newValidatorMetricsRecorder(brokerTopicStats.allTopicsStats)
+  private val validatorMetricsRecorder = JUnifiedLog.newValidatorMetricsRecorder(brokerTopicStats.allTopicsStats)
 
   /* The earliest offset which is part of an incomplete transaction. This is used to compute the
    * last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset
@@ -177,7 +174,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   def remoteLogEnabled(): Boolean = {
-    UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic())
+    JUnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic())
   }
 
   /**
@@ -425,15 +422,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private[log] def newMetrics(): Unit = {
     val tags = (Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++
       (if (isFuture) Map("is-future" -> "true") else Map.empty)).asJava
-    metricsGroup.newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags)
-    metricsGroup.newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags)
-    metricsGroup.newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
-    metricsGroup.newGauge(LogMetricNames.Size, () => size, tags)
-    metricNames = Map(LogMetricNames.NumLogSegments -> tags,
-      LogMetricNames.LogStartOffset -> tags,
-      LogMetricNames.LogEndOffset -> tags,
-      LogMetricNames.Size -> tags)
-
+    metricsGroup.newGauge(LogMetricNames.NUM_LOG_SEGMENTS, () => numberOfSegments, tags)
+    metricsGroup.newGauge(LogMetricNames.LOG_START_OFFSET, () => logStartOffset, tags)
+    metricsGroup.newGauge(LogMetricNames.LOG_END_OFFSET, () => logEndOffset, tags)
+    metricsGroup.newGauge(LogMetricNames.SIZE, () => size, tags)
+    metricNames = Map(LogMetricNames.NUM_LOG_SEGMENTS -> tags,
+      LogMetricNames.LOG_START_OFFSET -> tags,
+      LogMetricNames.LOG_END_OFFSET -> tags,
+      LogMetricNames.SIZE -> tags)
   }
 
   val producerExpireCheck: ScheduledFuture[_] = scheduler.schedule("PeriodicProducerExpirationCheck", () => removeExpiredProducers(time.milliseconds),
@@ -485,8 +481,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def reinitializeLeaderEpochCache(): Unit = lock synchronized {
-    leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-      dir, topicPartition, logDirFailureChannel, Option.apply(leaderEpochCache), scheduler)
+    leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+      dir, topicPartition, logDirFailureChannel, Optional.of(leaderEpochCache), scheduler)
   }
 
   private def updateHighWatermarkWithLogEndOffset(): Unit = {
@@ -817,7 +813,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
               // we may still be able to recover if the log is empty
               // one example: fetching from log start offset on the leader which is not batch aligned,
               // which may happen as a result of AdminClient#deleteRecords()
-              val hasFirstOffset = appendInfo.firstOffset != UnifiedLog.UnknownOffset
+              val hasFirstOffset = appendInfo.firstOffset != JUnifiedLog.UNKNOWN_OFFSET
               val firstOffset = if (hasFirstOffset) appendInfo.firstOffset else records.batches.iterator().next().baseOffset()
 
               val firstOrLast = if (hasFirstOffset) "First offset" else "Last offset of the first batch"
@@ -1081,7 +1077,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                                         requireOffsetsMonotonic: Boolean,
                                         leaderEpoch: Int): LogAppendInfo = {
     var validBytesCount = 0
-    var firstOffset = UnifiedLog.UnknownOffset
+    var firstOffset = JUnifiedLog.UNKNOWN_OFFSET
     var lastOffset = -1L
     var lastLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH
     var sourceCompression = CompressionType.NONE
@@ -1267,6 +1263,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
         val latestEpoch = leaderEpochCache.latestEpoch()
         val epoch = if (latestEpoch.isPresent) Optional.of[Integer](latestEpoch.getAsInt) else Optional.empty[Integer]()
+
         new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch))
       } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
         if (remoteLogEnabled()) {
@@ -1481,7 +1478,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def deleteRetentionMsBreachedSegments(): Int = {
-    val retentionMs = localRetentionMs(config, remoteLogEnabledAndRemoteCopyEnabled())
+    val retentionMs = JUnifiedLog.localRetentionMs(config, remoteLogEnabledAndRemoteCopyEnabled())
     if (retentionMs < 0) return 0
     val startMs = time.milliseconds
 
@@ -1495,7 +1492,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def deleteRetentionSizeBreachedSegments(): Int = {
-    val retentionSize: Long = localRetentionSize(config, remoteLogEnabledAndRemoteCopyEnabled())
+    val retentionSize: Long = JUnifiedLog.localRetentionSize(config, remoteLogEnabledAndRemoteCopyEnabled())
     if (retentionSize < 0 || size < retentionSize) return 0
     var diff = size - retentionSize
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
@@ -1535,7 +1532,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * The log size in bytes for all segments that are only in local log but not yet in remote log.
    */
   def onlyLocalLogSegmentsSize: Long =
-    UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage()).collect(Collectors.toList[LogSegment]))
+    LogSegments.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage()).collect(Collectors.toList[LogSegment]))
 
   /**
    * The number of segments that are only in local log but not yet in remote log.
@@ -1591,7 +1588,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         Note that this is only required for pre-V2 message formats because these do not store the first message offset
         in the header.
       */
-      val rollOffset = if (appendInfo.firstOffset == UnifiedLog.UnknownOffset)
+      val rollOffset = if (appendInfo.firstOffset == JUnifiedLog.UNKNOWN_OFFSET)
         maxOffsetInMessages - Integer.MAX_VALUE
       else
         appendInfo.firstOffset
@@ -1828,9 +1825,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment]): Unit = {
     lock synchronized {
       localLog.checkIfMemoryMappedBufferClosed()
-      val deletedSegments = UnifiedLog.replaceSegments(localLog.segments, newSegments, oldSegments, dir, topicPartition,
-        config, scheduler, logDirFailureChannel, logIdent)
-      deleteProducerSnapshots(deletedSegments.toList.asJava, asyncDelete = true)
+      val deletedSegments = LocalLog.replaceSegments(localLog.segments, newSegments.asJava, oldSegments.asJava, dir, topicPartition,
+        config, scheduler, logDirFailureChannel, logIdent, false)
+      deleteProducerSnapshots(deletedSegments, asyncDelete = true)
     }
   }
 
@@ -1867,7 +1864,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized {
-    val result = UnifiedLog.splitOverflowedSegment(segment, localLog.segments, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
+    val result = LocalLog.splitOverflowedSegment(segment, localLog.segments, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
     deleteProducerSnapshots(result.deletedSegments, asyncDelete = true)
     result.newSegments.asScala.toList
   }
@@ -1878,34 +1875,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
 
 object UnifiedLog extends Logging {
-  val LogFileSuffix: String = LogFileUtils.LOG_FILE_SUFFIX
-
-  val IndexFileSuffix: String = LogFileUtils.INDEX_FILE_SUFFIX
-
-  val TimeIndexFileSuffix: String = LogFileUtils.TIME_INDEX_FILE_SUFFIX
-
-  val TxnIndexFileSuffix: String = LogFileUtils.TXN_INDEX_FILE_SUFFIX
-
-  val CleanedFileSuffix: String = LogFileUtils.CLEANED_FILE_SUFFIX
-
-  val SwapFileSuffix: String = LogFileUtils.SWAP_FILE_SUFFIX
-
-  val DeleteDirSuffix: String = LogFileUtils.DELETE_DIR_SUFFIX
-
-  val StrayDirSuffix: String = LogFileUtils.STRAY_DIR_SUFFIX
-
-  val UnknownOffset: Long = LocalLog.UNKNOWN_OFFSET
-
-  def isRemoteLogEnabled(remoteStorageSystemEnable: Boolean,
-                         config: LogConfig,
-                         topic: String): Boolean = {
-    // Remote log is enabled only for non-compact and non-internal topics
-    remoteStorageSystemEnable &&
-      !(config.compact || Topic.isInternal(topic)
-        || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic)
-        || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)) &&
-      config.remoteStorageEnable()
-  }
 
   def apply(dir: File,
             config: LogConfig,
@@ -1925,20 +1894,20 @@ object UnifiedLog extends Logging {
             logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
-    val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
+    val topicPartition = JUnifiedLog.parseTopicPartitionName(dir)
     val segments = new LogSegments(topicPartition)
     // The created leaderEpochCache will be truncated by LogLoader if necessary
     // so it is guaranteed that the epoch entries will be correct even when on-disk
     // checkpoint was stale (due to async nature of LeaderEpochFileCache#truncateFromStart/End).
-    val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
+    val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
       dir,
       topicPartition,
       logDirFailureChannel,
-      None,
+      Optional.empty,
       scheduler)
     val producerStateManager = new ProducerStateManager(topicPartition, dir,
       maxTransactionTimeoutMs, producerStateManagerConfig, time)
-    val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
+    val isRemoteLogEnabled = JUnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
     val offsets = new LogLoader(
       dir,
       topicPartition,
@@ -1968,108 +1937,6 @@ object UnifiedLog extends Logging {
       logOffsetsListener)
   }
 
-  def logDeleteDirName(topicPartition: TopicPartition): String = LocalLog.logDeleteDirName(topicPartition)
-
-  def logFutureDirName(topicPartition: TopicPartition): String = LocalLog.logFutureDirName(topicPartition)
-
-  def logStrayDirName(topicPartition: TopicPartition): String = LocalLog.logStrayDirName(topicPartition)
-
-  def logDirName(topicPartition: TopicPartition): String = LocalLog.logDirName(topicPartition)
-
-  def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File = LogFileUtils.transactionIndexFile(dir, offset, suffix)
-
-  def offsetFromFile(file: File): Long = LogFileUtils.offsetFromFile(file)
-
-  def sizeInBytes(segments: util.Collection[LogSegment]): Long = LogSegments.sizeInBytes(segments)
-
-  def parseTopicPartitionName(dir: File): TopicPartition = LocalLog.parseTopicPartitionName(dir)
-
-  /**
-   * Create a new LeaderEpochFileCache instance and load the epoch entries from the backing checkpoint file or
-   * the provided currentCache (if not empty).
-   *
-   * @param dir                  The directory in which the log will reside
-   * @param topicPartition       The topic partition
-   * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
-   * @param currentCache         The current LeaderEpochFileCache instance (if any)
-   * @param scheduler            The scheduler for executing asynchronous tasks
-   * @return The new LeaderEpochFileCache instance (if created), none otherwise
-   */
-  def createLeaderEpochCache(dir: File,
-                             topicPartition: TopicPartition,
-                             logDirFailureChannel: LogDirFailureChannel,
-                             currentCache: Option[LeaderEpochFileCache],
-                             scheduler: Scheduler): LeaderEpochFileCache = {
-    val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
-    val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
-    currentCache.map(_.withCheckpoint(checkpointFile)).getOrElse(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler))
-  }
-
-  private[log] def replaceSegments(existingSegments: LogSegments,
-                                   newSegments: Seq[LogSegment],
-                                   oldSegments: Seq[LogSegment],
-                                   dir: File,
-                                   topicPartition: TopicPartition,
-                                   config: LogConfig,
-                                   scheduler: Scheduler,
-                                   logDirFailureChannel: LogDirFailureChannel,
-                                   logPrefix: String,
-                                   isRecoveredSwapFile: Boolean = false): Iterable[LogSegment] = {
-    LocalLog.replaceSegments(existingSegments,
-      newSegments.asJava,
-      oldSegments.asJava,
-      dir,
-      topicPartition,
-      config,
-      scheduler,
-      logDirFailureChannel,
-      logPrefix,
-      isRecoveredSwapFile).asScala
-  }
-
-  private[log] def splitOverflowedSegment(segment: LogSegment,
-                                          existingSegments: LogSegments,
-                                          dir: File,
-                                          topicPartition: TopicPartition,
-                                          config: LogConfig,
-                                          scheduler: Scheduler,
-                                          logDirFailureChannel: LogDirFailureChannel,
-                                          logPrefix: String): SplitSegmentResult = {
-    LocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
-  }
-
-  private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
-    LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
-  }
-
-  // Visible for benchmarking
-  def newValidatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): LogValidator.MetricsRecorder = {
-    new LogValidator.MetricsRecorder {
-      def recordInvalidMagic(): Unit =
-        allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
-
-      def recordInvalidOffset(): Unit =
-        allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-
-      def recordInvalidSequence(): Unit =
-        allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-
-      def recordInvalidChecksums(): Unit =
-        allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
-
-      def recordNoKeyCompactedTopic(): Unit =
-        allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark()
-    }
-  }
-
-  private[log] def localRetentionMs(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
-    if (remoteLogEnabledAndRemoteCopyEnabled) config.localRetentionMs else config.retentionMs
-  }
-
-  private[log] def localRetentionSize(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
-    if (remoteLogEnabledAndRemoteCopyEnabled) config.localRetentionBytes else config.retentionSize
-  }
-
   /**
    * Wraps the value of iterator.next() in an option.
    * Note: this facility is a part of the Iterator class starting from scala v2.13.
@@ -2087,20 +1954,9 @@ object UnifiedLog extends Logging {
 
 }
 
-object LogMetricNames {
-  val NumLogSegments: String = "NumLogSegments"
-  val LogStartOffset: String = "LogStartOffset"
-  val LogEndOffset: String = "LogEndOffset"
-  val Size: String = "Size"
-
-  def allMetricNames: List[String] = {
-    List(NumLogSegments, LogStartOffset, LogEndOffset, Size)
-  }
-}
-
 case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
   override def logReason(toDelete: util.List[LogSegment]): Unit = {
-    val retentionMs = UnifiedLog.localRetentionMs(log.config, remoteLogEnabledAndRemoteCopyEnabled)
+    val retentionMs = JUnifiedLog.localRetentionMs(log.config, remoteLogEnabledAndRemoteCopyEnabled)
     toDelete.forEach { segment =>
       if (segment.largestRecordTimestamp.isPresent)
         if (remoteLogEnabledAndRemoteCopyEnabled)
@@ -2126,7 +1982,7 @@ case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEna
     var size = log.size
     toDelete.forEach { segment =>
       size -= segment.size
-      if (remoteLogEnabledAndRemoteCopyEnabled) log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabledAndRemoteCopyEnabled)} breach. " +
+      if (remoteLogEnabledAndRemoteCopyEnabled) log.info(s"Deleting segment $segment due to local log retention size ${JUnifiedLog.localRetentionSize(log.config, remoteLogEnabledAndRemoteCopyEnabled)} breach. " +
         s"Local log size after deletion will be $size.")
       else log.info(s"Deleting segment $segment due to log retention size ${log.config.retentionSize} breach. Log size " +
         s"after deletion will be $size.")
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index cd3f1db2d98..3f6c2044df5 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.snapshot.RawSnapshotWriter
 import org.apache.kafka.snapshot.SnapshotPath
 import org.apache.kafka.snapshot.Snapshots
 import org.apache.kafka.storage.internals
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.File
@@ -109,7 +109,7 @@ final class KafkaMetadataLog private (
   }
 
   private def handleAndConvertLogAppendInfo(appendInfo: internals.log.LogAppendInfo): LogAppendInfo = {
-    if (appendInfo.firstOffset != UnifiedLog.UnknownOffset)
+    if (appendInfo.firstOffset != JUnifiedLog.UNKNOWN_OFFSET)
       new LogAppendInfo(appendInfo.firstOffset, appendInfo.lastOffset)
     else
       throw new KafkaException(s"Append failed unexpectedly")
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 5eb05e3e288..9ff75b126ad 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture
 import java.util.{Map => JMap}
 import java.util.{Collection => JCollection}
 import kafka.log.LogManager
-import kafka.log.UnifiedLog
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils
 import kafka.utils.Logging
@@ -49,6 +48,7 @@ import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.util.timer.SystemTimer
+import org.apache.kafka.storage.internals.log.UnifiedLog
 
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index e41377e945f..998fa4410c5 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -18,7 +18,6 @@ package kafka.server
 
 import java.io.File
 import java.util.concurrent.CompletableFuture
-import kafka.log.UnifiedLog
 import kafka.utils.{CoreUtils, Logging, Mx4jLoader}
 import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.internals.Topic
@@ -31,7 +30,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
 import org.slf4j.Logger
 
 import java.util
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4cb9ce35f64..48960e8d08f 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -58,7 +58,7 @@ import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFe
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog => JUnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.File
@@ -244,10 +244,10 @@ object ReplicaManager {
   def createLogReadResult(e: Throwable): LogReadResult = {
     LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
       divergingEpoch = None,
-      highWatermark = UnifiedLog.UnknownOffset,
-      leaderLogStartOffset = UnifiedLog.UnknownOffset,
-      leaderLogEndOffset = UnifiedLog.UnknownOffset,
-      followerLogStartOffset = UnifiedLog.UnknownOffset,
+      highWatermark = JUnifiedLog.UNKNOWN_OFFSET,
+      leaderLogStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      leaderLogEndOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      followerLogStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
       fetchTimeMs = -1L,
       lastStableOffset = None,
       exception = Some(e))
@@ -1084,7 +1084,7 @@ class ReplicaManager(val config: KafkaConfig,
           /* If the topic name is exceptionally long, we can't support altering the log directory.
            * See KAFKA-4893 for details.
            * TODO: fix this by implementing topic IDs. */
-          if (UnifiedLog.logFutureDirName(topicPartition).length > 255)
+          if (JUnifiedLog.logFutureDirName(topicPartition).length > 255)
             throw new InvalidTopicException("The topic name is too long.")
           if (!logManager.isLogDirOnline(destinationDir))
             throw new KafkaStorageException(s"Log directory $destinationDir is offline")
@@ -1746,10 +1746,10 @@ class ReplicaManager(val config: KafkaConfig,
 
           LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
             divergingEpoch = None,
-            highWatermark = UnifiedLog.UnknownOffset,
-            leaderLogStartOffset = UnifiedLog.UnknownOffset,
-            leaderLogEndOffset = UnifiedLog.UnknownOffset,
-            followerLogStartOffset = UnifiedLog.UnknownOffset,
+            highWatermark = JUnifiedLog.UNKNOWN_OFFSET,
+            leaderLogStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+            leaderLogEndOffset = JUnifiedLog.UNKNOWN_OFFSET,
+            followerLogStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
             fetchTimeMs = -1L,
             lastStableOffset = None,
             exception = Some(e)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 09b8eab3d48..68545e0271c 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode
 
 import java.io._
 import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
-import kafka.log._
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.message.ConsumerProtocolAssignment
@@ -49,7 +48,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
 import org.apache.kafka.snapshot.Snapshots
 import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-import org.apache.kafka.storage.internals.log.{CorruptSnapshotException, LogFileUtils, OffsetIndex, ProducerStateManager, TimeIndex, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{CorruptSnapshotException, LogFileUtils, OffsetIndex, ProducerStateManager, TimeIndex, TransactionIndex, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.tools.api.{Decoder, StringDecoder}
 
 import java.nio.ByteBuffer
@@ -77,16 +76,16 @@ object DumpLogSegments {
       val filename = file.getName
       val suffix = filename.substring(filename.lastIndexOf("."))
       suffix match {
-        case UnifiedLog.LogFileSuffix | Snapshots.SUFFIX =>
+        case JUnifiedLog.LOG_FILE_SUFFIX | Snapshots.SUFFIX =>
           dumpLog(file, opts.shouldPrintDataLog, nonConsecutivePairsForLogFilesMap, opts.isDeepIteration,
             opts.messageParser, opts.skipRecordMetadata, opts.maxBytes)
-        case UnifiedLog.IndexFileSuffix =>
+        case JUnifiedLog.INDEX_FILE_SUFFIX =>
           dumpIndex(file, opts.indexSanityOnly, opts.verifyOnly, misMatchesForIndexFilesMap, opts.maxMessageSize)
-        case UnifiedLog.TimeIndexFileSuffix =>
+        case JUnifiedLog.TIME_INDEX_FILE_SUFFIX =>
           dumpTimeIndex(file, opts.indexSanityOnly, opts.verifyOnly, timeIndexDumpErrors)
         case LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX =>
           dumpProducerIdSnapshot(file)
-        case UnifiedLog.TxnIndexFileSuffix =>
+        case JUnifiedLog.TXN_INDEX_FILE_SUFFIX =>
           dumpTxnIndex(file)
         case _ =>
           System.err.println(s"Ignoring unknown file $file")
@@ -111,7 +110,7 @@ object DumpLogSegments {
   }
 
   private def dumpTxnIndex(file: File): Unit = {
-    val index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file)
+    val index = new TransactionIndex(JUnifiedLog.offsetFromFile(file), file)
     for (abortedTxn <- index.allAbortedTxns.asScala) {
       println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
         s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}")
@@ -144,7 +143,7 @@ object DumpLogSegments {
                                misMatchesForIndexFilesMap: mutable.Map[String, List[(Long, Long)]],
                                maxMessageSize: Int): Unit = {
     val startOffset = file.getName.split("\\.")(0).toLong
-    val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.LogFileSuffix)
+    val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + JUnifiedLog.LOG_FILE_SUFFIX)
     val fileRecords = FileRecords.open(logFile, false)
     val index = new OffsetIndex(file, startOffset, -1, false)
 
@@ -185,9 +184,9 @@ object DumpLogSegments {
                                    verifyOnly: Boolean,
                                    timeIndexDumpErrors: TimeIndexDumpErrors): Unit = {
     val startOffset = file.getName.split("\\.")(0).toLong
-    val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.LogFileSuffix)
+    val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + JUnifiedLog.LOG_FILE_SUFFIX)
     val fileRecords = FileRecords.open(logFile, false)
-    val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.IndexFileSuffix)
+    val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + JUnifiedLog.INDEX_FILE_SUFFIX)
     val index = new OffsetIndex(indexFile, startOffset, -1, false)
     val timeIndex = new TimeIndex(file, startOffset, -1, false)
 
@@ -268,7 +267,7 @@ object DumpLogSegments {
                       parser: MessageParser[_, _],
                       skipRecordMetadata: Boolean,
                       maxBytes: Int): Unit = {
-    if (file.getName.endsWith(UnifiedLog.LogFileSuffix)) {
+    if (file.getName.endsWith(JUnifiedLog.LOG_FILE_SUFFIX)) {
       val startOffset = file.getName.split("\\.")(0).toLong
       println(s"Log starting offset: $startOffset")
     } else if (file.getName.endsWith(Snapshots.SUFFIX)) {
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index b956c123806..272c78fce84 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -17,7 +17,6 @@
 package kafka.log.remote;
 
 import kafka.cluster.Partition;
-import kafka.log.UnifiedLog;
 import kafka.server.KafkaConfig;
 
 import org.apache.kafka.common.Endpoint;
@@ -75,6 +74,7 @@ import org.apache.kafka.storage.internals.log.ProducerStateManager;
 import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
 import org.apache.kafka.storage.internals.log.TimeIndex;
 import org.apache.kafka.storage.internals.log.TransactionIndex;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 import org.apache.kafka.test.TestUtils;
 
@@ -225,7 +225,7 @@ public class RemoteLogManagerTest {
     private LeaderEpochCheckpointFile checkpoint;
     private final AtomicLong currentLogStartOffset = new AtomicLong(0L);
 
-    private UnifiedLog mockLog = mock(UnifiedLog.class);
+    private kafka.log.UnifiedLog mockLog = mock(kafka.log.UnifiedLog.class);
 
     private final MockScheduler scheduler = new MockScheduler(time);
     private final Properties brokerConfig = kafka.utils.TestUtils.createDummyBrokerConfig();
@@ -259,7 +259,7 @@ public class RemoteLogManagerTest {
                 return Duration.ofMillis(100);
             }
             @Override
-            long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
+            long findLogStartOffset(TopicIdPartition topicIdPartition, kafka.log.UnifiedLog log) {
                 return 0L;
             }
         };
@@ -990,10 +990,10 @@ public class RemoteLogManagerTest {
             return 0L == argument.getValue();
         }, "Timed out waiting for updateHighestOffsetInRemoteStorage(0) get invoked for dir1 log");
 
-        UnifiedLog oldMockLog = mockLog;
+        kafka.log.UnifiedLog oldMockLog = mockLog;
         Mockito.clearInvocations(oldMockLog);
         // simulate altering log dir completes, and the new partition leader changes to the same broker in different log dir (dir2)
-        mockLog = mock(UnifiedLog.class);
+        mockLog = mock(kafka.log.UnifiedLog.class);
         when(mockLog.parentDir()).thenReturn("dir2");
         when(mockLog.leaderEpochCache()).thenReturn(cache);
         when(mockLog.config()).thenReturn(logConfig);
@@ -1639,16 +1639,16 @@ public class RemoteLogManagerTest {
 
         File tpDir = new File(logDir, tp.toString());
         Files.createDirectory(tpDir.toPath());
-        File txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix());
+        File txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TXN_INDEX_FILE_SUFFIX);
         txnIdxFile.createNewFile();
         when(remoteStorageManager.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class)))
                 .thenAnswer(ans -> {
                     RemoteLogSegmentMetadata metadata = ans.getArgument(0);
                     IndexType indexType = ans.getArgument(1);
                     int maxEntries = (int) (metadata.endOffset() - metadata.startOffset());
-                    OffsetIndex offsetIdx = new OffsetIndex(new File(tpDir, metadata.startOffset() + UnifiedLog.IndexFileSuffix()),
+                    OffsetIndex offsetIdx = new OffsetIndex(new File(tpDir, metadata.startOffset() + UnifiedLog.INDEX_FILE_SUFFIX),
                             metadata.startOffset(), maxEntries * 8);
-                    TimeIndex timeIdx = new TimeIndex(new File(tpDir, metadata.startOffset() + UnifiedLog.TimeIndexFileSuffix()),
+                    TimeIndex timeIdx = new TimeIndex(new File(tpDir, metadata.startOffset() + UnifiedLog.TIME_INDEX_FILE_SUFFIX),
                             metadata.startOffset(), maxEntries * 12);
                     switch (indexType) {
                         case OFFSET:
@@ -2041,7 +2041,7 @@ public class RemoteLogManagerTest {
 
     @Test
     public void testCandidateLogSegmentsSkipsActiveSegment() {
-        UnifiedLog log = mock(UnifiedLog.class);
+        kafka.log.UnifiedLog log = mock(kafka.log.UnifiedLog.class);
         LogSegment segment1 = mock(LogSegment.class);
         LogSegment segment2 = mock(LogSegment.class);
         LogSegment activeSegment = mock(LogSegment.class);
@@ -2065,7 +2065,7 @@ public class RemoteLogManagerTest {
 
     @Test
     public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() {
-        UnifiedLog log = mock(UnifiedLog.class);
+        kafka.log.UnifiedLog log = mock(kafka.log.UnifiedLog.class);
         LogSegment segment1 = mock(LogSegment.class);
         LogSegment segment2 = mock(LogSegment.class);
         LogSegment segment3 = mock(LogSegment.class);
@@ -2335,7 +2335,7 @@ public class RemoteLogManagerTest {
                 return Duration.ofMillis(100);
             }
             @Override
-            long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
+            long findLogStartOffset(TopicIdPartition topicIdPartition, kafka.log.UnifiedLog log) {
                 return 0L;
             }
         };
@@ -3711,7 +3711,7 @@ public class RemoteLogManagerTest {
                 return Duration.ofMillis(100);
             }
             @Override
-            long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
+            long findLogStartOffset(TopicIdPartition topicIdPartition, kafka.log.UnifiedLog log) {
                 return 0L;
             }
         };
@@ -3766,7 +3766,7 @@ public class RemoteLogManagerTest {
     private Partition mockPartition(TopicIdPartition topicIdPartition) {
         TopicPartition tp = topicIdPartition.topicPartition();
         Partition partition = mock(Partition.class);
-        UnifiedLog log = mock(UnifiedLog.class);
+        kafka.log.UnifiedLog log = mock(kafka.log.UnifiedLog.class);
         when(partition.topicPartition()).thenReturn(tp);
         when(partition.topic()).thenReturn(tp.topic());
         when(log.remoteLogEnabled()).thenReturn(true);
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index de9ea1a72c0..610f454a55b 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import kafka.log.UnifiedLog
 import kafka.network.SocketServer
 import kafka.server.IntegrationTestUtils.connectAndReceive
 import kafka.utils.TestUtils
@@ -48,6 +47,7 @@ import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.quota
 import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
+import org.apache.kafka.storage.internals.log.UnifiedLog
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Tag, Test, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 285560d3826..5f1752a54a6 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -16,7 +16,6 @@
  */
 package kafka.raft
 
-import kafka.log.UnifiedLog
 import kafka.server.{KafkaConfig, KafkaRaftServer}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.compress.Compression
@@ -31,7 +30,7 @@ import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.snapshot.{FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
-import org.apache.kafka.storage.internals.log.{LogConfig, LogStartOffsetIncrementReason}
+import org.apache.kafka.storage.internals.log.{LogConfig, LogStartOffsetIncrementReason, UnifiedLog}
 import org.apache.kafka.test.TestUtils.assertOptional
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index e496f88449d..86b72fab047 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -39,7 +39,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -298,8 +298,8 @@ class PartitionLockTest extends Logging {
         val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None, None)
         val logDirFailureChannel = new LogDirFailureChannel(1)
         val segments = new LogSegments(log.topicPartition)
-        val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-          log.dir, log.topicPartition, logDirFailureChannel, None, mockTime.scheduler)
+        val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+          log.dir, log.topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
         val maxTransactionTimeout = 5 * 60 * 1000
         val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
         val producerStateManager = new ProducerStateManager(
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index a79311eef01..5d028c7199e 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -60,7 +60,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -445,8 +445,8 @@ class PartitionTest extends AbstractPartitionTest {
         val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None, None)
         val logDirFailureChannel = new LogDirFailureChannel(1)
         val segments = new LogSegments(log.topicPartition)
-        val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-          log.dir, log.topicPartition, logDirFailureChannel, None, time.scheduler)
+        val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+          log.dir, log.topicPartition, logDirFailureChannel, Optional.empty, time.scheduler)
         val maxTransactionTimeoutMs = 5 * 60 * 1000
         val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, true)
         val producerStateManager = new ProducerStateManager(
@@ -1336,8 +1336,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     assertReplicaState(partition, remoteBrokerId,
       lastCaughtUpTimeMs = initializeTimeMs,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     time.sleep(500)
@@ -1435,8 +1435,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     assertReplicaState(partition, remoteBrokerId,
       lastCaughtUpTimeMs = 0L,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L)
@@ -1487,8 +1487,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     assertReplicaState(partition, remoteBrokerId,
       lastCaughtUpTimeMs = 0L,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L)
@@ -1551,8 +1551,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     assertReplicaState(partition, remoteBrokerId,
       lastCaughtUpTimeMs = 0L,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L)
@@ -2144,8 +2144,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     assertReplicaState(partition, remoteBrokerId2,
       lastCaughtUpTimeMs = initializeTimeMs,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     // On initialization, the replica is considered caught up and should not be removed
@@ -2263,8 +2263,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     assertReplicaState(partition, remoteBrokerId,
       lastCaughtUpTimeMs = initializeTimeMs,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     // Shrink the ISR
@@ -2322,8 +2322,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     assertReplicaState(partition, remoteBrokerId,
       lastCaughtUpTimeMs = initializeTimeMs,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     // There is a short delay before the first fetch. The follower is not yet caught up to the log end.
@@ -2382,8 +2382,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     assertReplicaState(partition, remoteBrokerId,
       lastCaughtUpTimeMs = initializeTimeMs,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     // The follower catches up to the log end immediately.
@@ -2429,8 +2429,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     assertReplicaState(partition, remoteBrokerId,
       lastCaughtUpTimeMs = initializeTimeMs,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     time.sleep(30001)
@@ -2516,8 +2516,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     assertReplicaState(partition, remoteBrokerId,
       lastCaughtUpTimeMs = 0L,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     // This will attempt to expand the ISR
@@ -3054,8 +3054,8 @@ class PartitionTest extends AbstractPartitionTest {
     // in the ISR.
     assertReplicaState(partition, followerId,
       lastCaughtUpTimeMs = 0L,
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset
+      logStartOffset = JUnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = JUnifiedLog.UNKNOWN_OFFSET
     )
 
     // Follower fetches and updates its replica state.
diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
index 55a49f31cbf..c74d77c6bc7 100644
--- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
@@ -16,12 +16,11 @@
  */
 package kafka.cluster
 
-import kafka.log.UnifiedLog
 import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.LogOffsetMetadata
+import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, UnifiedLog}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.mockito.Mockito.{mock, when}
@@ -128,8 +127,8 @@ class ReplicaTest {
   @Test
   def testInitialState(): Unit = {
     assertReplicaState(
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset,
+      logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = UnifiedLog.UNKNOWN_OFFSET,
       lastCaughtUpTimeMs = 0L,
       lastFetchLeaderLogEndOffset = 0L,
       lastFetchTimeMs = 0L,
@@ -243,10 +242,10 @@ class ReplicaTest {
     )
 
     assertReplicaState(
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset,
+      logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = UnifiedLog.UNKNOWN_OFFSET,
       lastCaughtUpTimeMs = resetTimeMs1,
-      lastFetchLeaderLogEndOffset = UnifiedLog.UnknownOffset,
+      lastFetchLeaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
       lastFetchTimeMs = 0L,
       brokerEpoch = Option.empty
     )
@@ -267,10 +266,10 @@ class ReplicaTest {
     )
 
     assertReplicaState(
-      logStartOffset = UnifiedLog.UnknownOffset,
-      logEndOffset = UnifiedLog.UnknownOffset,
+      logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
+      logEndOffset = UnifiedLog.UNKNOWN_OFFSET,
       lastCaughtUpTimeMs = 0L,
-      lastFetchLeaderLogEndOffset = UnifiedLog.UnknownOffset,
+      lastFetchLeaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
       lastFetchTimeMs = 0L,
       brokerEpoch = Option.empty
     )
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 155b96f2a3a..d80567dcac6 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 
 import java.io.File
 import java.nio.file.Files
-import java.util.Properties
+import java.util.{Optional, Properties}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.compress.Compression
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
@@ -109,8 +109,8 @@ class LogCleanerManagerTest extends Logging {
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
     val segments = new LogSegments(tp)
-    val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-      tpDir, topicPartition, logDirFailureChannel, None, time.scheduler)
+    val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+      tpDir, topicPartition, logDirFailureChannel, Optional.empty, time.scheduler)
     val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxTransactionTimeoutMs, producerStateManagerConfig, time)
     val offsets = new LogLoader(
       tpDir,
@@ -807,7 +807,7 @@ class LogCleanerManagerTest extends Logging {
                         cleanupPolicy: String,
                         topicPartition: TopicPartition = new TopicPartition("log", 0)): UnifiedLog = {
     val config = createLowRetentionLogConfig(segmentSize, cleanupPolicy)
-    val partitionDir = new File(logDir, UnifiedLog.logDirName(topicPartition))
+    val partitionDir = new File(logDir, JUnifiedLog.logDirName(topicPartition))
 
     UnifiedLog(
       dir = partitionDir,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 7b22628c382..895d5d64363 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
@@ -42,7 +42,7 @@ import java.io.{File, RandomAccessFile}
 import java.nio._
 import java.nio.charset.StandardCharsets
 import java.nio.file.Paths
-import java.util.Properties
+import java.util.{Optional, Properties}
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
 import scala.collection._
 import scala.jdk.CollectionConverters._
@@ -183,13 +183,13 @@ class LogCleanerTest extends Logging {
     logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024 : java.lang.Integer)
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)
     val config = LogConfig.fromProps(logConfig.originals, logProps)
-    val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
+    val topicPartition = JUnifiedLog.parseTopicPartitionName(dir)
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
     val logSegments = new LogSegments(topicPartition)
-    val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-      dir, topicPartition, logDirFailureChannel, None, time.scheduler)
+    val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+      dir, topicPartition, logDirFailureChannel, Optional.empty, time.scheduler)
     val producerStateManager = new ProducerStateManager(topicPartition, dir,
       maxTransactionTimeoutMs, producerStateManagerConfig, time)
     val offsets = new LogLoader(
@@ -1709,7 +1709,7 @@ class LogCleanerTest extends Logging {
 
     // 1) Simulate recovery just after .cleaned file is created, before rename to .swap
     //    On recovery, clean operation is aborted. All messages should be present in the log
-    log.logSegments.asScala.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
+    log.logSegments.asScala.head.changeFileSuffixes("", JUnifiedLog.CLEANED_FILE_SUFFIX)
     for (file <- dir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
       Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")), false)
     }
@@ -1725,8 +1725,8 @@ class LogCleanerTest extends Logging {
 
     // 2) Simulate recovery just after .cleaned file is created, and a subset of them are renamed to .swap
     //    On recovery, clean operation is aborted. All messages should be present in the log
-    log.logSegments.asScala.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
-    log.logSegments.asScala.head.log.renameTo(new File(Utils.replaceSuffix(log.logSegments.asScala.head.log.file.getPath, UnifiedLog.CleanedFileSuffix, UnifiedLog.SwapFileSuffix)))
+    log.logSegments.asScala.head.changeFileSuffixes("", JUnifiedLog.CLEANED_FILE_SUFFIX)
+    log.logSegments.asScala.head.log.renameTo(new File(Utils.replaceSuffix(log.logSegments.asScala.head.log.file.getPath, JUnifiedLog.CLEANED_FILE_SUFFIX, JUnifiedLog.SWAP_FILE_SUFFIX)))
     for (file <- dir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
       Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")), false)
     }
@@ -1742,7 +1742,7 @@ class LogCleanerTest extends Logging {
 
     // 3) Simulate recovery just after swap file is created, before old segment files are
     //    renamed to .deleted. Clean operation is resumed during recovery.
-    log.logSegments.asScala.head.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
+    log.logSegments.asScala.head.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX)
     for (file <- dir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
       Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")), false)
     }
@@ -1763,7 +1763,7 @@ class LogCleanerTest extends Logging {
 
     // 4) Simulate recovery after swap file is created and old segments files are renamed
     //    to .deleted. Clean operation is resumed during recovery.
-    log.logSegments.asScala.head.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
+    log.logSegments.asScala.head.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX)
     log = recoverAndCheck(config, cleanedKeys)
 
     // add some more messages and clean the log again
@@ -1781,7 +1781,7 @@ class LogCleanerTest extends Logging {
 
     // 5) Simulate recovery after a subset of swap files are renamed to regular files and old segments files are renamed
     //    to .deleted. Clean operation is resumed during recovery.
-    log.logSegments.asScala.head.timeIndex.file.renameTo(new File(Utils.replaceSuffix(log.logSegments.asScala.head.timeIndex.file.getPath, "", UnifiedLog.SwapFileSuffix)))
+    log.logSegments.asScala.head.timeIndex.file.renameTo(new File(Utils.replaceSuffix(log.logSegments.asScala.head.timeIndex.file.getPath, "", JUnifiedLog.SWAP_FILE_SUFFIX)))
     log = recoverAndCheck(config, cleanedKeys)
 
     // add some more messages and clean the log again
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 17ef5f981fa..3bdf8a9436c 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.MockConfigRepository
 import org.apache.kafka.server.util.{MockTime, Scheduler}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
@@ -146,14 +146,14 @@ class LogLoaderTest {
             }
           }
           cleanShutdownInterceptedValue = hadCleanShutdown
-          val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
+          val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
           val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
           val logRecoveryPoint = recoveryPoints.getOrDefault(topicPartition, 0L)
           val logStartOffset = logStartOffsets.getOrDefault(topicPartition, 0L)
           val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
           val segments = new LogSegments(topicPartition)
-          val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-            logDir, topicPartition, logDirFailureChannel, None, time.scheduler)
+          val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+            logDir, topicPartition, logDirFailureChannel, Optional.empty, time.scheduler)
           val producerStateManager = new ProducerStateManager(topicPartition, logDir,
             this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time)
           val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time,
@@ -279,7 +279,7 @@ class LogLoaderTest {
     def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = {
       val maxTransactionTimeoutMs = 5 * 60 * 1000
       val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
-      val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
+      val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
       val logDirFailureChannel = new LogDirFailureChannel(10)
       // Intercept all segment read calls
       val interceptedLogSegments = new LogSegments(topicPartition) {
@@ -296,8 +296,8 @@ class LogLoaderTest {
           super.add(wrapper)
         }
       }
-      val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-        logDir, topicPartition, logDirFailureChannel, None, mockTime.scheduler)
+      val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+        logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
       val producerStateManager = new ProducerStateManager(topicPartition, logDir,
         maxTransactionTimeoutMs, producerStateManagerConfig, mockTime)
       val logLoader = new LogLoader(
@@ -414,12 +414,12 @@ class LogLoaderTest {
     when(stateManager.isEmpty).thenReturn(true)
     when(stateManager.firstUnstableOffset).thenReturn(Optional.empty[LogOffsetMetadata]())
 
-    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
+    val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
     val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
     val config = new LogConfig(new Properties())
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-      logDir, topicPartition, logDirFailureChannel, None, mockTime.scheduler)
+    val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+      logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
     val offsets = new LogLoader(
       logDir,
       topicPartition,
@@ -523,12 +523,12 @@ class LogLoaderTest {
     when(stateManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig)
     when(stateManager.maxTransactionTimeoutMs).thenReturn(maxTransactionTimeoutMs)
 
-    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
+    val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
     val config = new LogConfig(new Properties())
     val logDirFailureChannel = null
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-      logDir, topicPartition, logDirFailureChannel, None, mockTime.scheduler)
+    val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+      logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
     val offsets = new LogLoader(
       logDir,
       topicPartition,
@@ -1057,7 +1057,7 @@ class LogLoaderTest {
     // Simulate recovery just after .cleaned file is created, before rename to .swap. On recovery, existing split
     // operation is aborted but the recovery process itself kicks off split which should complete.
     newSegments.reverse.foreach(segment => {
-      segment.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
+      segment.changeFileSuffixes("", JUnifiedLog.CLEANED_FILE_SUFFIX)
       segment.truncateTo(0)
     })
     for (file <- logDir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
@@ -1083,9 +1083,9 @@ class LogLoaderTest {
     // operation is aborted but the recovery process itself kicks off split which should complete.
     newSegments.reverse.foreach { segment =>
       if (segment != newSegments.last)
-        segment.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
+        segment.changeFileSuffixes("", JUnifiedLog.CLEANED_FILE_SUFFIX)
       else
-        segment.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
+        segment.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX)
       segment.truncateTo(0)
     }
     for (file <- logDir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
@@ -1110,7 +1110,7 @@ class LogLoaderTest {
     // Simulate recovery right after all new segments have been renamed to .swap. On recovery, existing split operation
     // is completed and the old segment must be deleted.
     newSegments.reverse.foreach(segment => {
-      segment.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
+      segment.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX)
     })
     for (file <- logDir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
       Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")))
@@ -1136,7 +1136,7 @@ class LogLoaderTest {
 
     // Simulate recovery right after all new segments have been renamed to .swap and old segment has been deleted. On
     // recovery, existing split operation is completed.
-    newSegments.reverse.foreach(_.changeFileSuffixes("", UnifiedLog.SwapFileSuffix))
+    newSegments.reverse.foreach(_.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX))
 
     for (file <- logDir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
       Utils.delete(file)
@@ -1162,7 +1162,7 @@ class LogLoaderTest {
 
     // Simulate recovery right after one of the new segment has been renamed to .swap and the other to .log. On
     // recovery, existing split operation is completed.
-    newSegments.last.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
+    newSegments.last.changeFileSuffixes("", JUnifiedLog.SWAP_FILE_SUFFIX)
 
     // Truncate the old segment
     segmentWithOverflow.truncateTo(0)
@@ -1597,7 +1597,7 @@ class LogLoaderTest {
   def testLogStartOffsetWhenRemoteStorageIsEnabled(isRemoteLogEnabled: Boolean,
                                                    expectedLogStartOffset: Long): Unit = {
     val logDirFailureChannel = null
-    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
+    val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
     val logConfig = LogTestUtils.createLogConfig()
     val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
     when(stateManager.isEmpty).thenReturn(true)
@@ -1619,8 +1619,8 @@ class LogLoaderTest {
     log.logSegments.forEach(segment => segments.add(segment))
     assertEquals(5, segments.firstSegment.get.baseOffset)
 
-    val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-      logDir, topicPartition, logDirFailureChannel, None, mockTime.scheduler)
+    val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+      logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
     val offsets = new LogLoader(
       logDir,
       topicPartition,
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 3fc2b09d262..d9ac6bc6f0e 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -44,7 +44,7 @@ import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.function.Executable
@@ -540,7 +540,7 @@ class LogManagerTest {
     assertEquals(1, invokedCount)
     assertTrue(
       logDir.listFiles().toSet
-      .exists(f => f.getName.startsWith(testTopic) && f.getName.endsWith(UnifiedLog.StrayDirSuffix))
+      .exists(f => f.getName.startsWith(testTopic) && f.getName.endsWith(JUnifiedLog.STRAY_DIR_SUFFIX))
     )
   }
 
@@ -952,7 +952,7 @@ class LogManagerTest {
       val dir: File = invocation.getArgument(0)
       val topicConfigOverrides: mutable.Map[String, LogConfig] = invocation.getArgument(5)
 
-      val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
+      val topicPartition = JUnifiedLog.parseTopicPartitionName(dir)
       val config = topicConfigOverrides.getOrElse(topicPartition.topic, logConfig)
 
       UnifiedLog(
@@ -1028,7 +1028,7 @@ class LogManagerTest {
     val metricTag = s"topic=${tp.topic},partition=${tp.partition}"
 
     def verifyMetrics(): Unit = {
-      assertEquals(LogMetricNames.allMetricNames.size, logMetrics.size)
+      assertEquals(LogMetricNames.ALL_METRIC_NAMES.size, logMetrics.size)
       logMetrics.foreach { metric =>
         assertTrue(metric.getMBeanName.contains(metricTag))
       }
@@ -1068,7 +1068,7 @@ class LogManagerTest {
     val metricTag = s"topic=${tp.topic},partition=${tp.partition}"
 
     def verifyMetrics(logCount: Int): Unit = {
-      assertEquals(LogMetricNames.allMetricNames.size * logCount, logMetrics.size)
+      assertEquals(LogMetricNames.ALL_METRIC_NAMES.size * logCount, logMetrics.size)
       logMetrics.foreach { metric =>
         assertTrue(metric.getMBeanName.contains(metricTag))
       }
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index bf2b71676fa..9d940419423 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.server.config.ServerLogConfigs
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import scala.jdk.CollectionConverters._
@@ -52,7 +52,7 @@ object LogTestUtils {
     val ms = FileRecords.open(LogFileUtils.logFile(logDir, offset))
     val idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(logDir, offset), offset, 1000)
     val timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(logDir, offset), offset, 1500)
-    val txnIndex = new TransactionIndex(offset, UnifiedLog.transactionIndexFile(logDir, offset))
+    val txnIndex = new TransactionIndex(offset, LogFileUtils.transactionIndexFile(logDir, offset, ""))
 
     new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
   }
@@ -209,8 +209,8 @@ object LogTestUtils {
     time.sleep(config.fileDeleteDelayMs + 1)
     for (file <- logDir.listFiles) {
       assertFalse(file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX), "Unexpected .deleted file after recovery")
-      assertFalse(file.getName.endsWith(UnifiedLog.CleanedFileSuffix), "Unexpected .cleaned file after recovery")
-      assertFalse(file.getName.endsWith(UnifiedLog.SwapFileSuffix), "Unexpected .swap file after recovery")
+      assertFalse(file.getName.endsWith(JUnifiedLog.CLEANED_FILE_SUFFIX), "Unexpected .cleaned file after recovery")
+      assertFalse(file.getName.endsWith(JUnifiedLog.SWAP_FILE_SUFFIX), "Unexpected .swap file after recovery")
     }
     assertEquals(expectedKeys, keysInLog(recoveredLog))
     assertFalse(hasOffsetOverflow(recoveredLog))
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 86f96658f89..88e7e56a901 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog => JUnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
 import org.junit.jupiter.api.Assertions._
@@ -527,13 +527,6 @@ class UnifiedLogTest {
     assertLsoBoundedFetches()
   }
 
-  @Test
-  def testOffsetFromProducerSnapshotFile(): Unit = {
-    val offset = 23423423L
-    val snapshotFile = LogFileUtils.producerSnapshotFile(tmpDir, offset)
-    assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile))
-  }
-
   /**
    * Tests for time based log roll. This test appends messages then changes the time
    * using the mock clock to force the log to roll and checks the number of segments.
@@ -2495,8 +2488,8 @@ class UnifiedLogTest {
     assertEquals(Some(5), log.latestEpoch)
 
     // Ensure that after a directory rename, the epoch cache is written to the right location
-    val tp = UnifiedLog.parseTopicPartitionName(log.dir)
-    log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
+    val tp = JUnifiedLog.parseTopicPartitionName(log.dir)
+    log.renameDir(JUnifiedLog.logDeleteDirName(tp), true)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
     assertEquals(Some(10), log.latestEpoch)
     assertTrue(LeaderEpochCheckpointFile.newFile(log.dir).exists())
@@ -2516,8 +2509,8 @@ class UnifiedLogTest {
     assertEquals(Some(5), log.latestEpoch)
 
     // Ensure that after a directory rename, the partition metadata file is written to the right location.
-    val tp = UnifiedLog.parseTopicPartitionName(log.dir)
-    log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
+    val tp = JUnifiedLog.parseTopicPartitionName(log.dir)
+    log.renameDir(JUnifiedLog.logDeleteDirName(tp), true)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
     assertEquals(Some(10), log.latestEpoch)
     assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
@@ -2539,8 +2532,8 @@ class UnifiedLogTest {
     log.partitionMetadataFile.get.record(topicId)
 
     // Ensure that after a directory rename, the partition metadata file is written to the right location.
-    val tp = UnifiedLog.parseTopicPartitionName(log.dir)
-    log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
+    val tp = JUnifiedLog.parseTopicPartitionName(log.dir)
+    log.renameDir(JUnifiedLog.logDeleteDirName(tp), true)
     assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
     assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
 
@@ -3531,7 +3524,7 @@ class UnifiedLogTest {
 
   @Test
   def testLoadPartitionDirWithNoSegmentsShouldNotThrow(): Unit = {
-    val dirName = UnifiedLog.logDeleteDirName(new TopicPartition("foo", 3))
+    val dirName = JUnifiedLog.logDeleteDirName(new TopicPartition("foo", 3))
     val logDir = new File(tmpDir, dirName)
     logDir.mkdirs()
     val logConfig = LogTestUtils.createLogConfig()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 2dceca63dbd..cb8eead59a2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 
 import kafka.cluster.Partition
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
-import kafka.log.UnifiedLog
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.metadata.KRaftMetadataCache
@@ -94,7 +93,7 @@ import org.apache.kafka.server.share.context.{FinalContext, ShareSessionContext}
 import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.{FutureUtils, MockTime}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, UnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
@@ -3847,7 +3846,7 @@ class KafkaApisTest extends Logging {
       any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]()
     )).thenAnswer(invocation => {
       val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
-      callback(Seq(tidp -> new FetchPartitionData(Errors.NOT_LEADER_OR_FOLLOWER, UnifiedLog.UnknownOffset, UnifiedLog.UnknownOffset, MemoryRecords.EMPTY,
+      callback(Seq(tidp -> new FetchPartitionData(Errors.NOT_LEADER_OR_FOLLOWER, UnifiedLog.UNKNOWN_OFFSET, UnifiedLog.UNKNOWN_OFFSET, MemoryRecords.EMPTY,
         Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)))
     })
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 3e8c095b37e..1006b7c1fef 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 import java.io.File
 import java.nio.file.Files
 import java.util.{Optional, Properties}
-import kafka.log.UnifiedLog
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
@@ -28,6 +27,7 @@ import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
 import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.storage.internals.log.UnifiedLog
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 78ae078ee60..0292f31b7db 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -68,7 +68,7 @@ import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog => JUnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -2700,8 +2700,8 @@ class ReplicaManagerTest {
     val maxTransactionTimeoutMs = 30000
     val maxProducerIdExpirationMs = 30000
     val segments = new LogSegments(tp)
-    val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-      logDir, tp, mockLogDirFailureChannel, None, time.scheduler)
+    val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+      logDir, tp, mockLogDirFailureChannel, Optional.empty, time.scheduler)
     val producerStateManager = new ProducerStateManager(tp, logDir,
       maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, true), time)
     val offsets = new LogLoader(
@@ -5877,7 +5877,7 @@ class ReplicaManagerTest {
     assertEquals(tpId0, fetch.head._1)
     val fetchInfo = fetch.head._2.info
     assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset)
-    assertEquals(UnifiedLog.UnknownOffset, fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
+    assertEquals(JUnifiedLog.UNKNOWN_OFFSET, fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
     assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
     assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
     assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index c5671926dbf..3c42eb0c3a6 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -16,14 +16,14 @@
  */
 package kafka.utils
 
-import java.util.Properties
+import java.util.{Optional, Properties}
 import java.util.concurrent.atomic._
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
 import kafka.log.UnifiedLog
 import kafka.utils.TestUtils.retry
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
@@ -135,11 +135,11 @@ class SchedulerTest {
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT
     val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
-    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
+    val topicPartition = JUnifiedLog.parseTopicPartitionName(logDir)
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
-      logDir, topicPartition, logDirFailureChannel, None, mockTime.scheduler)
+    val leaderEpochCache = JUnifiedLog.createLeaderEpochCache(
+      logDir, topicPartition, logDirFailureChannel, Optional.empty, mockTime.scheduler)
     val producerStateManager = new ProducerStateManager(topicPartition, logDir,
       maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), mockTime)
     val offsets = new LogLoader(
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5c2192662e3..a8d99191a20 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -17,7 +17,7 @@
 package kafka.utils
 
 import com.yammer.metrics.core.{Histogram, Meter}
-import kafka.log._
+import kafka.log.LogManager
 import kafka.network.RequestChannel
 import kafka.security.JaasTestUtils
 import kafka.server._
@@ -56,7 +56,7 @@ import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfi
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog => JUnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
@@ -961,7 +961,7 @@ object TestUtils extends Logging {
                        time: MockTime = new MockTime(),
                        recoveryThreadsPerDataDir: Int = 4,
                        transactionVerificationEnabled: Boolean = false,
-                       log: Option[UnifiedLog] = None,
+                       log: Option[kafka.log.UnifiedLog] = None,
                        remoteStorageSystemEnable: Boolean = false,
                        initialTaskDelayMs: Long = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT): LogManager = {
     val logManager = new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
@@ -1109,7 +1109,7 @@ object TestUtils extends Logging {
           !util.Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryNames =>
             partitionDirectoryNames.exists { directoryName =>
               directoryName.startsWith(tp.topic + "-" + tp.partition) &&
-                directoryName.endsWith(UnifiedLog.DeleteDirSuffix)
+                directoryName.endsWith(JUnifiedLog.DELETE_DIR_SUFFIX)
             }
           }
         }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
index 1313575a957..dc65c54e97e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.jmh.record;
 
-import kafka.log.UnifiedLog;
-
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.AbstractRecords;
@@ -28,6 +26,7 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.server.common.RequestLocal;
 import org.apache.kafka.storage.internals.log.LogValidator;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
 import org.openjdk.jmh.annotations.Param;
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java
new file mode 100644
index 00000000000..47230096701
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.List;
+
+public class LogMetricNames {
+
+    public static final String NUM_LOG_SEGMENTS = "NumLogSegments";
+    public static final String LOG_START_OFFSET = "LogStartOffset";
+    public static final String LOG_END_OFFSET = "LogEndOffset";
+    public static final String SIZE = "Size";
+
+    public static final List ALL_METRIC_NAMES = List.of(NUM_LOG_SEGMENTS, LOG_START_OFFSET, LOG_END_OFFSET, SIZE);
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index 3d2ff74145b..ebb4058e419 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -17,16 +17,22 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
 import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -39,6 +45,16 @@ public class UnifiedLog {
 
     private static final Logger LOG = LoggerFactory.getLogger(UnifiedLog.class);
 
+    public static final String LOG_FILE_SUFFIX = LogFileUtils.LOG_FILE_SUFFIX;
+    public static final String INDEX_FILE_SUFFIX = LogFileUtils.INDEX_FILE_SUFFIX;
+    public static final String TIME_INDEX_FILE_SUFFIX = LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+    public static final String TXN_INDEX_FILE_SUFFIX = LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+    public static final String CLEANED_FILE_SUFFIX = LogFileUtils.CLEANED_FILE_SUFFIX;
+    public static final String SWAP_FILE_SUFFIX = LogFileUtils.SWAP_FILE_SUFFIX;
+    public static final String DELETE_DIR_SUFFIX = LogFileUtils.DELETE_DIR_SUFFIX;
+    public static final String STRAY_DIR_SUFFIX = LogFileUtils.STRAY_DIR_SUFFIX;
+    public static final long UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET;
+
     /**
      * Rebuilds producer state until the provided lastOffset. This function may be called from the
      * recovery code path, and thus must be free of all side effects, i.e. it must not update any
@@ -129,8 +145,8 @@ public class UnifiedLog {
             }
             producerStateManager.updateMapEndOffset(lastOffset);
             producerStateManager.takeSnapshot();
-            LOG.info(logPrefix + "Producer state recovery took " + (segmentRecoveryStart - producerStateLoadStart) + "ms for snapshot load " +
-                    "and " + (time.milliseconds() - segmentRecoveryStart) + "ms for segment recovery from offset " + lastOffset);
+            LOG.info("{}Producer state recovery took {}ms for snapshot load and {}ms for segment recovery from offset {}",
+                    logPrefix, segmentRecoveryStart - producerStateLoadStart, time.milliseconds() - segmentRecoveryStart, lastOffset);
         }
     }
 
@@ -206,4 +222,105 @@ public class UnifiedLog {
         }
         return completedTxn;
     }
+
+    public static boolean isRemoteLogEnabled(boolean remoteStorageSystemEnable, LogConfig config, String topic) {
+        // Remote log is enabled only for non-compact and non-internal topics
+        return remoteStorageSystemEnable &&
+                !(config.compact || Topic.isInternal(topic)
+                        || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic)
+                        || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)) &&
+                config.remoteStorageEnable();
+    }
+
+    // Visible for benchmarking
+    public static LogValidator.MetricsRecorder newValidatorMetricsRecorder(BrokerTopicMetrics allTopicsStats) {
+        return new LogValidator.MetricsRecorder() {
+            public void recordInvalidMagic() {
+                allTopicsStats.invalidMagicNumberRecordsPerSec().mark();
+            }
+
+            public void recordInvalidOffset() {
+                allTopicsStats.invalidOffsetOrSequenceRecordsPerSec().mark();
+            }
+
+            public void recordInvalidSequence() {
+                allTopicsStats.invalidOffsetOrSequenceRecordsPerSec().mark();
+            }
+
+            public void recordInvalidChecksums() {
+                allTopicsStats.invalidMessageCrcRecordsPerSec().mark();
+            }
+
+            public void recordNoKeyCompactedTopic() {
+                allTopicsStats.noKeyCompactedTopicRecordsPerSec().mark();
+            }
+        };
+    }
+
+    /**
+     * Create a new LeaderEpochFileCache instance and load the epoch entries from the backing checkpoint file or
+     * the provided currentCache (if not empty).
+     *
+     * @param dir                  The directory in which the log will reside
+     * @param topicPartition       The topic partition
+     * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
+     * @param currentCache         The current LeaderEpochFileCache instance (if any)
+     * @param scheduler            The scheduler for executing asynchronous tasks
+     * @return The new LeaderEpochFileCache instance
+     */
+    public static LeaderEpochFileCache createLeaderEpochCache(File dir,
+                                                                             TopicPartition topicPartition,
+                                                                             LogDirFailureChannel logDirFailureChannel,
+                                                                             Optional currentCache,
+                                                                             Scheduler scheduler) throws IOException {
+        File leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir);
+        LeaderEpochCheckpointFile checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel);
+        return currentCache.map(cache -> cache.withCheckpoint(checkpointFile))
+                .orElse(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler));
+
+    }
+
+    public static LogSegment createNewCleanedSegment(File dir, LogConfig logConfig, long baseOffset) throws IOException {
+        return LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset);
+    }
+
+    public static long localRetentionMs(LogConfig config, boolean remoteLogEnabledAndRemoteCopyEnabled) {
+        return remoteLogEnabledAndRemoteCopyEnabled ? config.localRetentionMs() : config.retentionMs;
+    }
+
+    public static long localRetentionSize(LogConfig config, boolean remoteLogEnabledAndRemoteCopyEnabled) {
+        return remoteLogEnabledAndRemoteCopyEnabled ? config.localRetentionBytes() : config.retentionSize;
+    }
+
+    public static String logDeleteDirName(TopicPartition topicPartition) {
+        return LocalLog.logDeleteDirName(topicPartition);
+    }
+
+    public static String logFutureDirName(TopicPartition topicPartition) {
+        return LocalLog.logFutureDirName(topicPartition);
+    }
+
+    public static String logStrayDirName(TopicPartition topicPartition) {
+        return LocalLog.logStrayDirName(topicPartition);
+    }
+
+    public static String logDirName(TopicPartition topicPartition) {
+        return LocalLog.logDirName(topicPartition);
+    }
+
+    public static File transactionIndexFile(File dir, long offset, String suffix) {
+        return LogFileUtils.transactionIndexFile(dir, offset, suffix);
+    }
+
+    public static long offsetFromFile(File file) {
+        return LogFileUtils.offsetFromFile(file);
+    }
+
+    public static long sizeInBytes(Collection segments) {
+        return LogSegments.sizeInBytes(segments);
+    }
+
+    public static TopicPartition parseTopicPartitionName(File dir) throws IOException {
+        return LocalLog.parseTopicPartitionName(dir);
+    }
 }
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
new file mode 100644
index 00000000000..ea14932ff20
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class UnifiedLogTest {
+
+    private final File tmpDir = TestUtils.tempDirectory();
+
+    @Test
+    public void testOffsetFromProducerSnapshotFile() {
+        long offset = 23423423L;
+        File snapshotFile = LogFileUtils.producerSnapshotFile(tmpDir, offset);
+        assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile));
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 2fdb9483fe6..ea17931f778 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -302,7 +302,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
 
     // unused now, but it can be reused later as this is an utility method.
     public Optional leaderEpochFileCache(int brokerId, TopicPartition partition) {
-        return log(brokerId, partition).map(log -> log.leaderEpochCache());
+        return log(brokerId, partition).map(UnifiedLog::leaderEpochCache);
     }
 
     public List remoteStorageManagers() {
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 2bedb966adb..50cb9d7e3d8 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.common.test;
 
-import kafka.log.UnifiedLog;
 import kafka.network.SocketServer;
 import kafka.server.BrokerServer;
 import kafka.server.ControllerServer;
@@ -51,6 +50,7 @@ import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.fault.FaultHandlerException;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
 
 import java.io.File;
 import java.io.IOException;
@@ -345,7 +345,7 @@ public interface ClusterInstance {
                     topicPartitions.stream().allMatch(tp ->
                         Arrays.stream(Objects.requireNonNull(new File(logDir).list())).noneMatch(partitionDirectoryName ->
                             partitionDirectoryName.startsWith(tp.topic() + "-" + tp.partition()) &&
-                                partitionDirectoryName.endsWith(UnifiedLog.DeleteDirSuffix())))
+                                partitionDirectoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)))
                 )
             ), "Failed to hard-delete the delete directory");
         }