mirror of https://github.com/apache/kafka.git
MINOR: Update Scala to 2.13.3 (#8931)
I had to fix several compiler errors due to deprecation of auto application of `()`. A related Xlint config (`-Xlint:nullary-override`) is no longer valid in 2.13, so we now only enable it for 2.12. The compiler flagged two new inliner warnings that required suppression and the semantics of `&` in `@nowarn` annotations changed, requiring a small change in one of the warning suppressions. I also removed the deprecation of a number of methods in `KafkaZkClient` as they should not have been deprecated in the first place since `KafkaZkClient` is an internal class and we still use these methods in the Controller and so on. This became visible because the Scala compiler now respects Java's `@Deprecated` annotation. Finally, I included a few minor clean-ups (eg using `toBuffer` instead `toList`) when fixing the compilation warnings. Noteworthy bug fixes in Scala 2.13.3: * Fix 2.13-only bug in Java collection converters that caused some operations to perform an extra pass * Fix 2.13.2 performance regression in Vector: restore special cases for small operands in appendedAll and prependedAll * Increase laziness of #:: for LazyList * Fixes related to annotation parsing of @Deprecated from Java sources in mixed compilation Full release notes: https://github.com/scala/scala/releases/tag/v2.13.3 Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Parent: 9c9a79b459
Commit: 7f90a58b69
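Most of the diff below is mechanical fallout of the Scala 2.13.3 upgrade: auto application (calling a method declared with an empty parameter list without writing `()`) is now deprecated, so side-effecting calls gain explicit parentheses while parameterless accessors drop the `()`, and `@nowarn` filters that combined categories with `&` are split into separate annotations. A minimal sketch of the pattern, assuming an illustrative `Server` class that is not taken from the Kafka sources:

import scala.annotation.nowarn

class Server {
  def shutdown(): Unit = println("shutting down")  // side effect: declared and called with ()
  def logManager: String = "log-manager"           // accessor: declared and called without ()
}

object Scala213StyleExample {
  @nowarn("cat=deprecation")   // one category per @nowarn; the commit splits "cat=a&cat=b" into two annotations
  @nowarn("cat=optimizer")
  def main(args: Array[String]): Unit = {
    val server = new Server
    server.shutdown()          // writing `server.shutdown` would now trigger an auto-application deprecation warning
    println(server.logManager) // no parentheses for the parameterless accessor
  }
}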
@@ -48,7 +48,7 @@ should_include_file() {
base_dir=$(dirname $0)/..

if [ -z "$SCALA_VERSION" ]; then
-SCALA_VERSION=2.13.2
+SCALA_VERSION=2.13.3
if [[ -f "$base_dir/gradle.properties" ]]; then
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
fi

@@ -27,7 +27,7 @@ set BASE_DIR=%CD%
popd

IF ["%SCALA_VERSION%"] EQU [""] (
-set SCALA_VERSION=2.13.2
+set SCALA_VERSION=2.13.3
)

IF ["%SCALA_BINARY_VERSION%"] EQU [""] (

@@ -465,7 +465,6 @@ subprojects {
"-Xlint:delayedinit-select",
"-Xlint:doc-detached",
"-Xlint:missing-interpolator",
-"-Xlint:nullary-override",
"-Xlint:nullary-unit",
"-Xlint:option-implicit",
"-Xlint:package-object-classes",

@@ -503,6 +502,7 @@ subprojects {
if (versions.baseScala == '2.12') {
scalaCompileOptions.additionalParameters += [
"-Xlint:by-name-right-associative",
+"-Xlint:nullary-override",
"-Xlint:unsound-match"
]
}
@@ -77,7 +77,7 @@ object Kafka extends Logging {
}

// attach shutdown handler to catch terminating signals as well as normal termination
-Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown)
+Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())

kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()

@@ -627,7 +627,7 @@ object ConfigCommand extends Config {
}
}

-val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next)) else None))
+val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next())) else None))
ConfigEntity(entities.head, if (entities.size > 1) Some(entities(1)) else None)
}

@@ -711,12 +711,12 @@ object ConfigCommand extends Config {
(userDefaults, ConfigType.User),
(brokerDefaults, ConfigType.Broker))

-private[admin] def entityTypes(): List[String] = {
+private[admin] def entityTypes: List[String] = {
options.valuesOf(entityType).asScala.toList ++
(entityFlags ++ entityDefaultsFlags).filter(entity => options.has(entity._1)).map(_._2)
}

-private[admin] def entityNames(): List[String] = {
+private[admin] def entityNames: List[String] = {
val namesIterator = options.valuesOf(entityName).iterator
options.specs.asScala
.filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
@@ -44,6 +44,8 @@ import org.apache.kafka.common.requests.ListOffsetResponse
import org.apache.kafka.common.ConsumerGroupState
import joptsimple.OptionException

+import scala.annotation.nowarn

object ConsumerGroupCommand extends Logging {

def main(args: Array[String]): Unit = {

@@ -566,6 +568,7 @@ object ConsumerGroupCommand extends Logging {
/**
* Returns the state of the specified consumer group and partition assignment states
*/
+@nowarn("cat=optimizer")
def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None))
}

@@ -178,7 +178,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
case Some(partitions) =>
partitions.map(_.topic).toSet
case None =>
-zkClient.getAllPartitions().map(_.topic)
+zkClient.getAllPartitions.map(_.topic)
}

val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) =>

@@ -190,7 +190,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
case Some(partitions) =>
partitions.partition(partitionsFromZk.contains)
case None =>
-(zkClient.getAllPartitions(), Set.empty)
+(zkClient.getAllPartitions, Set.empty)
}
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
@@ -1111,7 +1111,7 @@ object ReassignPartitionsCommand extends Logging {
// Check for the presence of the legacy partition reassignment ZNode. This actually
// won't detect all rebalances... only ones initiated by the legacy method.
// This is a limitation of the legacy ZK API.
-val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress()
+val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
if (reassignPartitionsInProgress) {
// Note: older versions of this tool would modify the broker quotas here (but not
// topic quotas, for some reason). This behavior wasn't documented in the --execute

@@ -176,7 +176,7 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
}

private def setAclIndividually(path: String): Unit = {
-val setPromise = Promise[String]
+val setPromise = Promise[String]()
futures.synchronized {
futures += setPromise.future
}

@@ -184,8 +184,8 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
}

private def setAclsRecursively(path: String): Unit = {
-val setPromise = Promise[String]
-val childrenPromise = Promise[String]
+val setPromise = Promise[String]()
+val childrenPromise = Promise[String]()
futures.synchronized {
futures += setPromise.future
futures += childrenPromise.future

@@ -279,15 +279,15 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
future match {
case Some(a) =>
Await.result(a, 6000 millis)
-futures.synchronized { futures.dequeue }
-recurse
+futures.synchronized { futures.dequeue() }
+recurse()
case None =>
}
}
recurse()

} finally {
-zkClient.close
+zkClient.close()
}
}
@@ -117,7 +117,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
}

class ChangeNotification {
-def process(): Unit = processNotifications
+def process(): Unit = processNotifications()
}

/**

@@ -143,17 +143,17 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong

class ChangeEventProcessThread(name: String) extends ShutdownableThread(name = name) {
-override def doWork(): Unit = queue.take().process
+override def doWork(): Unit = queue.take().process()
}

object ChangeNotificationHandler extends ZNodeChildChangeHandler {
override val path: String = seqNodeRoot
-override def handleChildChange(): Unit = addChangeNotification
+override def handleChildChange(): Unit = addChangeNotification()
}

object ZkStateChangeHandler extends StateChangeHandler {
override val name: String = StateChangeHandlers.zkNodeChangeListenerHandler(seqNodeRoot)
-override def afterInitializingSession(): Unit = addChangeNotification
+override def afterInitializingSession(): Unit = addChangeNotification()
}
}
@@ -416,15 +416,13 @@ class ControllerContext {
partitionLeadershipInfo.get(partition)
}

-def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = {
+def partitionsLeadershipInfo: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
partitionLeadershipInfo
-}

-def partitionsWithLeaders(): Set[TopicPartition] = {
-partitionLeadershipInfo.keys.filter(tp => !isTopicQueuedUpForDeletion(tp.topic)).toSet
-}
+def partitionsWithLeaders: Set[TopicPartition] =
+partitionLeadershipInfo.keySet.filter(tp => !isTopicQueuedUpForDeletion(tp.topic))

-def partitionsWithOfflineLeader(): Set[TopicPartition] = {
+def partitionsWithOfflineLeader: Set[TopicPartition] = {
partitionLeadershipInfo.filter { case (topicPartition, leaderIsrAndControllerEpoch) =>
!isReplicaOnline(leaderIsrAndControllerEpoch.leaderAndIsr.leader, topicPartition) &&
!isTopicQueuedUpForDeletion(topicPartition.topic)

@@ -439,13 +437,9 @@ class ControllerContext {
}.keySet
}

-def clearPartitionLeadershipInfo(): Unit = {
-partitionLeadershipInfo.clear()
-}
+def clearPartitionLeadershipInfo(): Unit = partitionLeadershipInfo.clear()

-def partitionWithLeadersCount(): Int = {
-partitionLeadershipInfo.size
-}
+def partitionWithLeadersCount: Int = partitionLeadershipInfo.size

private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
oldReplicaAssignment: Option[ReplicaAssignment],
@@ -260,7 +260,7 @@ class KafkaController(val config: KafkaConfig,
info("starting the token expiry check scheduler")
tokenCleanScheduler.startup()
tokenCleanScheduler.schedule(name = "delete-expired-tokens",
-fun = () => tokenManager.expireTokens,
+fun = () => tokenManager.expireTokens(),
period = config.delegationTokenExpiryCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}

@@ -439,7 +439,7 @@ class KafkaController(val config: KafkaConfig,
val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) =
newOfflineReplicas.partition(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))

-val partitionsWithOfflineLeader = controllerContext.partitionsWithOfflineLeader()
+val partitionsWithOfflineLeader = controllerContext.partitionsWithOfflineLeader

// trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, OfflinePartition)

@@ -931,10 +931,10 @@ class KafkaController(val config: KafkaConfig,
* @param shouldRemoveReassignment Predicate indicating which partition reassignments should be removed
*/
private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Unit = {
-if (!zkClient.reassignPartitionsInProgress())
+if (!zkClient.reassignPartitionsInProgress)
return

-val reassigningPartitions = zkClient.getPartitionReassignment()
+val reassigningPartitions = zkClient.getPartitionReassignment
val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) =>
shouldRemoveReassignment(tp, replicas)
}

@@ -1516,7 +1516,7 @@ class KafkaController(val config: KafkaConfig,
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]

-zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) =>
+zkClient.getPartitionReassignment.foreach { case (tp, targetReplicas) =>
maybeBuildReassignment(tp, Some(targetReplicas)) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
@@ -174,7 +174,7 @@ class GroupMetadataManager(brokerId: Int,
scheduler.startup()
if (enableMetadataExpiration) {
scheduler.schedule(name = "delete-expired-group-metadata",
-fun = () => cleanupGroupMetadata,
+fun = () => cleanupGroupMetadata(),
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}

@@ -752,7 +752,7 @@ class GroupMetadataManager(brokerId: Int,
onGroupUnloaded: GroupMetadata => Unit): Unit = {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
-scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets)
+scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets())

def removeGroupsAndOffsets(): Unit = {
var numOffsetsRemoved = 0
@@ -785,7 +785,7 @@ class Log(@volatile private var _dir: File,
var truncated = false

while (unflushed.hasNext && !truncated) {
-val segment = unflushed.next
+val segment = unflushed.next()
info(s"Recovering unflushed segment ${segment.baseOffset}")
val truncatedBytes =
try {

@@ -2270,7 +2270,7 @@ class Log(@volatile private var _dir: File,

if (asyncDelete) {
info(s"Scheduling segments for deletion ${segments.mkString(",")}")
-scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs)
+scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
} else {
deleteSegments()
}
@@ -318,7 +318,7 @@ class LogManager(logDirs: Seq[File],

var recoveryPoints = Map[TopicPartition, Long]()
try {
-recoveryPoints = this.recoveryPointCheckpoints(dir).read
+recoveryPoints = this.recoveryPointCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir", e)

@@ -327,7 +327,7 @@ class LogManager(logDirs: Seq[File],

var logStartOffsets = Map[TopicPartition, Long]()
try {
-logStartOffsets = this.logStartOffsetCheckpoints(dir).read
+logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir", e)

@@ -1039,7 +1039,7 @@ class LogManager(logDirs: Seq[File],
debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
if(timeSinceLastFlush >= log.config.flushMs)
-log.flush
+log.flush()
} catch {
case e: Throwable =>
error(s"Error flushing topic ${topicPartition.topic}", e)
@@ -68,7 +68,7 @@ object AclAuthorizer {
def find(p: AclEntry => Boolean): Option[AclEntry] = {
// Lazily iterate through the inner `Seq` elements and stop as soon as we find a match
val it = seqs.iterator.flatMap(_.find(p))
-if (it.hasNext) Some(it.next)
+if (it.hasNext) Some(it.next())
else None
}

@@ -367,7 +367,8 @@ class AclAuthorizer extends Authorizer with Logging {
} else false
}

-@nowarn("cat=deprecation&cat=optimizer")
+@nowarn("cat=deprecation")
+@nowarn("cat=optimizer")
private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
// this code is performance sensitive, make sure to run AclAuthorizerBenchmark after any changes

@@ -538,6 +539,7 @@ class AclAuthorizer extends Authorizer with Logging {
}
}

+@nowarn("cat=optimizer")
private def getAclsFromCache(resource: ResourcePattern): VersionedAcls = {
aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
}

@@ -548,9 +550,9 @@ class AclAuthorizer extends Authorizer with Logging {

private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
if (versionedAcls.acls.nonEmpty) {
-aclCache = aclCache + (resource -> versionedAcls)
+aclCache = aclCache.updated(resource, versionedAcls)
} else {
-aclCache = aclCache - resource
+aclCache -= resource
}
}
@@ -651,7 +651,7 @@ abstract class AbstractFetcherThread(name: String,
} finally partitionMapLock.unlock()
}

-def partitionCount(): Int = {
+def partitionCount: Int = {
partitionMapLock.lockInterruptibly()
try partitionStates.size
finally partitionMapLock.unlock()
@@ -185,10 +185,10 @@ class DelegationTokenManager(val config: KafkaConfig,

def startup() = {
if (config.tokenAuthEnabled) {
-zkClient.createDelegationTokenPaths
-loadCache
+zkClient.createDelegationTokenPaths()
+loadCache()
tokenChangeListener = new ZkNodeChangeNotificationListener(zkClient, DelegationTokenChangeNotificationZNode.path, DelegationTokenChangeNotificationSequenceZNode.SequenceNumberPrefix, TokenChangedNotificationHandler)
-tokenChangeListener.init
+tokenChangeListener.init()
}
}

@@ -267,7 +267,7 @@ class DelegationTokenManager(val config: KafkaConfig,
responseCallback(CreateTokenResult(-1, -1, -1, "", Array[Byte](), Errors.DELEGATION_TOKEN_AUTH_DISABLED))
} else {
lock.synchronized {
-val tokenId = CoreUtils.generateUuidAsBase64
+val tokenId = CoreUtils.generateUuidAsBase64()

val issueTimeStamp = time.milliseconds
val maxLifeTime = if (maxLifeTimeMs <= 0) tokenMaxLifetime else Math.min(maxLifeTimeMs, tokenMaxLifetime)

@@ -464,16 +464,10 @@ class DelegationTokenManager(val config: KafkaConfig,
}
}

-/**
-*
-* @return
-*/
-def getAllTokenInformation(): List[TokenInformation] = {
-tokenCache.tokens.asScala.toList
-}
+def getAllTokenInformation: List[TokenInformation] = tokenCache.tokens.asScala.toList

def getTokens(filterToken: TokenInformation => Boolean): List[DelegationToken] = {
-getAllTokenInformation().filter(filterToken).map(token => getToken(token))
+getAllTokenInformation.filter(filterToken).map(token => getToken(token))
}

object TokenChangedNotificationHandler extends NotificationHandler {
@@ -1528,7 +1528,7 @@ class KafkaApis(val requestChannel: RequestChannel,
Option(syncGroupRequest.data.protocolType),
Option(syncGroupRequest.data.protocolName),
Option(syncGroupRequest.data.groupInstanceId),
-assignmentMap.result,
+assignmentMap.result(),
sendResponseCallback
)
}

@@ -1210,7 +1210,7 @@ object KafkaConfig {
.define(PasswordEncoderIterationsProp, INT, Defaults.PasswordEncoderIterations, atLeast(1024), LOW, PasswordEncoderIterationsDoc)
}

-def configNames() = configDef.names().asScala.toList.sorted
+def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted
private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala
private[server] def configKeys: Map[String, ConfigKey] = configDef.configKeys.asScala
@@ -457,7 +457,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}

private def getOrGenerateClusterId(zkClient: KafkaZkClient): String = {
-zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
+zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64()))
}

private[server] def createBrokerInfo: BrokerInfo = {

@@ -745,7 +745,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
*/
def awaitShutdown(): Unit = shutdownLatch.await()

-def getLogManager(): LogManager = logManager
+def getLogManager: LogManager = logManager

def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
@@ -49,9 +49,9 @@ object QuotaFactory extends Logging {
alterLogDirs: ReplicationQuotaManager,
clientQuotaCallback: Option[ClientQuotaCallback]) {
def shutdown(): Unit = {
-fetch.shutdown
-produce.shutdown
-request.shutdown
+fetch.shutdown()
+produce.shutdown()
+request.shutdown()
clientQuotaCallback.foreach(_.close())
}
}

@@ -174,10 +174,10 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
*
* @return
*/
-def upperBound(): Long = {
+def upperBound: Long = {
inReadLock(lock) {
if (quota != null)
-quota.bound().toLong
+quota.bound.toLong
else
Long.MaxValue
}
@@ -227,9 +227,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
}

// Visible for testing
-def epochEntries: Seq[EpochEntry] = {
-epochs
-}
+def epochEntries: Seq[EpochEntry] = epochs

private def latestEntry: Option[EpochEntry] = epochs.lastOption

@@ -334,14 +334,14 @@ private class ReplicaBuffer(expectedReplicasPerTopicPartition: collection.Map[To
MessageInfo(replicaId, batch.lastOffset, batch.nextOffset, batch.checksum))
case Some(messageInfoFromFirstReplica) =>
if (messageInfoFromFirstReplica.offset != batch.lastOffset) {
-println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicPartition
+println(ReplicaVerificationTool.getCurrentTimeString() + ": partition " + topicPartition
+ ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
+ messageInfoFromFirstReplica.offset + " doesn't match replica "
+ replicaId + "'s offset " + batch.lastOffset)
Exit.exit(1)
}
if (messageInfoFromFirstReplica.checksum != batch.checksum)
-println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
+println(ReplicaVerificationTool.getCurrentTimeString() + ": partition "
+ topicPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica "
+ messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
+ "; replica " + replicaId + "'s checksum " + batch.checksum)
@@ -137,7 +137,7 @@ object StateChangeLogMerger extends Logging {
*/
val pqueue = new mutable.PriorityQueue[LineIterator]()(dateBasedOrdering)
val output: OutputStream = new BufferedOutputStream(System.out, 1024*1024)
-val lineIterators = files.map(scala.io.Source.fromFile(_).getLines)
+val lineIterators = files.map(scala.io.Source.fromFile(_).getLines())
var lines: List[LineIterator] = List()

for (itr <- lineIterators) {

@@ -166,7 +166,7 @@ object StateChangeLogMerger extends Logging {
*/
def getNextLine(itr: Iterator[String]): LineIterator = {
while (itr != null && itr.hasNext) {
-val nextLine = itr.next
+val nextLine = itr.next()
dateRegex.findFirstIn(nextLine).foreach { d =>
val date = dateFormat.parse(d)
if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) {
@@ -100,7 +100,7 @@ object DecodeJson {
private def decodeIterator[S, T, C](it: Iterator[S])(f: S => Either[String, T])(implicit factory: Factory[T, C]): Either[String, C] = {
val result = factory.newBuilder
while (it.hasNext) {
-f(it.next) match {
+f(it.next()) match {
case Right(x) => result += x
case Left(x) => return Left(x)
}

@@ -105,7 +105,7 @@ class SystemTimer(executorName: String,
writeLock.lock()
try {
while (bucket != null) {
-timingWheel.advanceClock(bucket.getExpiration())
+timingWheel.advanceClock(bucket.getExpiration)
bucket.flush(reinsert)
bucket = delayQueue.poll()
}
@@ -40,8 +40,6 @@ trait TimerTask extends Runnable {
}
}

-private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
-timerTaskEntry
-}
+private[timer] def getTimerTaskEntry: TimerTaskEntry = timerTaskEntry

}

@@ -43,9 +43,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
}

// Get the bucket's expiration time
-def getExpiration(): Long = {
-expiration.get()
-}
+def getExpiration: Long = expiration.get

// Apply the supplied function to each of tasks in this list
def foreach(f: (TimerTask)=>Unit): Unit = {
@@ -81,7 +81,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
private[kafka] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = {
val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
-createResponse.maybeThrow
+createResponse.maybeThrow()
createResponse.name
}

@@ -364,7 +364,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
try createRecursive(path, configData)
catch {
-case _: NodeExistsException => set(configData).maybeThrow
+case _: NodeExistsException => set(configData).maybeThrow()
}
}

@@ -373,7 +373,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
val setDataResponse = set(configData)
setDataResponse.resultCode match {
case Code.NONODE => createOrSet(configData)
-case _ => setDataResponse.maybeThrow
+case _ => setDataResponse.maybeThrow()
}
}

@@ -506,7 +506,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
assignment: Map[TopicPartition, ReplicaAssignment],
expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion) = {
val setDataResponse = setTopicAssignmentRaw(topic, assignment, expectedControllerEpochZkVersion)
-setDataResponse.maybeThrow
+setDataResponse.maybeThrow()
}

/**

@@ -561,7 +561,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
if (getChildrenResponse.resultCode == Code.OK) {
deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber), expectedControllerEpochZkVersion)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
-getChildrenResponse.maybeThrow
+getChildrenResponse.maybeThrow()
}
}

@@ -666,7 +666,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* Gets all partitions in the cluster
* @return all partitions in the cluster
*/
-def getAllPartitions(): Set[TopicPartition] = {
+def getAllPartitions: Set[TopicPartition] = {
val topics = getChildren(TopicsZNode.path)
if (topics == null) Set.empty
else {

@@ -810,10 +810,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
/**
* Returns all reassignments.
* @return the reassignments for each partition.
-* @deprecated Use the PartitionReassignment Kafka API instead
*/
-@Deprecated
-def getPartitionReassignment(): collection.Map[TopicPartition, Seq[Int]] = {
+def getPartitionReassignment: collection.Map[TopicPartition, Seq[Int]] = {
val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {

@@ -857,18 +855,16 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
setDataResponse.resultCode match {
case Code.NONODE =>
val createDataResponse = create(reassignmentData)
-createDataResponse.maybeThrow
-case _ => setDataResponse.maybeThrow
+createDataResponse.maybeThrow()
+case _ => setDataResponse.maybeThrow()
}
}

/**
* Creates the partition reassignment znode with the given reassignment.
* @param reassignment the reassignment to set on the reassignment znode.
-* @throws KeeperException if there is an error while creating the znode
-* @deprecated Use the PartitionReassignment Kafka API instead
+* @throws KeeperException if there is an error while creating the znode.
*/
-@Deprecated
def createPartitionReassignment(reassignment: Map[TopicPartition, Seq[Int]]) = {
createRecursive(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment))
}

@@ -876,20 +872,16 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
/**
* Deletes the partition reassignment znode.
* @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
-* @deprecated Use the PartitionReassignment Kafka API instead
*/
-@Deprecated
def deletePartitionReassignment(expectedControllerEpochZkVersion: Int): Unit = {
deletePath(ReassignPartitionsZNode.path, expectedControllerEpochZkVersion)
}

/**
-* Checks if reassign partitions is in progress
-* @return true if reassign partitions is in progress, else false
-* @deprecated Use the PartitionReassignment Kafka API instead
+* Checks if reassign partitions is in progress.
+* @return true if reassign partitions is in progress, else false.
*/
-@Deprecated
-def reassignPartitionsInProgress(): Boolean = {
+def reassignPartitionsInProgress: Boolean = {
pathExists(ReassignPartitionsZNode.path)
}

@@ -993,7 +985,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
if (getChildrenResponse.resultCode == Code.OK) {
deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber), expectedControllerEpochZkVersion)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
-getChildrenResponse.maybeThrow
+getChildrenResponse.maybeThrow()
}
}

@@ -1183,7 +1175,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
val aclChange = ZkAclStore(resource.patternType).changeStore.createChangeNode(resource)
val createRequest = CreateRequest(aclChange.path, aclChange.bytes, defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
-createResponse.maybeThrow
+createResponse.maybeThrow()
}

def propagateLogDirEvent(brokerId: Int): Unit = {

@@ -1209,7 +1201,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
if (getChildrenResponse.resultCode == Code.OK) {
deleteAclChangeNotifications(store.aclChangePath, getChildrenResponse.children)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
-getChildrenResponse.maybeThrow
+getChildrenResponse.maybeThrow()
}
})
}

@@ -1228,7 +1220,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
val deleteResponses = retryRequestsUntilConnected(deleteRequests)
deleteResponses.foreach { deleteResponse =>
if (deleteResponse.resultCode != Code.NONODE) {
-deleteResponse.maybeThrow
+deleteResponse.maybeThrow()
}
}
}

@@ -1349,8 +1341,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
setDataResponse.resultCode match {
case Code.NONODE =>
val createDataResponse = create(tokenInfo)
-createDataResponse.maybeThrow
-case _ => setDataResponse.maybeThrow
+createDataResponse.maybeThrow()
+case _ => setDataResponse.maybeThrow()
}
}

@@ -1478,7 +1470,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
if (setDataResponse.resultCode == Code.NONODE) {
createConsumerOffset(group, topicPartition, offset)
} else {
-setDataResponse.maybeThrow
+setDataResponse.maybeThrow()
}
}

@@ -1518,7 +1510,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
def setAcl(path: String, acl: Seq[ACL]): Unit = {
val setAclRequest = SetAclRequest(path, acl, ZkVersion.MatchAnyVersion)
val setAclResponse = retryRequestUntilConnected(setAclRequest)
-setAclResponse.maybeThrow
+setAclResponse.maybeThrow()
}

/**

@@ -1575,7 +1567,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
defaultAcls(FeatureZNode.path),
CreateMode.PERSISTENT)
val response = retryRequestUntilConnected(createRequest)
-response.maybeThrow
+response.maybeThrow()
}

def updateFeatureZNode(nodeContents: FeatureZNode): Unit = {

@@ -1584,7 +1576,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
FeatureZNode.encode(nodeContents),
ZkVersion.MatchAnyVersion)
val response = retryRequestUntilConnected(setRequest)
-response.maybeThrow
+response.maybeThrow()
}

def deleteFeatureZNode(): Unit = {

@@ -1659,14 +1651,14 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
var createResponse = retryRequestUntilConnected(createRequest)

if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
-createResponse.maybeThrow
+createResponse.maybeThrow()
} else if (createResponse.resultCode == Code.NONODE) {
createRecursive0(parentPath(path))
createResponse = retryRequestUntilConnected(createRequest)
if (throwIfPathExists || createResponse.resultCode != Code.NODEEXISTS)
-createResponse.maybeThrow
+createResponse.maybeThrow()
} else if (createResponse.resultCode != Code.NODEEXISTS)
-createResponse.maybeThrow
+createResponse.maybeThrow()

}
@@ -57,12 +57,12 @@ class ZooKeeperMainWithTlsSupportForKafka(args: Array[String], val zkClientConfi
val args = co.getArgArray
val cmd = co.getCommand
if (args.length < 1) {
-kafkaTlsUsage
+kafkaTlsUsage()
throw new MalformedCommandException("No command entered")
}

if (!ZooKeeperMain.commandMap.containsKey(cmd)) {
-kafkaTlsUsage
+kafkaTlsUsage()
throw new CommandNotFoundException(s"Command not found $cmd")
}
super.processZKCmd(co)
@@ -42,7 +42,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
val content = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val lineIter = content.split("\n").iterator
assertTrue(lineIter.hasNext)
-assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next)
+assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next())
val nodeApiVersions = NodeApiVersions.create
for (apiKey <- ApiKeys.values) {
val apiVersion = nodeApiVersions.apiVersion(apiKey)

@@ -54,10 +54,10 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
assertTrue(lineIter.hasNext)
-assertEquals(line, lineIter.next)
+assertEquals(line, lineIter.next())
}
assertTrue(lineIter.hasNext)
-assertEquals(")", lineIter.next)
+assertEquals(")", lineIter.next())
assertFalse(lineIter.hasNext)
}
}
@@ -49,7 +49,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with

@Before
override def setUp(): Unit = {
-super.setUp
+super.setUp()
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
}

@@ -68,7 +68,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg

@Test
def testCreateDeleteTopics(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val topics = Seq("mytopic", "mytopic2", "mytopic3")
val newTopics = Seq(
new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava),

@@ -155,7 +155,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg

@Test
def testAuthorizedOperations(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)

// without includeAuthorizedOperations flag
var result = client.describeCluster

@@ -181,9 +181,8 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
assertEquals(expectedOperations, topicResult.authorizedOperations)
}

-def configuredClusterPermissions(): Set[AclOperation] = {
+def configuredClusterPermissions: Set[AclOperation] =
AclEntry.supportedOperations(ResourceType.CLUSTER)
-}

override def modifyConfigs(configs: Seq[Properties]): Unit = {
super.modifyConfigs(configs)

@@ -199,7 +198,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
}
}

-def createConfig(): util.Map[String, Object] = {
+def createConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
@@ -82,14 +82,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testClose(): Unit = {
-val client = Admin.create(createConfig())
+val client = Admin.create(createConfig)
client.close()
client.close() // double close has no effect
}

@Test
def testListNodes(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val brokerStrs = brokerList.split(",").toList.sorted
var nodeStrs: List[String] = null
do {

@@ -101,7 +101,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testCreateExistingTopicsThrowTopicExistsException(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val topic = "mytopic"
val topics = Seq(topic)
val newTopics = Seq(new NewTopic(topic, 1, 1.toShort))

@@ -118,7 +118,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testMetadataRefresh(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val topics = Seq("mytopic")
val newTopics = Seq(new NewTopic("mytopic", 3, 3.toShort))
client.createTopics(newTopics.asJava).all.get()

@@ -136,7 +136,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
*/
@Test
def testDescribeNonExistingTopic(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)

val existingTopic = "existing-topic"
client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1.toShort)).asJava).all.get()

@@ -151,7 +151,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testDescribeCluster(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val result = client.describeCluster
val nodes = result.nodes.get()
val clusterId = result.clusterId().get()

@@ -169,7 +169,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testDescribeLogDirs(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val topic = "topic"
val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1)
val partitionsByBroker = leaderByPartition.groupBy { case (_, leaderId) => leaderId }.map { case (k, v) =>

@@ -197,7 +197,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testDescribeReplicaLogDirs(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val topic = "topic"
val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1)
val replicas = leaderByPartition.map { case (partition, brokerId) =>

@@ -214,7 +214,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testAlterReplicaLogDirs(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val topic = "topic"
val tp = new TopicPartition(topic, 0)
val randomNums = servers.map(server => server -> Random.nextInt(2)).toMap

@@ -925,7 +925,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
def testAclOperations(): Unit = {
val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).values(), classOf[SecurityDisabledException])
assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(acl)).all(),
classOf[SecurityDisabledException])

@@ -939,7 +939,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
*/
@Test
def testDelayedClose(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val topics = Seq("mytopic", "mytopic2")
val newTopics = topics.map(new NewTopic(_, 1, 1.toShort))
val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()

@@ -956,7 +956,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
*/
@Test
def testForceClose(): Unit = {
-val config = createConfig()
+val config = createConfig
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
client = Admin.create(config)
// Because the bootstrap servers are set up incorrectly, this call will not complete, but must be

@@ -973,7 +973,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
*/
@Test
def testMinimumRequestTimeouts(): Unit = {
-val config = createConfig()
+val config = createConfig
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
client = Admin.create(config)

@@ -990,7 +990,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
*/
@Test
def testCallInFlightTimeouts(): Unit = {
-val config = createConfig()
+val config = createConfig
config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000")
val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory()
client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory)
@@ -1008,7 +1008,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
*/
@Test
def testConsumerGroups(): Unit = {
-val config = createConfig()
+val config = createConfig
client = Admin.create(config)
try {
// Verify that initially there are no consumer groups to list.

@@ -1224,7 +1224,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testDeleteConsumerGroupOffsets(): Unit = {
-val config = createConfig()
+val config = createConfig
client = Admin.create(config)
try {
val testTopicName = "test_topic"

@@ -1662,7 +1662,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testListReassignmentsDoesNotShowNonReassigningPartitions(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)

// Create topics
val topic = "list-reassignments-no-reassignments"

@@ -1678,7 +1678,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testListReassignmentsDoesNotShowDeletedPartitions(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)

val topic = "list-reassignments-no-reassignments"
val tp = new TopicPartition(topic, 0)

@@ -1802,7 +1802,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
client.incrementalAlterConfigs(Map(broker0Resource ->
Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "123"),

@@ -1839,7 +1839,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testIncrementalAlterConfigsDeleteBrokerConfigs(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
client.incrementalAlterConfigs(Map(broker0Resource ->
Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "123"),

@@ -2029,7 +2029,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(ApiVersion.latestVersion, logConfig.messageFormatVersion)
}

-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val invalidConfigs = Map[String, String](LogConfig.MessageFormatVersionProp -> null,
LogConfig.CompressionTypeProp -> "producer").asJava
val newTopic = new NewTopic(topic, 2, brokerCount.toShort)

@@ -2057,7 +2057,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

@Test
def testDescribeConfigsForLog4jLogLevels(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)

val loggerConfig = describeBrokerLoggers()
val rootLogLevel = loggerConfig.get(Log4jController.ROOT_LOGGER).value()

@@ -2073,7 +2073,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@Test
@Ignore // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevels(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)

val initialLoggerConfig = describeBrokerLoggers()
val initialRootLogLevel = initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value()

@@ -2137,7 +2137,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@Test
@Ignore // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
// step 1 - configure root logger
val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL
val alterRootLoggerEntry = Seq(

@@ -2179,7 +2179,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@Test
@Ignore // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val deleteRootLoggerEntry = Seq(
new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, ""), AlterConfigOp.OpType.DELETE)
).asJavaCollection

@@ -2190,7 +2190,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@Test
@Ignore // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val validLoggerName = "kafka.server.KafkaRequestHandler"
val expectedValidLoggerLogLevel = describeBrokerLoggers().get(validLoggerName)
def assertLogLevelDidNotChange(): Unit = {

@@ -2236,7 +2236,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@Test
@Ignore // To be re-enabled once KAFKA-8779 is resolved
def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)

val alterLogLevelsEntries = Seq(
new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL)
@@ -163,7 +163,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)

-verifyAuthenticationException(consumerGroupService.listGroups)
+verifyAuthenticationException(consumerGroupService.listGroups())
consumerGroupService.close()
}

@@ -176,7 +176,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
consumer.subscribe(List(topic).asJava)

verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))
-assertEquals(1, consumerGroupService.listConsumerGroups.size)
+assertEquals(1, consumerGroupService.listConsumerGroups().size)
consumerGroupService.close()
}

@@ -76,6 +76,6 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
case e: TopicAuthorizationException => assertTrue(e.unauthorizedTopics.contains(topic))
case e: GroupAuthorizationException => assertEquals(group, e.groupId)
}
-confirmReauthenticationMetrics
+confirmReauthenticationMetrics()
}
}
@@ -96,7 +96,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu

@Test
def testAclOperations(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
assertEquals(7, getAcls(AclBindingFilter.ANY).size)

@@ -117,7 +117,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu

@Test
def testAclOperations2(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava)
assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala)
results.all.get()

@@ -143,7 +143,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu

@Test
def testAclDescribe(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))

val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.ANY), AccessControlEntryFilter.ANY)

@@ -170,7 +170,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu

@Test
def testAclDelete(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))

val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.MATCH), AccessControlEntryFilter.ANY)

@@ -220,7 +220,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
//noinspection ScalaDeprecation - test explicitly covers clients using legacy / deprecated constructors
@Test
def testLegacyAclOpsNeverAffectOrReturnPrefixed(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) // <-- prefixed exists, but should never be returned.

val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.MATCH), AccessControlEntryFilter.ANY)

@@ -257,7 +257,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu

@Test
def testAttemptToCreateInvalidAcls(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
val clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
val emptyResourceNameAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "", PatternType.LITERAL),

@@ -268,7 +268,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException])
}

-override def configuredClusterPermissions(): Set[AclOperation] = {
+override def configuredClusterPermissions: Set[AclOperation] = {
Set(AclOperation.ALTER, AclOperation.CREATE, AclOperation.CLUSTER_ACTION, AclOperation.ALTER_CONFIGS,
AclOperation.DESCRIBE, AclOperation.DESCRIBE_CONFIGS)
}

@@ -356,7 +356,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu

@Test
def testAclAuthorizationDenied(): Unit = {
-client = Admin.create(createConfig())
+client = Admin.create(createConfig)

// Test that we cannot create or delete ACLs when ALTER is denied.
authorizationAdmin.addClusterAcl(DENY, ALTER)

@@ -393,7 +393,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
val denyAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic2, PatternType.LITERAL),
new AccessControlEntry("User:*", "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.DENY))

-client = Admin.create(createConfig())
+client = Admin.create(createConfig)
client.createAcls(List(denyAcl).asJava, new CreateAclsOptions()).all().get()

val topics = Seq(topic1, topic2)
@ -214,7 +214,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
val testSemaphore = new Semaphore(0)
SslAdminIntegrationTest.semaphore = Some(testSemaphore)

client = Admin.create(createConfig())
client = Admin.create(createConfig)
val results = client.createAcls(List(acl2, acl3).asJava).values
assertEquals(Set(acl2, acl3), results.keySet().asScala)
assertFalse(results.values.asScala.exists(_.isDone))
@ -237,7 +237,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
}

private def createAdminClient: Admin = {
val config = createConfig()
val config = createConfig
config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "40000")
val client = Admin.create(config)
adminClients += client

@ -390,7 +390,7 @@ object MiniKdc {
""".stripMargin
println(infoMessage)
Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop)
Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop())
miniKdc
}
@ -89,7 +89,7 @@ class ExitTest {
array(1) = array(1).asInstanceOf[Int] + 1
}
try {
Exit.addShutdownHook(name, sideEffect) // by-name parameter, only invoked due to above shutdownHookAdder
Exit.addShutdownHook(name, sideEffect()) // by-name parameter, only invoked due to above shutdownHookAdder
assertEquals(1, array(1))
assertEquals(name * array(1).asInstanceOf[Int], array(0).toString)
Exit.addShutdownHook(name, array(1) = array(1).asInstanceOf[Int] + 1) // by-name parameter, only invoked due to above shutdownHookAdder
@ -109,7 +109,7 @@ class ExitTest {
// mutate the first element
array(0) = array(0) + name
}
Exit.addShutdownHook(name, sideEffect) // by-name parameter, not invoked
Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked
// make sure the first element wasn't mutated
assertEquals(name, array(0))
Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked

@ -80,7 +80,7 @@ object ReplicationQuotasTestRig {
def run(config: ExperimentDef, journal: Journal, displayChartsOnScreen: Boolean): Unit = {
val experiment = new Experiment()
try {
experiment.setUp
experiment.setUp()
experiment.run(config, journal, displayChartsOnScreen)
journal.footer()
}
@ -88,7 +88,7 @@ object ReplicationQuotasTestRig {
case e: Exception => e.printStackTrace()
}
finally {
experiment.tearDown
experiment.tearDown()
}
}
@ -56,7 +56,7 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
props.map(KafkaConfig.fromProps)
}

private def createAdminConfig():util.Map[String, Object] = {
private def createAdminConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val securityProps: util.Map[Object, Object] =

@ -74,7 +74,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// check if all replicas but the one that is shut down has deleted the log
TestUtils.waitUntilTrue(() =>
servers.filter(s => s.config.brokerId != follower.config.brokerId)
.forall(_.getLogManager().getLog(topicPartition).isEmpty), "Replicas 0,1 have not deleted log.")
.forall(_.getLogManager.getLog(topicPartition).isEmpty), "Replicas 0,1 have not deleted log.")
// ensure topic deletion is halted
TestUtils.waitUntilTrue(() => zkClient.isTopicMarkedForDeletion(topic),
"Admin path /admin/delete_topics/test path deleted even when a follower replica is down")
@ -122,7 +122,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// create the topic
TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
"Replicas for topic test not created.")
val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
@ -206,7 +206,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// create the topic
TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
"Replicas for topic test not created.")
// shutdown a broker to make sure the following topic deletion will be suspended
val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
@ -279,7 +279,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
TestUtils.waitUntilTrue(() =>
servers.forall(_.getLogManager().getLog(newPartition).isEmpty),
servers.forall(_.getLogManager.getLog(newPartition).isEmpty),
"Replica logs not for new partition [test,1] not deleted after delete topic is complete.")
}

@ -298,7 +298,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
assertTrue("Replica logs not deleted after delete topic is complete",
servers.forall(_.getLogManager().getLog(newPartition).isEmpty))
servers.forall(_.getLogManager.getLog(newPartition).isEmpty))
}

@Test
@ -313,7 +313,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// re-create topic on same replicas
TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
// check if all replica logs are created
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
"Replicas for topic test not created.")
}

@ -332,7 +332,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// verify delete topic path for test2 is removed from ZooKeeper
TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers)
// verify that topic test is untouched
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
"Replicas for topic test not created")
// test the topic path exists
assertTrue("Topic test mistakenly deleted", zkClient.topicExists(topic))
@ -403,7 +403,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// create the topic
TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
"Replicas for topic test not created")
servers
}
@ -428,7 +428,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
TestUtils.waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
"Admin path /admin/delete_topics/%s path not deleted even if deleteTopic is disabled".format(topic))
// verify that topic test is untouched
assertTrue(servers.forall(_.getLogManager().getLog(topicPartition).isDefined))
assertTrue(servers.forall(_.getLogManager.getLog(topicPartition).isDefined))
// test the topic path exists
assertTrue("Topic path disappeared", zkClient.topicExists(topic))
// topic test should have a leader

@ -79,7 +79,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
() =>
servers.forall { server =>
partitionsAndAssignments.forall { partitionAndAssignment =>
server.getLogManager().getLog(partitionAndAssignment._1).isDefined
server.getLogManager.getLog(partitionAndAssignment._1).isDefined
}
},
"Replicas for topic test not created"
@ -47,14 +47,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Before
override def setUp(): Unit = {
super.setUp
super.setUp()
servers = Seq.empty[KafkaServer]
}

@After
override def tearDown(): Unit = {
TestUtils.shutdownServers(servers)
super.tearDown
super.tearDown()
}

@Test
@ -297,7 +297,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
"failed to get expected partition state after partition reassignment")
TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
"failed to get updated partition assignment on topic znode after partition reassignment")
TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(),
TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
"failed to remove reassign partitions path after completion")

val updatedTimerCount = timer(metricName).count
@ -319,7 +319,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
zkClient.setOrCreatePartitionReassignment(reassignment, controller.kafkaController.controllerContext.epochZkVersion)
waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
"failed to get expected partition state during partition reassignment with offline replica")
TestUtils.waitUntilTrue(() => zkClient.reassignPartitionsInProgress(),
TestUtils.waitUntilTrue(() => zkClient.reassignPartitionsInProgress,
"partition reassignment path should remain while reassignment in progress")
}

@ -342,7 +342,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
"failed to get expected partition state after partition reassignment")
TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
"failed to get updated partition assignment on topic znode after partition reassignment")
TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(),
TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
"failed to remove reassign partitions path after completion")
}
@ -159,7 +159,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
val responseFutures = new ConcurrentHashMap[GroupMember, Future[R]]()

def setUpCallback(member: GroupMember): C = {
val responsePromise = Promise[R]
val responsePromise = Promise[R]()
val responseFuture = responsePromise.future
responseFutures.put(member, responseFuture)
responseCallback(responsePromise)

@ -3745,35 +3745,35 @@ class GroupCoordinatorTest {
}

private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
val responsePromise = Promise[JoinGroupResult]
val responsePromise = Promise[JoinGroupResult]()
val responseFuture = responsePromise.future
val responseCallback: JoinGroupCallback = responsePromise.success
(responseFuture, responseCallback)
}

private def setupSyncGroupCallback: (Future[SyncGroupResult], SyncGroupCallback) = {
val responsePromise = Promise[SyncGroupResult]
val responsePromise = Promise[SyncGroupResult]()
val responseFuture = responsePromise.future
val responseCallback: SyncGroupCallback = responsePromise.success
(responseFuture, responseCallback)
}

private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = {
val responsePromise = Promise[HeartbeatCallbackParams]
val responsePromise = Promise[HeartbeatCallbackParams]()
val responseFuture = responsePromise.future
val responseCallback: HeartbeatCallback = error => responsePromise.success(error)
(responseFuture, responseCallback)
}

private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = {
val responsePromise = Promise[CommitOffsetCallbackParams]
val responsePromise = Promise[CommitOffsetCallbackParams]()
val responseFuture = responsePromise.future
val responseCallback: CommitOffsetCallback = offsets => responsePromise.success(offsets)
(responseFuture, responseCallback)
}

private def setupLeaveGroupCallback: (Future[LeaveGroupResult], LeaveGroupCallback) = {
val responsePromise = Promise[LeaveGroupResult]
val responsePromise = Promise[LeaveGroupResult]()
val responseFuture = responsePromise.future
val responseCallback: LeaveGroupCallback = result => responsePromise.success(result)
(responseFuture, responseCallback)
@ -62,7 +62,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
metricName.foreach(KafkaYammerMetrics.defaultRegistry.removeMetric)
}

super.setUp
super.setUp()
}

/*

@ -31,7 +31,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness {
@Test
def testDefaultKafkaConfig(): Unit = {
assert(servers.head.getLogManager().initialDefaultConfig.minInSyncReplicas == 5)
assert(servers.head.getLogManager.initialDefaultConfig.minInSyncReplicas == 5)
}

}

@ -57,7 +57,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]

val random = new Random()
val topic = "topic" + random.nextLong
val topic = "topic" + random.nextLong()
val partitionId = 0

val kafkaApisLogger = Logger.getLogger(classOf[kafka.server.KafkaApis])
@ -112,7 +112,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// create topic with 1 partition, 2 replicas, one on each broker
TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers)

verifyUncleanLeaderElectionEnabled
verifyUncleanLeaderElectionEnabled()
}

@Test
@ -123,7 +123,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// create topic with 1 partition, 2 replicas, one on each broker
TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers)

verifyUncleanLeaderElectionDisabled
verifyUncleanLeaderElectionDisabled()
}

@Test
@ -138,7 +138,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
topicProps.put("unclean.leader.election.enable", "true")
TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps)

verifyUncleanLeaderElectionEnabled
verifyUncleanLeaderElectionEnabled()
}

@Test
@ -153,7 +153,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
topicProps.put("unclean.leader.election.enable", "false")
TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps)

verifyUncleanLeaderElectionDisabled
verifyUncleanLeaderElectionDisabled()
}

@Test
@ -277,7 +277,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
val brokerList = TestUtils.bootstrapServers(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
// Don't rely on coordinator as it may be down when this method is called
val consumer = TestUtils.createConsumer(brokerList,
groupId = "group" + random.nextLong,
groupId = "group" + random.nextLong(),
enableAutoCommit = false,
valueDeserializer = new StringDeserializer)
try {
@ -182,7 +182,7 @@ class LogCleanerTest {
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))

assertTrue("Cleaned segment file should be trimmed to its real size.",
log.logSegments.iterator.next.log.channel().size() < originalMaxFileSize)
log.logSegments.iterator.next().log.channel.size < originalMaxFileSize)
}

@Test

@ -334,7 +334,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
val acls1 = Set[Acl](acl2)
simpleAclAuthorizer.addAcls(acls1, resource1)

zkClient.deleteAclChangeNotifications
zkClient.deleteAclChangeNotifications()
val authorizer = new SimpleAclAuthorizer
try {
authorizer.configure(config.originals)

@ -362,7 +362,7 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
val acls1 = Set(acl2)
addAcls(aclAuthorizer, acls1, resource1)

zkClient.deleteAclChangeNotifications
zkClient.deleteAclChangeNotifications()
val authorizer = new AclAuthorizer
try {
authorizer.configure(config.originals)
@ -1087,9 +1087,8 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
op != AclOperation.ANY && op != AclOperation.UNKNOWN
}

private def prepareDefaultConfig(): String = {
private def prepareDefaultConfig: String =
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
}

private def prepareConfig(lines : Array[String]): String = {
val file = File.createTempFile("kafkatest", ".properties")
@ -99,7 +99,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
def testCreateToken(): Unit = {
val config = KafkaConfig.fromProps(props)
val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
tokenManager.startup()

tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
val issueTime = time.milliseconds
@ -116,7 +116,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
def testRenewToken(): Unit = {
val config = KafkaConfig.fromProps(props)
val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
tokenManager.startup()

tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
val issueTime = time.milliseconds
@ -164,7 +164,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
def testExpireToken(): Unit = {
val config = KafkaConfig.fromProps(props)
val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
tokenManager.startup()

tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
val issueTime = time.milliseconds
@ -199,7 +199,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
def testRemoveTokenHmac():Unit = {
val config = KafkaConfig.fromProps(props)
val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
tokenManager.startup()

tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
val issueTime = time.milliseconds
@ -240,7 +240,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
var hostSession = new Session(owner1, InetAddress.getByName("192.168.1.1"))

val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
tokenManager.startup()

//create tokens
tokenManager.createToken(owner1, List(renewer1, renewer2), 1 * 60 * 60 * 1000L, createTokenResultCallBack)
@ -253,7 +253,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
tokenManager.createToken(owner4, List(owner1, renewer4), 2 * 60 * 60 * 1000L, createTokenResultCallBack)

assert(tokenManager.getAllTokenInformation().size == 4 )
assert(tokenManager.getAllTokenInformation.size == 4 )

//get tokens non-exiting owner
var tokens = getTokens(tokenManager, aclAuthorizer, hostSession, owner1, List(SecurityUtils.parseKafkaPrincipal("User:unknown")))
@ -330,18 +330,18 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
def testPeriodicTokenExpiry(): Unit = {
val config = KafkaConfig.fromProps(props)
val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
tokenManager.startup()

//create tokens
tokenManager.createToken(owner, renewer, 1 * 60 * 60 * 1000L, createTokenResultCallBack)
tokenManager.createToken(owner, renewer, 1 * 60 * 60 * 1000L, createTokenResultCallBack)
tokenManager.createToken(owner, renewer, 2 * 60 * 60 * 1000L, createTokenResultCallBack)
tokenManager.createToken(owner, renewer, 2 * 60 * 60 * 1000L, createTokenResultCallBack)
assert(tokenManager.getAllTokenInformation().size == 4 )
assert(tokenManager.getAllTokenInformation.size == 4 )

time.sleep(2 * 60 * 60 * 1000L)
tokenManager.expireTokens()
assert(tokenManager.getAllTokenInformation().size == 2 )
assert(tokenManager.getAllTokenInformation.size == 2 )

}
@ -36,7 +36,7 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
super.setUp()
}

def createAdminConfig():util.Map[String, Object] = {
def createAdminConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val securityProps: util.Map[Object, Object] =

@ -56,7 +56,7 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
props.map(KafkaConfig.fromProps)
}

private def createAdminConfig():util.Map[String, Object] = {
private def createAdminConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val securityProps: util.Map[Object, Object] =

@ -44,7 +44,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
super.setUp()
}

def createAdminConfig():util.Map[String, Object] = {
def createAdminConfig: util.Map[String, Object] = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val securityProps: util.Map[Object, Object] =

@ -195,7 +195,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test
def testConfigChangeOnNonExistingTopic(): Unit = {
val topic = TestUtils.tempTopic
val topic = TestUtils.tempTopic()
try {
val logProps = new Properties()
logProps.put(FlushMessagesProp, 10000: java.lang.Integer)
@ -208,7 +208,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test
def testConfigChangeOnNonExistingTopicWithAdminClient(): Unit = {
val topic = TestUtils.tempTopic
val topic = TestUtils.tempTopic()
val admin = createAdminClient()
try {
val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic)

@ -52,7 +52,7 @@ class FetchSessionTest {
assertEquals(sessionIds.size, cache.size)
}

private def dummyCreate(size: Int)() = {
private def dummyCreate(size: Int): FetchSession.CACHE_MAP = {
val cacheMap = new FetchSession.CACHE_MAP(size)
for (i <- 0 until size) {
cacheMap.add(new CachedPartition("test", i))

@ -191,7 +191,7 @@ class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness {
*/
@Test
def testNotificationFailureDueToFeatureIncompatibility(): Unit = {
createSupportedFeatures
createSupportedFeatures()
val initialFinalizedFeatures = createFinalizedFeatures()
val listener = createListener(Some(initialFinalizedFeatures))
@ -60,7 +60,7 @@ class HighwatermarkPersistenceTest {
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
scheduler.startup
scheduler.startup()
val metrics = new Metrics
val time = new MockTime
// create replica manager
@ -110,7 +110,7 @@ class HighwatermarkPersistenceTest {
EasyMock.replay(zkClient)
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
scheduler.startup
scheduler.startup()
val metrics = new Metrics
val time = new MockTime
// create replica manager
@ -178,7 +178,7 @@ class HighwatermarkPersistenceTest {
}

private def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse(
replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read().getOrElse(
new TopicPartition(topic, partition), 0L)
}
}
@ -575,23 +575,23 @@ class KafkaConfigTest {
@Test
def testFromPropsInvalid(): Unit = {
def getBaseProperties(): Properties = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
validRequiredProperties
}
// to ensure a basis is valid - bootstraps all needed validation
KafkaConfig.fromProps(getBaseProperties())
KafkaConfig.fromProps(baseProperties)

KafkaConfig.configNames().foreach(name => {
KafkaConfig.configNames.foreach { name =>
name match {
case KafkaConfig.ZkConnectProp => // ignore string
case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.ZkSslClientEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.ZkSslClientEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case KafkaConfig.ZkClientCnxnSocketProp => //ignore string
case KafkaConfig.ZkSslKeyStoreLocationProp => //ignore string
case KafkaConfig.ZkSslKeyStorePasswordProp => //ignore string
@ -603,115 +603,115 @@ class KafkaConfigTest {
case KafkaConfig.ZkSslEnabledProtocolsProp => //ignore string
case KafkaConfig.ZkSslCipherSuitesProp => //ignore string
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => //ignore string
case KafkaConfig.ZkSslCrlEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.ZkSslOcspEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.ZkSslCrlEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case KafkaConfig.ZkSslOcspEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")

case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.NumReplicaAlterLogDirsThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.QueuedMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.BrokerIdProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.NumReplicaAlterLogDirsThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.QueuedMaxBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")

case KafkaConfig.AuthorizerClassNameProp => //ignore string
case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string

case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.PortProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.HostNameProp => // ignore string
case KafkaConfig.AdvertisedHostNameProp => //ignore string
case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.MaxConnectionsPerIpOverridesProp =>
assertPropertyInvalid(getBaseProperties(), name, "127.0.0.1:not_a_number")
case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.FailedAuthenticationDelayMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
assertPropertyInvalid(baseProperties, name, "127.0.0.1:not_a_number")
case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.FailedAuthenticationDelayMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")

case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.LogDirsProp => // ignore string
case KafkaConfig.LogDirProp => // ignore string
case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Records.LOG_OVERHEAD - 1)
case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", Records.LOG_OVERHEAD - 1)

case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")

case KafkaConfig.LogRetentionTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRetentionTimeMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRetentionTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogRetentionTimeMillisProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.LogRetentionTimeMinutesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.LogRetentionTimeHoursProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")

case KafkaConfig.LogRetentionBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanupIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogCleanupPolicyProp => assertPropertyInvalid(getBaseProperties(), name, "unknown_policy", "0")
case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerDedupeBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "1024")
case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerMinCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerMaxCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3")
case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchResponseMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogRetentionBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogCleanupIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.LogCleanupPolicyProp => assertPropertyInvalid(baseProperties, name, "unknown_policy", "0")
case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogCleanerDedupeBufferSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "1024")
case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogCleanerMinCompactionLagMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogCleanerMaxCompactionLagMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "3")
case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0")
case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")
case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ReplicaFetchResponseMaxBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ReplicaSelectorClassProp => // Ignore string
case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.GroupMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.GroupMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.GroupInitialRebalanceDelayMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.GroupMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-1")
case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicCompressionCodecProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
case KafkaConfig.OffsetsRetentionMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
case KafkaConfig.TransactionalIdExpirationMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsMaxTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicMinISRProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2")
case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0")
case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0")
case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0")
case KafkaConfig.GroupMinSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.GroupMaxSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.GroupInitialRebalanceDelayMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.GroupMaxSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-1")
case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicPartitionsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicSegmentBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.OffsetsTopicCompressionCodecProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
case KafkaConfig.OffsetsRetentionMinutesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")
case KafkaConfig.TransactionalIdExpirationMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsMaxTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicMinISRProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsLoadBufferSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicPartitionsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicSegmentBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicReplicationFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0")

case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.MetricReporterClassesProp => // ignore string
case KafkaConfig.MetricRecordingLevelProp => // ignore string
case KafkaConfig.RackProp => // ignore string
@ -763,33 +763,33 @@ class KafkaConfigTest {
case KafkaConfig.PasswordEncoderOldSecretProp =>
case KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp =>
case KafkaConfig.PasswordEncoderCipherAlgorithmProp =>
case KafkaConfig.PasswordEncoderKeyLengthProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.PasswordEncoderKeyLengthProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")

//delegation token configs
case KafkaConfig.DelegationTokenMasterKeyProp => // ignore
case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")

//Kafka Yammer metrics reporter configs
case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore

case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
}
}
})
}

@Test
def testDynamicLogConfigs(): Unit = {
def getBaseProperties(): Properties = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
validRequiredProperties
}

val props = getBaseProperties()
val props = baseProperties
val config = KafkaConfig.fromProps(props)

def assertDynamic(property: String, value: Any, accessor: () => Any): Unit = {
@@ -110,9 +110,9 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 "Failed to update high watermark for follower after timeout")

 servers.foreach(_.replicaManager.checkpointHighWatermarks())
-val leaderHW = hwFile1.read.getOrElse(topicPartition, 0L)
+val leaderHW = hwFile1.read().getOrElse(topicPartition, 0L)
 assertEquals(numMessages, leaderHW)
-val followerHW = hwFile2.read.getOrElse(topicPartition, 0L)
+val followerHW = hwFile2.read().getOrElse(topicPartition, 0L)
 assertEquals(numMessages, followerHW)
 }

@@ -120,7 +120,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 def testHWCheckpointWithFailuresSingleLogSegment(): Unit = {
 var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)

-assertEquals(0L, hwFile1.read.getOrElse(topicPartition, 0L))
+assertEquals(0L, hwFile1.read().getOrElse(topicPartition, 0L))

 sendMessages(1)
 Thread.sleep(1000)
@@ -128,7 +128,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {

 // kill the server hosting the preferred replica
 server1.shutdown()
-assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))

 // check if leader moves to the other server
 leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader))
@@ -143,7 +143,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 assertTrue("Leader must remain on broker 1, in case of ZooKeeper session expiration it can move to broker 0",
 leader == 0 || leader == 1)

-assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
 /** We plan to shutdown server2 and transfer the leadership to server1.
 * With unclean leader election turned off, a prerequisite for the successful leadership transition
 * is that server1 has caught up on the topicPartition, and has joined the ISR.
@@ -155,7 +155,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {

 // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
 server2.shutdown()
-assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L))

 server2.startup()
 updateProducer()
@@ -172,8 +172,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 "Failed to update high watermark for follower after timeout")
 // shutdown the servers to allow the hw to be checkpointed
 servers.foreach(_.shutdown())
-assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
-assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L))
 }

 @Test
@@ -186,9 +186,9 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 "Failed to update high watermark for follower after timeout")
 // shutdown the servers to allow the hw to be checkpointed
 servers.foreach(_.shutdown())
-val leaderHW = hwFile1.read.getOrElse(topicPartition, 0L)
+val leaderHW = hwFile1.read().getOrElse(topicPartition, 0L)
 assertEquals(hw, leaderHW)
-val followerHW = hwFile2.read.getOrElse(topicPartition, 0L)
+val followerHW = hwFile2.read().getOrElse(topicPartition, 0L)
 assertEquals(hw, followerHW)
 }

@@ -206,8 +206,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 // kill the server hosting the preferred replica
 server1.shutdown()
 server2.shutdown()
-assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
-assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L))

 server2.startup()
 updateProducer()
@@ -215,14 +215,14 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader))
 assertEquals("Leader must move to broker 1", 1, leader)

-assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))

 // bring the preferred replica back
 server1.startup()
 updateProducer()

-assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
-assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L))

 sendMessages(2)
 hw += 2
@@ -236,8 +236,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 "Failed to update high watermark for follower after timeout")
 // shutdown the servers to allow the hw to be checkpointed
 servers.foreach(_.shutdown())
-assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
-assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
+assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L))
 }

 private def sendMessages(n: Int): Unit = {
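Every change in the LogRecoveryTest hunks is the same mechanical fix: the checkpoint's `read` method is evidently declared with an empty parameter list, and Scala 2.13.3 deprecates calling such a method without parentheses (auto-application), so the call sites now spell out `read()`. A small self-contained sketch of the rule, with a hypothetical checkpoint class standing in for the real one:

import scala.collection.mutable

// Hypothetical stand-in for a high-watermark checkpoint file.
class OffsetCheckpoint {
  private val offsets = mutable.Map("topic-0" -> 42L)

  // Declared with (), following the convention for methods that do I/O.
  def read(): Map[String, Long] = offsets.toMap
}

object AutoApplicationExample {
  def main(args: Array[String]): Unit = {
    val checkpoint = new OffsetCheckpoint
    // val hw = checkpoint.read.getOrElse("topic-0", 0L)  // deprecated in 2.13.3: auto-application
    val hw = checkpoint.read().getOrElse("topic-0", 0L)   // explicit application compiles cleanly
    println(hw)
  }
}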
@@ -72,7 +72,7 @@ class ReplicaAlterLogDirsThreadTest {

 val addedPartitions = thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L)))
 assertEquals(Set.empty, addedPartitions)
-assertEquals(0, thread.partitionCount())
+assertEquals(0, thread.partitionCount)
 assertEquals(None, thread.fetchState(t1p0))
 }

@@ -132,18 +132,18 @@ class ReplicaAlterLogDirsThreadTest {
 // Initially we add the partition with an older epoch which results in an error
 thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch - 1)))
 assertTrue(thread.fetchState(t1p0).isDefined)
-assertEquals(1, thread.partitionCount())
+assertEquals(1, thread.partitionCount)

 thread.doWork()

 assertTrue(failedPartitions.contains(t1p0))
 assertEquals(None, thread.fetchState(t1p0))
-assertEquals(0, thread.partitionCount())
+assertEquals(0, thread.partitionCount)

 // Next we update the epoch and assert that we can continue
 thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch)))
 assertEquals(Some(leaderEpoch), thread.fetchState(t1p0).map(_.currentLeaderEpoch))
-assertEquals(1, thread.partitionCount())
+assertEquals(1, thread.partitionCount)

 val requestData = new FetchRequest.PartitionData(0L, 0L,
 config.replicaFetchMaxBytes, Optional.of(leaderEpoch))
@@ -162,7 +162,7 @@ class ReplicaAlterLogDirsThreadTest {

 assertFalse(failedPartitions.contains(t1p0))
 assertEquals(None, thread.fetchState(t1p0))
-assertEquals(0, thread.partitionCount())
+assertEquals(0, thread.partitionCount)
 }

 @Test
@@ -220,12 +220,12 @@ class ReplicaAlterLogDirsThreadTest {

 thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch)))
 assertTrue(thread.fetchState(t1p0).isDefined)
-assertEquals(1, thread.partitionCount())
+assertEquals(1, thread.partitionCount)

 thread.doWork()

 assertEquals(None, thread.fetchState(t1p0))
-assertEquals(0, thread.partitionCount())
+assertEquals(0, thread.partitionCount)
 }

 private def mockFetchFromCurrentLog(topicPartition: TopicPartition,
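The ReplicaAlterLogDirsThreadTest hunks go in the opposite direction: the parentheses are removed at the call sites. That is only legal when the member itself is paren-less, since Scala rejects `()` on a method declared without a parameter list, so `partitionCount` is presumably a paren-less accessor here, while side-effecting calls such as `doWork()` keep their parentheses. A quick sketch of the convention with invented names:

class FetcherState {
  private var states = Map.empty[String, Long]

  // Pure accessor, declared (and called) without parentheses.
  def partitionCount: Int = states.size

  // Side-effecting method, declared (and called) with parentheses.
  def addPartition(name: String, offset: Long): Unit =
    states += name -> offset
}

object ParenlessExample {
  def main(args: Array[String]): Unit = {
    val state = new FetcherState
    state.addPartition("t1p0", 0L)
    println(state.partitionCount)      // ok
    // println(state.partitionCount()) // does not compile: Int does not take parameters
  }
}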
@@ -69,9 +69,9 @@ class ReplicaFetchTest extends ZooKeeperTestHarness {
 var result = true
 for (topic <- List(topic1, topic2)) {
 val tp = new TopicPartition(topic, partition)
-val expectedOffset = brokers.head.getLogManager().getLog(tp).get.logEndOffset
+val expectedOffset = brokers.head.getLogManager.getLog(tp).get.logEndOffset
 result = result && expectedOffset > 0 && brokers.forall { item =>
-expectedOffset == item.getLogManager().getLog(tp).get.logEndOffset
+expectedOffset == item.getLogManager.getLog(tp).get.logEndOffset
 }
 }
 result
@@ -134,7 +134,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {

 //Check that throttled config correctly migrated to the new brokers
 (106 to 107).foreach { brokerId =>
-assertEquals(throttle, brokerFor(brokerId).quotaManagers.follower.upperBound())
+assertEquals(throttle, brokerFor(brokerId).quotaManagers.follower.upperBound)
 }
 if (!leaderThrottle) {
 (0 to 2).foreach { partition => assertTrue(brokerFor(106).quotaManagers.follower.isThrottled(tp(partition))) }
@@ -118,7 +118,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
 producer.close()
 server.shutdown()
 CoreUtils.delete(server.config.logDirs)
-verifyNonDaemonThreadsStatus
+verifyNonDaemonThreadsStatus()
 }

 @Test
@@ -131,7 +131,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
 server.shutdown()
 server.awaitShutdown()
 CoreUtils.delete(server.config.logDirs)
-verifyNonDaemonThreadsStatus
+verifyNonDaemonThreadsStatus()
 }

 @Test
@@ -177,7 +177,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
 server.awaitShutdown()
 }
 CoreUtils.delete(server.config.logDirs)
-verifyNonDaemonThreadsStatus
+verifyNonDaemonThreadsStatus()
 }

 private[this] def isNonDaemonKafkaThread(t: Thread): Boolean = {
@@ -86,8 +86,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 producer.send(new ProducerRecord(topic, 0, null, msg)).get

 //The message should have epoch 0 stamped onto it in both leader and follower
-assertEquals(0, latestRecord(leader).partitionLeaderEpoch())
-assertEquals(0, latestRecord(follower).partitionLeaderEpoch())
+assertEquals(0, latestRecord(leader).partitionLeaderEpoch)
+assertEquals(0, latestRecord(follower).partitionLeaderEpoch)

 //Both leader and follower should have recorded Epoch 0 at Offset 0
 assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries)
@@ -452,16 +452,16 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = -1)
 }

-private def leader(): KafkaServer = {
+private def leader: KafkaServer = {
 assertEquals(2, brokers.size)
 val leaderId = zkClient.getLeaderForPartition(new TopicPartition(topic, 0)).get
-brokers.filter(_.config.brokerId == leaderId)(0)
+brokers.filter(_.config.brokerId == leaderId).head
 }

-private def follower(): KafkaServer = {
+private def follower: KafkaServer = {
 assertEquals(2, brokers.size)
 val leader = zkClient.getLeaderForPartition(new TopicPartition(topic, 0)).get
-brokers.filter(_.config.brokerId != leader)(0)
+brokers.filter(_.config.brokerId != leader).head
 }

 private def createBroker(id: Int, enableUncleanLeaderElection: Boolean = false): KafkaServer = {
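The second hunk above also carries a small clean-up alongside the paren changes: indexing the filtered buffer with `(0)` is replaced by `.head`, which states the intent directly. A standalone illustration (the case class here is invented); `find` is shown as well, since it stops at the first match instead of building the whole filtered collection:

final case class Broker(id: Int, rack: String)

object HeadVsIndex {
  def main(args: Array[String]): Unit = {
    val brokers = Vector(Broker(0, "a"), Broker(1, "b"))
    val leaderId = 1

    val viaIndex = brokers.filter(_.id == leaderId)(0)    // works, but reads like an extra argument list
    val viaHead  = brokers.filter(_.id == leaderId).head  // clearer intent, same result
    val viaFind  = brokers.find(_.id == leaderId).get     // stops at the first match

    assert(viaIndex == viaHead && viaHead == viaFind)
    println(viaHead)
  }
}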
@@ -239,9 +239,9 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 var result = true
 for (topic <- List(topic1, topic2)) {
 val tp = new TopicPartition(topic, 0)
-val leo = broker.getLogManager().getLog(tp).get.logEndOffset
+val leo = broker.getLogManager.getLog(tp).get.logEndOffset
 result = result && leo > 0 && brokers.forall { broker =>
-broker.getLogManager().getLog(tp).get.logSegments.iterator.forall { segment =>
+broker.getLogManager.getLog(tp).get.logSegments.iterator.forall { segment =>
 if (segment.read(minOffset, Integer.MAX_VALUE) == null) {
 false
 } else {
@@ -115,7 +115,7 @@ class DumpLogSegmentsTest {
 // only increment the offset if it's not a batch
 if (isBatch(index)) {
 assertTrue(s"Not a valid batch-level message record: $line", line.startsWith(s"baseOffset: $offset lastOffset: "))
-batch = batchIterator.next
+batch = batchIterator.next()
 } else {
 assertTrue(s"Not a valid message record: $line", line.startsWith(s"${DumpLogSegments.RecordIndent} offset: $offset"))
 if (checkKeysAndValues) {
@@ -89,7 +89,7 @@ class MockScheduler(val time: Time) extends Scheduler {
 private def poll(predicate: MockTask => Boolean): Option[MockTask] = {
 this synchronized {
 if (tasks.nonEmpty && predicate.apply(tasks.head))
-Some(tasks.dequeue)
+Some(tasks.dequeue())
 else
 None
 }
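The MockScheduler change applies the same rule to the standard library: `mutable.Queue#dequeue` and `Iterator#next` are declared with empty parameter lists, so 2.13.3 expects the parentheses to be written at the call site. A runnable sketch:

import scala.collection.mutable

object ExplicitApplyExample {
  def main(args: Array[String]): Unit = {
    val tasks = mutable.Queue("task-1", "task-2")

    // Some(tasks.dequeue) would be flagged in 2.13.3: auto-application of dequeue()
    val first = if (tasks.nonEmpty) Some(tasks.dequeue()) else None
    println(first)

    val it = Iterator(1, 2, 3)
    while (it.hasNext)
      println(it.next())  // next() rather than next, for the same reason
  }
}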
@@ -467,14 +467,14 @@ object TestUtils extends Logging {
 var length = 0
 while(expected.hasNext && actual.hasNext) {
 length += 1
-assertEquals(expected.next, actual.next)
+assertEquals(expected.next(), actual.next())
 }

 // check if the expected iterator is longer
 if (expected.hasNext) {
 var length1 = length
 while (expected.hasNext) {
-expected.next
+expected.next()
 length1 += 1
 }
 assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true)
@@ -484,7 +484,7 @@ object TestUtils extends Logging {
 if (actual.hasNext) {
 var length2 = length
 while (actual.hasNext) {
-actual.next
+actual.next()
 length2 += 1
 }
 assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true)
@@ -499,7 +499,7 @@ object TestUtils extends Logging {
 var n = 0
 while (s1.hasNext) {
 n += 1
-s1.next
+s1.next()
 }
 assertEquals(expectedLength, n)
 }
@@ -524,7 +524,7 @@ object TestUtils extends Logging {
 while (true) {
 if (cur == null) {
 if (topIterator.hasNext)
-cur = topIterator.next
+cur = topIterator.next()
 else
 return false
 }
@@ -536,7 +536,7 @@ object TestUtils extends Logging {
 throw new RuntimeException("should not reach here")
 }

-def next() : T = cur.next
+def next() : T = cur.next()
 }
 }
@@ -188,7 +188,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
 assertEquals(props, savedProps)
 }

-TestUtils.assertConcurrent("Concurrent topic creation failed", Seq(() => createTopic, () => createTopic),
+TestUtils.assertConcurrent("Concurrent topic creation failed", Seq(() => createTopic(), () => createTopic()),
 JTestUtils.DEFAULT_MAX_WAIT_MS.toInt)
 }
@@ -550,7 +550,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 })

 // create acl paths
-zkClient.createAclPaths
+zkClient.createAclPaths()

 ZkAclStore.stores.foreach(store => {
 assertTrue(zkClient.pathExists(store.aclPath))
@@ -1184,7 +1184,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {

 @Test
 def testClusterIdMethods(): Unit = {
-val clusterId = CoreUtils.generateUuidAsBase64
+val clusterId = CoreUtils.generateUuidAsBase64()

 zkClient.createOrGetClusterId(clusterId)
 assertEquals(clusterId, zkClient.getClusterId.getOrElse(fail("No cluster id found")))
@@ -1193,7 +1193,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 @Test
 def testBrokerSequenceIdMethods(): Unit = {
 val sequenceId = zkClient.generateBrokerSequenceId()
-assertEquals(sequenceId + 1, zkClient.generateBrokerSequenceId)
+assertEquals(sequenceId + 1, zkClient.generateBrokerSequenceId())
 }

 @Test
@@ -1235,7 +1235,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 assertFalse(zkClient.pathExists(DelegationTokensZNode.path))
 assertFalse(zkClient.pathExists(DelegationTokenChangeNotificationZNode.path))

-zkClient.createDelegationTokenPaths
+zkClient.createDelegationTokenPaths()
 assertTrue(zkClient.pathExists(DelegationTokensZNode.path))
 assertTrue(zkClient.pathExists(DelegationTokenChangeNotificationZNode.path))
@@ -21,6 +21,6 @@ group=org.apache.kafka
 # - tests/kafkatest/version.py (variable DEV_VERSION)
 # - kafka-merge-pr.py
 version=2.7.0-SNAPSHOT
-scalaVersion=2.13.2
+scalaVersion=2.13.3
 task=build
 org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
@@ -28,7 +28,7 @@ ext {

 // Add Scala version
 def defaultScala212Version = '2.12.11'
-def defaultScala213Version = '2.13.2'
+def defaultScala213Version = '2.13.3'
 if (hasProperty('scalaVersion')) {
 if (scalaVersion == '2.12') {
 versions["scala"] = defaultScala212Version
@@ -165,7 +165,7 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
 .stream[String, String](sourceTopic)
 .groupByKey
 .windowedBy(window)
-.count
+.count()
 .suppress(suppression)

 table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
@@ -222,7 +222,7 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
 .stream[String, String](sourceTopic)
 .groupByKey
 .windowedBy(window)
-.count
+.count()
 .suppress(suppression)

 table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
@@ -280,7 +280,7 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
 .stream[String, String](sourceTopic)
 .groupByKey
 .windowedBy(window)
-.count
+.count()
 .suppress(suppression)

 table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
@@ -348,7 +348,7 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
 val table: KTable[String, Long] = builder
 .stream[String, String](sourceTopic)
 .groupByKey
-.count
+.count()
 .suppress(suppression)

 table.toStream.to(sinkTopic)
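The KTableTest hunks apply the rule inside the Streams Scala DSL: `count` is declared with an empty explicit parameter list (plus an implicit one), so it gains parentheses, while `groupByKey`, which takes only an implicit list, stays paren-less. A minimal word-count-style sketch of the same call shape, assuming the kafka-streams-scala DSL of this source tree; the topic names are made up and the serdes come from the DSL's standard implicit imports:

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KTable

object CountCallSite {
  val builder = new StreamsBuilder

  // groupByKey: implicit parameter list only, no parentheses at the call site.
  // count(): empty explicit parameter list, so 2.13.3 wants the parentheses.
  val counts: KTable[String, Long] =
    builder
      .stream[String, String]("source-topic") // hypothetical topic name
      .groupByKey
      .count()

  counts.toStream.to("sink-topic") // hypothetical topic name
}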