diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index b06fb12535f..f485f5b0445 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -48,7 +48,7 @@ should_include_file() { base_dir=$(dirname $0)/.. if [ -z "$SCALA_VERSION" ]; then - SCALA_VERSION=2.13.3 + SCALA_VERSION=2.13.4 if [[ -f "$base_dir/gradle.properties" ]]; then SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2` fi diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat index 4a516c026a0..3490588e37d 100755 --- a/bin/windows/kafka-run-class.bat +++ b/bin/windows/kafka-run-class.bat @@ -27,7 +27,7 @@ set BASE_DIR=%CD% popd IF ["%SCALA_VERSION%"] EQU [""] ( - set SCALA_VERSION=2.13.3 + set SCALA_VERSION=2.13.4 ) IF ["%SCALA_BINARY_VERSION%"] EQU [""] ( diff --git a/build.gradle b/build.gradle index f229699e9df..dc486a17ee1 100644 --- a/build.gradle +++ b/build.gradle @@ -525,7 +525,7 @@ subprojects { scalaCompileOptions.additionalParameters += inlineFrom if (versions.baseScala != '2.12') { - scalaCompileOptions.additionalParameters += ["-opt-warnings"] + scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"] // Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"] } diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 0492f68acc2..eb420dd4faa 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -480,6 +480,7 @@ object ConfigCommand extends Config { describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll) case ConfigType.User | ConfigType.Client => describeClientQuotaAndUserScramCredentialConfigs(adminClient, entityTypes, entityNames) + case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") } } @@ -491,6 +492,7 @@ object ConfigCommand extends Config { adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq case ConfigType.Broker | BrokerLoggerConfigType => adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName + case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") }) entities.foreach { entity => @@ -530,6 +532,7 @@ object ConfigCommand extends Config { if (!entityName.isEmpty) validateBrokerId() (ConfigResource.Type.BROKER_LOGGER, None) + case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") } val configSourceFilter = if (describeAll) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 36a5010cd35..d8a01abe1cb 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -44,8 +44,6 @@ import org.apache.kafka.common.requests.ListOffsetResponse import org.apache.kafka.common.ConsumerGroupState import joptsimple.OptionException -import scala.annotation.nowarn - object ConsumerGroupCommand extends Logging { def main(args: Array[String]): Unit = { @@ -151,22 +149,24 @@ object ConsumerGroupCommand extends Logging { private[admin] case class CsvUtils() { val mapper = new CsvMapper with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) - def readerFor[T <: CsvRecord: ClassTag] = { + def readerFor[T <: CsvRecord : ClassTag] = { val schema = getSchema[T] val clazz = implicitly[ClassTag[T]].runtimeClass mapper.readerFor(clazz).`with`(schema) } - def writerFor[T <: CsvRecord: ClassTag] = { + def writerFor[T <: CsvRecord : ClassTag] = { val schema = getSchema[T] val clazz = implicitly[ClassTag[T]].runtimeClass mapper.writerFor(clazz).`with`(schema) } - private def getSchema[T <: CsvRecord: ClassTag] = { + private def getSchema[T <: CsvRecord : ClassTag] = { val clazz = implicitly[ClassTag[T]].runtimeClass - val fields = clazz match { - case _ if classOf[CsvRecordWithGroup] == clazz => CsvRecordWithGroup.fields - case _ if classOf[CsvRecordNoGroup] == clazz => CsvRecordNoGroup.fields - } + + val fields = + if (classOf[CsvRecordWithGroup] == clazz) CsvRecordWithGroup.fields + else if (classOf[CsvRecordNoGroup] == clazz) CsvRecordNoGroup.fields + else throw new IllegalStateException(s"Unhandled class $clazz") + val schema = mapper.schemaFor(clazz).sortedBy(fields: _*) schema } @@ -555,7 +555,6 @@ object ConsumerGroupCommand extends Logging { /** * Returns the state of the specified consumer group and partition assignment states */ - @nowarn("cat=optimizer") def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = { collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None)) } diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 3ae128976fd..ad7b5454172 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -479,8 +479,7 @@ object ReassignPartitionsCommand extends Logging { private def topicDescriptionFutureToState(partition: Int, future: KafkaFuture[TopicDescription], - targetReplicas: Seq[Int]) - : PartitionReassignmentState = { + targetReplicas: Seq[Int]): PartitionReassignmentState = { try { val topicDescription = future.get() if (topicDescription.partitions().size() < partition) { @@ -494,7 +493,8 @@ object ReassignPartitionsCommand extends Logging { case t: ExecutionException => t.getCause match { case _: UnknownTopicOrPartitionException => - new PartitionReassignmentState(Seq(), targetReplicas, true) + PartitionReassignmentState(Seq(), targetReplicas, true) + case e => throw e } } } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 71ae2476b9d..dd2eeb6f360 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -589,7 +589,7 @@ class GroupMetadataManager(brokerId: Int, readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0 - val memRecords = fetchDataInfo.records match { + val memRecords = (fetchDataInfo.records: @unchecked) match { case records: MemoryRecords => records case fileRecords: FileRecords => val sizeInBytes = fileRecords.sizeInBytes diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index b2835d778c3..2882d863024 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -314,7 +314,7 @@ class TransactionStateManager(brokerId: Int, readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0 - val memRecords = fetchDataInfo.records match { + val memRecords = (fetchDataInfo.records: @unchecked) match { case records: MemoryRecords => records case fileRecords: FileRecords => val sizeInBytes = fileRecords.sizeInBytes diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 173357e0cc9..15dc9cedd10 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2737,4 +2737,4 @@ case object LogDeletion extends SegmentDeletionReason { override def logReason(log: Log, toDelete: List[LogSegment]): Unit = { log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}") } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 064c0e85e7f..3945e8e15c1 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -308,7 +308,7 @@ object RequestChannel extends Logging { } - abstract class Response(val request: Request) { + sealed abstract class Response(val request: Request) { def processor: Int = request.processor diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala index 9a91b33cf72..7f769c8463e 100644 --- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala +++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala @@ -45,6 +45,8 @@ object KafkaNetworkChannel { new EndQuorumEpochResponse(endEpochResponse) case fetchResponse: FetchResponseData => new FetchResponse(fetchResponse) + case _ => + throw new IllegalArgumentException(s"Unexpected type for responseData: $responseData") } } @@ -61,6 +63,8 @@ object KafkaNetworkChannel { new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) { override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version) } + case _ => + throw new IllegalArgumentException(s"Unexpected type for requestData: $requestData") } } @@ -70,6 +74,7 @@ object KafkaNetworkChannel { case beginEpochResponse: BeginQuorumEpochResponse => beginEpochResponse.data case endEpochResponse: EndQuorumEpochResponse => endEpochResponse.data case fetchResponse: FetchResponse[_] => fetchResponse.data + case _ => throw new IllegalArgumentException(s"Unexpected type for response: $response") } } @@ -79,6 +84,7 @@ object KafkaNetworkChannel { case beginEpochRequest: BeginQuorumEpochRequest => beginEpochRequest.data case endEpochRequest: EndQuorumEpochRequest => endEpochRequest.data case fetchRequest: FetchRequest => fetchRequest.data + case _ => throw new IllegalArgumentException(s"Unexpected type for request: $request") } } diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index f78e9251316..0a60e51659c 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -287,12 +287,9 @@ class AclAuthorizer extends Authorizer with Logging { }.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava } - @nowarn("cat=optimizer") override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = { val aclBindings = new util.ArrayList[AclBinding]() - // Using `forKeyValue` triggers a scalac bug related to suppression of optimizer warnings, we - // should change this code once that's fixed - aclCache.foreach { case (resource, versionedAcls) => + aclCache.forKeyValue { case (resource, versionedAcls) => versionedAcls.acls.foreach { acl => val binding = new AclBinding(resource, acl.ace) if (filter.matches(binding)) @@ -542,7 +539,6 @@ class AclAuthorizer extends Authorizer with Logging { } } - @nowarn("cat=optimizer") private def getAclsFromCache(resource: ResourcePattern): VersionedAcls = { aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource")) } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 52985f9f785..edaa6b71e5b 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -679,7 +679,7 @@ abstract class AbstractFetcherThread(name: String, } protected def toMemoryRecords(records: Records): MemoryRecords = { - records match { + (records: @unchecked) match { case r: MemoryRecords => r case r: FileRecords => val buffer = ByteBuffer.allocate(r.sizeInBytes) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 69676ec3afe..e32978cf171 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -80,7 +80,9 @@ object ClientQuotaManager { val DefaultUserQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None) val DefaultUserClientIdQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity)) - case class UserEntity(sanitizedUser: String) extends ClientQuotaEntity.ConfigEntity { + sealed trait BaseUserEntity extends ClientQuotaEntity.ConfigEntity + + case class UserEntity(sanitizedUser: String) extends BaseUserEntity { override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.USER override def name: String = Sanitizer.desanitize(sanitizedUser) override def toString: String = s"user $sanitizedUser" @@ -92,7 +94,7 @@ object ClientQuotaManager { override def toString: String = s"client-id $clientId" } - case object DefaultUserEntity extends ClientQuotaEntity.ConfigEntity { + case object DefaultUserEntity extends BaseUserEntity { override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER override def name: String = ConfigEntityName.Default override def toString: String = "default user" @@ -104,7 +106,7 @@ object ClientQuotaManager { override def toString: String = "default client-id" } - case class KafkaQuotaEntity(userEntity: Option[ClientQuotaEntity.ConfigEntity], + case class KafkaQuotaEntity(userEntity: Option[BaseUserEntity], clientIdEntity: Option[ClientQuotaEntity.ConfigEntity]) extends ClientQuotaEntity { override def configEntities: util.List[ClientQuotaEntity.ConfigEntity] = (userEntity.toList ++ clientIdEntity.toList).asJava diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index a131acdda7a..96ef6d51793 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -195,7 +195,10 @@ class ZooKeeperClient(connectString: String, def responseMetadata(sendTimeMs: Long) = new ResponseMetadata(sendTimeMs, receivedTimeMs = time.hiResClockMs()) val sendTimeMs = time.hiResClockMs() - request match { + + // Cast to AsyncRequest to workaround a scalac bug that results in an false exhaustiveness warning + // with -Xlint:strict-unsealed-patmat + (request: AsyncRequest) match { case ExistsRequest(path, ctx) => zooKeeper.exists(path, shouldWatch(request), new StatCallback { def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala index d74805f0676..356de957e22 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala @@ -38,16 +38,13 @@ object SaslPlainSslEndToEndAuthorizationTest { class TestPrincipalBuilder extends KafkaPrincipalBuilder { override def build(context: AuthenticationContext): KafkaPrincipal = { - context match { - case ctx: SaslAuthenticationContext => - ctx.server.getAuthorizationID match { - case KafkaPlainAdmin => - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin") - case KafkaPlainUser => - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user") - case _ => - KafkaPrincipal.ANONYMOUS - } + context.asInstanceOf[SaslAuthenticationContext].server.getAuthorizationID match { + case KafkaPlainAdmin => + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin") + case KafkaPlainUser => + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user") + case _ => + KafkaPrincipal.ANONYMOUS } } } diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index 4567b713c5d..b705050c1b1 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -34,16 +34,13 @@ object SslEndToEndAuthorizationTest { // Use full DN as client principal to test special characters in principal // Use field from DN as server principal to test custom PrincipalBuilder override def build(context: AuthenticationContext): KafkaPrincipal = { - context match { - case ctx: SslAuthenticationContext => - val peerPrincipal = ctx.session.getPeerPrincipal.getName - peerPrincipal match { - case Pattern(name, _) => - val principal = if (name == "server") name else peerPrincipal - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) - case _ => - KafkaPrincipal.ANONYMOUS - } + val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName + peerPrincipal match { + case Pattern(name, _) => + val principal = if (name == "server") name else peerPrincipal + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) + case _ => + KafkaPrincipal.ANONYMOUS } } } diff --git a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala index 71697589650..699e8faa711 100644 --- a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala +++ b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala @@ -207,7 +207,7 @@ class KafkaNetworkChannelTest { } private def extractError(response: ApiMessage): Errors = { - val code = response match { + val code = (response: @unchecked) match { case res: BeginQuorumEpochResponseData => res.errorCode case res: EndQuorumEpochResponseData => res.errorCode case res: FetchResponseData => res.errorCode diff --git a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala index d2c6d4b2dec..3a4dd9487d1 100644 --- a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala @@ -53,7 +53,7 @@ class BaseClientQuotaManagerTest { protected def callback(response: RequestChannel.Response): Unit = { // Count how many times this callback is called for notifyThrottlingDone(). - response match { + (response: @unchecked) match { case _: StartThrottlingResponse => case _: EndThrottlingResponse => numCallbacks += 1 } diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala index ff33084948c..02e36413b86 100644 --- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala @@ -62,7 +62,7 @@ class ThrottledChannelExpirationTest { } def callback(response: Response): Unit = { - response match { + (response: @unchecked) match { case _: StartThrottlingResponse => numCallbacksForStartThrottling += 1 case _: EndThrottlingResponse => numCallbacksForEndThrottling += 1 } diff --git a/gradle.properties b/gradle.properties index 5ce35dbfb87..6c73c38857b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -21,6 +21,6 @@ group=org.apache.kafka # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py version=2.8.0-SNAPSHOT -scalaVersion=2.13.3 +scalaVersion=2.13.4 task=build org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index b96c25c224b..f4868e95761 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -28,7 +28,7 @@ ext { // Add Scala version def defaultScala212Version = '2.12.12' -def defaultScala213Version = '2.13.3' +def defaultScala213Version = '2.13.4' if (hasProperty('scalaVersion')) { if (scalaVersion == '2.12') { versions["scala"] = defaultScala212Version @@ -101,7 +101,7 @@ versions += [ powermock: "2.0.9", reflections: "0.9.12", rocksDB: "5.18.4", - scalaCollectionCompat: "2.2.0", + scalaCollectionCompat: "2.3.0", scalafmt: "1.5.1", scalaJava8Compat : "0.9.1", scalatest: "3.0.8", diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index f395d0a7765..9115e0d59ae 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -124,6 +124,14 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + + + + + +