MINOR: Cleanups in CoreUtils (#19175)

Delete unused methods in CoreUtils and switch to Utils.newInstance().

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Mickael Maison 2025-03-12 19:43:30 +01:00 committed by GitHub
parent 25f8bf82f7
commit 55d65cb3ba
7 changed files with 15 additions and 109 deletions
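At a glance, the migration swaps one reflective factory for another: the removed CoreUtils.createObject resolved a constructor from the runtime classes of its arguments, while Utils.newInstance loads the class by name, checks it is a subtype of the expected base type, and invokes the public no-arg constructor. A minimal sketch of the new call shape (the class name here is illustrative):

```scala
import org.apache.kafka.common.replica.ReplicaSelector
import org.apache.kafka.common.utils.Utils

object NewInstanceSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative class name; any implementation with a public
    // no-arg constructor can be loaded this way.
    val className = "org.apache.kafka.common.replica.RackAwareReplicaSelector"

    // Replaces: CoreUtils.createObject[ReplicaSelector](className)
    val selector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector])
    println(selector.getClass.getName)
  }
}
```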

View File

@@ -202,6 +202,7 @@ public class UtilsTest {
assertEquals(10, Utils.abs(10));
assertEquals(0, Utils.abs(0));
assertEquals(1, Utils.abs(-1));
+assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE));
}
@Test
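The added assertion pins Utils.abs at the upper int boundary; together with the abs assertions deleted from CoreUtilsTest later in this diff (including abs(Integer.MIN_VALUE) == 0), the suite describes an absolute value that is total over int. A sketch of that contract, reconstructed from the assertions rather than from Kafka's source:

```scala
// Math.abs(Int.MinValue) overflows back to Int.MinValue, so a total
// absolute value must special-case it; the tests assert it maps to 0.
def abs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

assert(abs(Int.MinValue) == 0)
assert(abs(-1) == 1)
assert(abs(0) == 0)
assert(abs(Int.MaxValue) == Int.MaxValue)
```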

View File

@@ -21,8 +21,9 @@
package kafka.metrics
import kafka.utils.{CoreUtils, VerifiableProperties}
-import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.kafka.common.utils.Utils
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.Seq
import scala.collection.mutable.ArrayBuffer
@@ -62,7 +63,7 @@ object KafkaMetricsReporter {
val metricsConfig = new KafkaMetricsConfig(verifiableProps)
if (metricsConfig.reporters.nonEmpty) {
metricsConfig.reporters.foreach(reporterType => {
-val reporter = CoreUtils.createObject[KafkaMetricsReporter](reporterType)
+val reporter = Utils.newInstance(reporterType, classOf[KafkaMetricsReporter])
reporter.init(verifiableProps)
reporters += reporter
reporter match {
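One behavioral difference worth noting: the removed createObject could forward constructor arguments, while Utils.newInstance only calls a no-arg constructor. That is sufficient here because reporters receive their configuration through init() after construction. A hypothetical plugin showing the shape such a class must have (name and body are illustrative):

```scala
import kafka.metrics.KafkaMetricsReporter
import kafka.utils.VerifiableProperties

// Illustrative reporter: a public no-arg constructor (the Scala default
// here) so Utils.newInstance can instantiate it, with configuration
// delivered via init() rather than the constructor.
class StdoutMetricsReporter extends KafkaMetricsReporter {
  override def init(props: VerifiableProperties): Unit =
    println("metrics reporter initialized")
}
```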

View File

@@ -45,7 +45,7 @@ import org.apache.kafka.common.replica._
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
-import org.apache.kafka.common.utils.{Exit, Time}
+import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
@@ -58,7 +58,7 @@ import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFe
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog => JUnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import java.io.File
@@ -2585,7 +2585,7 @@ class ReplicaManager(val config: KafkaConfig,
private def createReplicaSelector(): Option[ReplicaSelector] = {
config.replicaSelectorClassName.map { className =>
-val tmpReplicaSelector: ReplicaSelector = CoreUtils.createObject[ReplicaSelector](className)
+val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector])
tmpReplicaSelector.configure(config.originals())
tmpReplicaSelector
}
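The same no-arg-constructor-then-configure pattern applies here: ReplicaSelector is a Configurable, so the broker instantiates it by name and then hands it config.originals(). A hypothetical selector illustrating the interface being loaded above (assuming its select(TopicPartition, ClientMetadata, PartitionView) signature; not from the codebase):

```scala
import java.util
import java.util.Optional

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}

// Illustrative selector: configure() receives the map passed as
// config.originals() above; select() always routes fetches to the leader.
class LeaderOnlySelector extends ReplicaSelector {
  override def configure(configs: util.Map[String, _]): Unit = ()

  override def select(topicPartition: TopicPartition,
                      clientMetadata: ClientMetadata,
                      partitionView: PartitionView): Optional[ReplicaView] =
    Optional.ofNullable(partitionView.leader())
}
```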

View File

@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode
import java.io._
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
-import kafka.utils.CoreUtils
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.message.ConsumerProtocolAssignment
import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter
@@ -646,8 +645,8 @@ object DumpLogSegments {
} else if (options.has(shareStateOpt)) {
new ShareGroupStateMessageParser
} else {
-val valueDecoder = CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](options.valueOf(valueDecoderOpt))
-val keyDecoder = CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](options.valueOf(keyDecoderOpt))
+val valueDecoder = Utils.newInstance(options.valueOf(valueDecoderOpt), classOf[Decoder[_]])
+val keyDecoder = Utils.newInstance(options.valueOf(keyDecoderOpt), classOf[Decoder[_]])
new DecoderMessageParser(keyDecoder, valueDecoder)
}
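The decoders loaded here implement org.apache.kafka.tools.api.Decoder, which (assuming its single fromBytes method) makes a custom key or value decoder a one-method class with a no-arg constructor, loadable by Utils.newInstance exactly as above. A hypothetical example:

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.tools.api.Decoder

// Illustrative decoder: turns raw record bytes into a UTF-8 string,
// suitable for passing by name via the tool's decoder options.
class Utf8Decoder extends Decoder[String] {
  override def fromBytes(value: Array[Byte]): String =
    new String(value, StandardCharsets.UTF_8)
}
```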

View File

@@ -17,14 +17,12 @@
package kafka.utils
-import java.io._
-import java.nio._
+import java.io.File
import java.util.concurrent.locks.{Lock, ReadWriteLock}
-import java.lang.management._
-import java.util.{Base64, Properties, UUID}
+import java.lang.management.ManagementFactory
import com.typesafe.scalalogging.Logger
-import javax.management._
+import javax.management.ObjectName
-import scala.collection._
+import scala.collection.Seq
import org.apache.kafka.network.EndPoint
@@ -35,7 +33,6 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.network.SocketServerConfigs
import org.slf4j.event.Level
-import java.util
import scala.jdk.CollectionConverters._
/**
@@ -109,15 +106,6 @@ object CoreUtils {
}
}
-/**
-* Create an instance of the class with the given class name
-*/
-def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
-val klass = Utils.loadClass(className, classOf[Object]).asInstanceOf[Class[T]]
-val constructor = klass.getConstructor(args.map(_.getClass): _*)
-constructor.newInstance(args: _*)
-}
/**
* Execute the given function inside the lock
*/
@@ -134,16 +122,6 @@
def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun)
-/**
-* Returns a list of duplicated items
-*/
-def duplicates[T](s: Iterable[T]): Iterable[T] = {
-s.groupBy(identity)
-.map { case (k, l) => (k, l.size)}
-.filter { case (_, l) => l > 1 }
-.keys
-}
def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = {
listenerListToEndPoints(listeners, securityProtocolMap, requireDistinctPorts = true)
}
@@ -217,31 +195,4 @@
validate(endPoints)
endPoints
}
-def generateUuidAsBase64(): String = {
-val uuid = UUID.randomUUID()
-Base64.getUrlEncoder.withoutPadding.encodeToString(getBytesFromUuid(uuid))
-}
-def getBytesFromUuid(uuid: UUID): Array[Byte] = {
-// Extract bytes for uuid which is 128 bits (or 16 bytes) long.
-val uuidBytes = ByteBuffer.wrap(new Array[Byte](16))
-uuidBytes.putLong(uuid.getMostSignificantBits)
-uuidBytes.putLong(uuid.getLeastSignificantBits)
-uuidBytes.array
-}
-def propsWith(key: String, value: String): Properties = {
-propsWith((key, value))
-}
-def propsWith(props: (String, String)*): Properties = {
-val properties = new Properties()
-props.foreach { case (k, v) => properties.put(k, v) }
-properties
-}
-def replicaToBrokerAssignmentAsScala(map: util.Map[Integer, util.List[Integer]]): Map[Int, Seq[Int]] = {
-map.asScala.map(e => (e._1.asInstanceOf[Int], e._2.asScala.map(_.asInstanceOf[Int])))
-}
}
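The UUID helpers deleted above had no remaining callers. For reference, their combined behavior, restated as a standalone sketch: take the 16 bytes of a java.util.UUID and URL-safe base64-encode them without padding, which always yields 22 characters.

```scala
import java.nio.ByteBuffer
import java.util.{Base64, UUID}

// Equivalent of the removed generateUuidAsBase64/getBytesFromUuid pair:
// 16 UUID bytes, URL-safe base64, no padding, a 22-character result.
def uuidAsBase64(uuid: UUID = UUID.randomUUID()): String = {
  val buf = ByteBuffer.allocate(16)
  buf.putLong(uuid.getMostSignificantBits)
  buf.putLong(uuid.getLeastSignificantBits)
  Base64.getUrlEncoder.withoutPadding.encodeToString(buf.array)
}
```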

View File

@@ -518,9 +518,9 @@ class DynamicConfigChangeUnitTest {
@Test
def shouldParseRegardlessOfWhitespaceAroundValues(): Unit = {
def parse(configHandler: TopicConfigHandler, value: String): Seq[Int] = {
-configHandler.parseThrottledPartitions(
-CoreUtils.propsWith(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, value),
-102, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
+val props = new Properties()
+props.put(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, value)
+configHandler.parseThrottledPartitions(props, 102, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
}
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
assertEquals(ReplicationQuotaManager.ALL_REPLICAS.asScala.map(_.toInt).toSeq, parse(configHandler, "* "))

View File

@@ -17,16 +17,12 @@
package kafka.utils
-import java.util
-import java.util.{Base64, UUID}
import java.util.concurrent.locks.ReentrantLock
-import java.nio.ByteBuffer
import java.util.regex.Pattern
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.utils.Utils
import org.slf4j.event.Level
@@ -67,23 +63,6 @@ class CoreUtilsTest extends Logging {
assertEquals(Some("test"+Level.ERROR),loggedMessage)
}
-@Test
-def testReadBytes(): Unit = {
-for (testCase <- List("", "a", "abcd")) {
-val bytes = testCase.getBytes
-assertTrue(util.Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
-}
-}
-@Test
-def testAbs(): Unit = {
-assertEquals(0, Utils.abs(Integer.MIN_VALUE))
-assertEquals(1, Utils.abs(-1))
-assertEquals(0, Utils.abs(0))
-assertEquals(1, Utils.abs(1))
-assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
-}
@Test
def testInLock(): Unit = {
val lock = new ReentrantLock()
@@ -94,29 +73,4 @@ class CoreUtilsTest extends Logging {
assertEquals(2, result)
assertFalse(lock.isLocked, "Should be unlocked")
}
-@Test
-def testUrlSafeBase64EncodeUUID(): Unit = {
-// Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
-val clusterId1 = Base64.getUrlEncoder.withoutPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
-"a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
-assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
-assertEquals(clusterId1.length, 22)
-assertTrue(clusterIdPattern.matcher(clusterId1).matches())
-// Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
-val clusterId2 = Base64.getUrlEncoder.withoutPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
-"d418ec02-277e-4853-81e6-afe30259daec")))
-assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
-assertEquals(clusterId2.length, 22)
-assertTrue(clusterIdPattern.matcher(clusterId2).matches())
-}
-@Test
-def testGenerateUuidAsBase64(): Unit = {
-val clusterId = CoreUtils.generateUuidAsBase64()
-assertEquals(clusterId.length, 22)
-assertTrue(clusterIdPattern.matcher(clusterId).matches())
-}
}