mirror of https://github.com/apache/kafka.git
MINOR: Upgrade to Scala 2.13.4 (#9643)
Scala 2.13.4 restores default global `ExecutionContext` to 2.12 behavior (to fix a perf regression in some use cases) and improves pattern matching (especially exhaustiveness checking). Most of the changes are related to the latter as I have enabled the newly introduced `-Xlint:strict-unsealed-patmat`. More details on the code changes: * Don't swallow exception in `ReassignPartitionsCommand.topicDescriptionFutureToState`. * `RequestChannel.Response` should be `sealed`. * Introduce sealed ClientQuotaManager.BaseUserEntity to avoid false positive exhaustiveness warning. * Handle a number of cases where pattern matches were not exhaustive: either by marking them with @unchecked or by adding a catch-all clause. * Workaround scalac bug related to exhaustiveness warnings in ZooKeeperClient * Remove warning suppression annotations related to the optimizer that are no longer needed in ConsumerGroupCommand and AclAuthorizer. * Use `forKeyValue` in `AclAuthorizer.acls` as the scala bug preventing us from using it seems to be fixed. * Also update scalaCollectionCompat to 2.3.0, which includes minor improvements. Full release notes: * https://github.com/scala/scala/releases/tag/v2.13.4 * https://github.com/scala/scala-collection-compat/releases/tag/v2.3.0 Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
a5986bd32d
commit
cbf8ad277a
|
@ -48,7 +48,7 @@ should_include_file() {
|
||||||
base_dir=$(dirname $0)/..
|
base_dir=$(dirname $0)/..
|
||||||
|
|
||||||
if [ -z "$SCALA_VERSION" ]; then
|
if [ -z "$SCALA_VERSION" ]; then
|
||||||
SCALA_VERSION=2.13.3
|
SCALA_VERSION=2.13.4
|
||||||
if [[ -f "$base_dir/gradle.properties" ]]; then
|
if [[ -f "$base_dir/gradle.properties" ]]; then
|
||||||
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
|
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
|
||||||
fi
|
fi
|
||||||
|
|
|
@ -27,7 +27,7 @@ set BASE_DIR=%CD%
|
||||||
popd
|
popd
|
||||||
|
|
||||||
IF ["%SCALA_VERSION%"] EQU [""] (
|
IF ["%SCALA_VERSION%"] EQU [""] (
|
||||||
set SCALA_VERSION=2.13.3
|
set SCALA_VERSION=2.13.4
|
||||||
)
|
)
|
||||||
|
|
||||||
IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
|
IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
|
||||||
|
|
|
@ -525,7 +525,7 @@ subprojects {
|
||||||
scalaCompileOptions.additionalParameters += inlineFrom
|
scalaCompileOptions.additionalParameters += inlineFrom
|
||||||
|
|
||||||
if (versions.baseScala != '2.12') {
|
if (versions.baseScala != '2.12') {
|
||||||
scalaCompileOptions.additionalParameters += ["-opt-warnings"]
|
scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"]
|
||||||
// Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
|
// Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
|
||||||
scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
|
scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
|
||||||
}
|
}
|
||||||
|
|
|
@ -480,6 +480,7 @@ object ConfigCommand extends Config {
|
||||||
describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll)
|
describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll)
|
||||||
case ConfigType.User | ConfigType.Client =>
|
case ConfigType.User | ConfigType.Client =>
|
||||||
describeClientQuotaAndUserScramCredentialConfigs(adminClient, entityTypes, entityNames)
|
describeClientQuotaAndUserScramCredentialConfigs(adminClient, entityTypes, entityNames)
|
||||||
|
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -491,6 +492,7 @@ object ConfigCommand extends Config {
|
||||||
adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
|
adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
|
||||||
case ConfigType.Broker | BrokerLoggerConfigType =>
|
case ConfigType.Broker | BrokerLoggerConfigType =>
|
||||||
adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
|
adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
|
||||||
|
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
|
||||||
})
|
})
|
||||||
|
|
||||||
entities.foreach { entity =>
|
entities.foreach { entity =>
|
||||||
|
@ -530,6 +532,7 @@ object ConfigCommand extends Config {
|
||||||
if (!entityName.isEmpty)
|
if (!entityName.isEmpty)
|
||||||
validateBrokerId()
|
validateBrokerId()
|
||||||
(ConfigResource.Type.BROKER_LOGGER, None)
|
(ConfigResource.Type.BROKER_LOGGER, None)
|
||||||
|
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
|
||||||
}
|
}
|
||||||
|
|
||||||
val configSourceFilter = if (describeAll)
|
val configSourceFilter = if (describeAll)
|
||||||
|
|
|
@ -44,8 +44,6 @@ import org.apache.kafka.common.requests.ListOffsetResponse
|
||||||
import org.apache.kafka.common.ConsumerGroupState
|
import org.apache.kafka.common.ConsumerGroupState
|
||||||
import joptsimple.OptionException
|
import joptsimple.OptionException
|
||||||
|
|
||||||
import scala.annotation.nowarn
|
|
||||||
|
|
||||||
object ConsumerGroupCommand extends Logging {
|
object ConsumerGroupCommand extends Logging {
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
|
@ -151,22 +149,24 @@ object ConsumerGroupCommand extends Logging {
|
||||||
private[admin] case class CsvUtils() {
|
private[admin] case class CsvUtils() {
|
||||||
val mapper = new CsvMapper with ScalaObjectMapper
|
val mapper = new CsvMapper with ScalaObjectMapper
|
||||||
mapper.registerModule(DefaultScalaModule)
|
mapper.registerModule(DefaultScalaModule)
|
||||||
def readerFor[T <: CsvRecord: ClassTag] = {
|
def readerFor[T <: CsvRecord : ClassTag] = {
|
||||||
val schema = getSchema[T]
|
val schema = getSchema[T]
|
||||||
val clazz = implicitly[ClassTag[T]].runtimeClass
|
val clazz = implicitly[ClassTag[T]].runtimeClass
|
||||||
mapper.readerFor(clazz).`with`(schema)
|
mapper.readerFor(clazz).`with`(schema)
|
||||||
}
|
}
|
||||||
def writerFor[T <: CsvRecord: ClassTag] = {
|
def writerFor[T <: CsvRecord : ClassTag] = {
|
||||||
val schema = getSchema[T]
|
val schema = getSchema[T]
|
||||||
val clazz = implicitly[ClassTag[T]].runtimeClass
|
val clazz = implicitly[ClassTag[T]].runtimeClass
|
||||||
mapper.writerFor(clazz).`with`(schema)
|
mapper.writerFor(clazz).`with`(schema)
|
||||||
}
|
}
|
||||||
private def getSchema[T <: CsvRecord: ClassTag] = {
|
private def getSchema[T <: CsvRecord : ClassTag] = {
|
||||||
val clazz = implicitly[ClassTag[T]].runtimeClass
|
val clazz = implicitly[ClassTag[T]].runtimeClass
|
||||||
val fields = clazz match {
|
|
||||||
case _ if classOf[CsvRecordWithGroup] == clazz => CsvRecordWithGroup.fields
|
val fields =
|
||||||
case _ if classOf[CsvRecordNoGroup] == clazz => CsvRecordNoGroup.fields
|
if (classOf[CsvRecordWithGroup] == clazz) CsvRecordWithGroup.fields
|
||||||
}
|
else if (classOf[CsvRecordNoGroup] == clazz) CsvRecordNoGroup.fields
|
||||||
|
else throw new IllegalStateException(s"Unhandled class $clazz")
|
||||||
|
|
||||||
val schema = mapper.schemaFor(clazz).sortedBy(fields: _*)
|
val schema = mapper.schemaFor(clazz).sortedBy(fields: _*)
|
||||||
schema
|
schema
|
||||||
}
|
}
|
||||||
|
@ -555,7 +555,6 @@ object ConsumerGroupCommand extends Logging {
|
||||||
/**
|
/**
|
||||||
* Returns the state of the specified consumer group and partition assignment states
|
* Returns the state of the specified consumer group and partition assignment states
|
||||||
*/
|
*/
|
||||||
@nowarn("cat=optimizer")
|
|
||||||
def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
|
def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
|
||||||
collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None))
|
collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None))
|
||||||
}
|
}
|
||||||
|
|
|
@ -479,8 +479,7 @@ object ReassignPartitionsCommand extends Logging {
|
||||||
|
|
||||||
private def topicDescriptionFutureToState(partition: Int,
|
private def topicDescriptionFutureToState(partition: Int,
|
||||||
future: KafkaFuture[TopicDescription],
|
future: KafkaFuture[TopicDescription],
|
||||||
targetReplicas: Seq[Int])
|
targetReplicas: Seq[Int]): PartitionReassignmentState = {
|
||||||
: PartitionReassignmentState = {
|
|
||||||
try {
|
try {
|
||||||
val topicDescription = future.get()
|
val topicDescription = future.get()
|
||||||
if (topicDescription.partitions().size() < partition) {
|
if (topicDescription.partitions().size() < partition) {
|
||||||
|
@ -494,7 +493,8 @@ object ReassignPartitionsCommand extends Logging {
|
||||||
case t: ExecutionException =>
|
case t: ExecutionException =>
|
||||||
t.getCause match {
|
t.getCause match {
|
||||||
case _: UnknownTopicOrPartitionException =>
|
case _: UnknownTopicOrPartitionException =>
|
||||||
new PartitionReassignmentState(Seq(), targetReplicas, true)
|
PartitionReassignmentState(Seq(), targetReplicas, true)
|
||||||
|
case e => throw e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -589,7 +589,7 @@ class GroupMetadataManager(brokerId: Int,
|
||||||
|
|
||||||
readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
|
readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
|
||||||
|
|
||||||
val memRecords = fetchDataInfo.records match {
|
val memRecords = (fetchDataInfo.records: @unchecked) match {
|
||||||
case records: MemoryRecords => records
|
case records: MemoryRecords => records
|
||||||
case fileRecords: FileRecords =>
|
case fileRecords: FileRecords =>
|
||||||
val sizeInBytes = fileRecords.sizeInBytes
|
val sizeInBytes = fileRecords.sizeInBytes
|
||||||
|
|
|
@ -314,7 +314,7 @@ class TransactionStateManager(brokerId: Int,
|
||||||
|
|
||||||
readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
|
readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
|
||||||
|
|
||||||
val memRecords = fetchDataInfo.records match {
|
val memRecords = (fetchDataInfo.records: @unchecked) match {
|
||||||
case records: MemoryRecords => records
|
case records: MemoryRecords => records
|
||||||
case fileRecords: FileRecords =>
|
case fileRecords: FileRecords =>
|
||||||
val sizeInBytes = fileRecords.sizeInBytes
|
val sizeInBytes = fileRecords.sizeInBytes
|
||||||
|
|
|
@ -2737,4 +2737,4 @@ case object LogDeletion extends SegmentDeletionReason {
|
||||||
override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
|
override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
|
||||||
log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}")
|
log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -308,7 +308,7 @@ object RequestChannel extends Logging {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class Response(val request: Request) {
|
sealed abstract class Response(val request: Request) {
|
||||||
|
|
||||||
def processor: Int = request.processor
|
def processor: Int = request.processor
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,8 @@ object KafkaNetworkChannel {
|
||||||
new EndQuorumEpochResponse(endEpochResponse)
|
new EndQuorumEpochResponse(endEpochResponse)
|
||||||
case fetchResponse: FetchResponseData =>
|
case fetchResponse: FetchResponseData =>
|
||||||
new FetchResponse(fetchResponse)
|
new FetchResponse(fetchResponse)
|
||||||
|
case _ =>
|
||||||
|
throw new IllegalArgumentException(s"Unexpected type for responseData: $responseData")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,6 +63,8 @@ object KafkaNetworkChannel {
|
||||||
new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
|
new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
|
||||||
override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version)
|
override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version)
|
||||||
}
|
}
|
||||||
|
case _ =>
|
||||||
|
throw new IllegalArgumentException(s"Unexpected type for requestData: $requestData")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,6 +74,7 @@ object KafkaNetworkChannel {
|
||||||
case beginEpochResponse: BeginQuorumEpochResponse => beginEpochResponse.data
|
case beginEpochResponse: BeginQuorumEpochResponse => beginEpochResponse.data
|
||||||
case endEpochResponse: EndQuorumEpochResponse => endEpochResponse.data
|
case endEpochResponse: EndQuorumEpochResponse => endEpochResponse.data
|
||||||
case fetchResponse: FetchResponse[_] => fetchResponse.data
|
case fetchResponse: FetchResponse[_] => fetchResponse.data
|
||||||
|
case _ => throw new IllegalArgumentException(s"Unexpected type for response: $response")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,6 +84,7 @@ object KafkaNetworkChannel {
|
||||||
case beginEpochRequest: BeginQuorumEpochRequest => beginEpochRequest.data
|
case beginEpochRequest: BeginQuorumEpochRequest => beginEpochRequest.data
|
||||||
case endEpochRequest: EndQuorumEpochRequest => endEpochRequest.data
|
case endEpochRequest: EndQuorumEpochRequest => endEpochRequest.data
|
||||||
case fetchRequest: FetchRequest => fetchRequest.data
|
case fetchRequest: FetchRequest => fetchRequest.data
|
||||||
|
case _ => throw new IllegalArgumentException(s"Unexpected type for request: $request")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -287,12 +287,9 @@ class AclAuthorizer extends Authorizer with Logging {
|
||||||
}.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
|
}.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
|
||||||
}
|
}
|
||||||
|
|
||||||
@nowarn("cat=optimizer")
|
|
||||||
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
|
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
|
||||||
val aclBindings = new util.ArrayList[AclBinding]()
|
val aclBindings = new util.ArrayList[AclBinding]()
|
||||||
// Using `forKeyValue` triggers a scalac bug related to suppression of optimizer warnings, we
|
aclCache.forKeyValue { case (resource, versionedAcls) =>
|
||||||
// should change this code once that's fixed
|
|
||||||
aclCache.foreach { case (resource, versionedAcls) =>
|
|
||||||
versionedAcls.acls.foreach { acl =>
|
versionedAcls.acls.foreach { acl =>
|
||||||
val binding = new AclBinding(resource, acl.ace)
|
val binding = new AclBinding(resource, acl.ace)
|
||||||
if (filter.matches(binding))
|
if (filter.matches(binding))
|
||||||
|
@ -542,7 +539,6 @@ class AclAuthorizer extends Authorizer with Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@nowarn("cat=optimizer")
|
|
||||||
private def getAclsFromCache(resource: ResourcePattern): VersionedAcls = {
|
private def getAclsFromCache(resource: ResourcePattern): VersionedAcls = {
|
||||||
aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
|
aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
|
||||||
}
|
}
|
||||||
|
|
|
@ -679,7 +679,7 @@ abstract class AbstractFetcherThread(name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def toMemoryRecords(records: Records): MemoryRecords = {
|
protected def toMemoryRecords(records: Records): MemoryRecords = {
|
||||||
records match {
|
(records: @unchecked) match {
|
||||||
case r: MemoryRecords => r
|
case r: MemoryRecords => r
|
||||||
case r: FileRecords =>
|
case r: FileRecords =>
|
||||||
val buffer = ByteBuffer.allocate(r.sizeInBytes)
|
val buffer = ByteBuffer.allocate(r.sizeInBytes)
|
||||||
|
|
|
@ -80,7 +80,9 @@ object ClientQuotaManager {
|
||||||
val DefaultUserQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
|
val DefaultUserQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
|
||||||
val DefaultUserClientIdQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))
|
val DefaultUserClientIdQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))
|
||||||
|
|
||||||
case class UserEntity(sanitizedUser: String) extends ClientQuotaEntity.ConfigEntity {
|
sealed trait BaseUserEntity extends ClientQuotaEntity.ConfigEntity
|
||||||
|
|
||||||
|
case class UserEntity(sanitizedUser: String) extends BaseUserEntity {
|
||||||
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.USER
|
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.USER
|
||||||
override def name: String = Sanitizer.desanitize(sanitizedUser)
|
override def name: String = Sanitizer.desanitize(sanitizedUser)
|
||||||
override def toString: String = s"user $sanitizedUser"
|
override def toString: String = s"user $sanitizedUser"
|
||||||
|
@ -92,7 +94,7 @@ object ClientQuotaManager {
|
||||||
override def toString: String = s"client-id $clientId"
|
override def toString: String = s"client-id $clientId"
|
||||||
}
|
}
|
||||||
|
|
||||||
case object DefaultUserEntity extends ClientQuotaEntity.ConfigEntity {
|
case object DefaultUserEntity extends BaseUserEntity {
|
||||||
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
|
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
|
||||||
override def name: String = ConfigEntityName.Default
|
override def name: String = ConfigEntityName.Default
|
||||||
override def toString: String = "default user"
|
override def toString: String = "default user"
|
||||||
|
@ -104,7 +106,7 @@ object ClientQuotaManager {
|
||||||
override def toString: String = "default client-id"
|
override def toString: String = "default client-id"
|
||||||
}
|
}
|
||||||
|
|
||||||
case class KafkaQuotaEntity(userEntity: Option[ClientQuotaEntity.ConfigEntity],
|
case class KafkaQuotaEntity(userEntity: Option[BaseUserEntity],
|
||||||
clientIdEntity: Option[ClientQuotaEntity.ConfigEntity]) extends ClientQuotaEntity {
|
clientIdEntity: Option[ClientQuotaEntity.ConfigEntity]) extends ClientQuotaEntity {
|
||||||
override def configEntities: util.List[ClientQuotaEntity.ConfigEntity] =
|
override def configEntities: util.List[ClientQuotaEntity.ConfigEntity] =
|
||||||
(userEntity.toList ++ clientIdEntity.toList).asJava
|
(userEntity.toList ++ clientIdEntity.toList).asJava
|
||||||
|
|
|
@ -195,7 +195,10 @@ class ZooKeeperClient(connectString: String,
|
||||||
def responseMetadata(sendTimeMs: Long) = new ResponseMetadata(sendTimeMs, receivedTimeMs = time.hiResClockMs())
|
def responseMetadata(sendTimeMs: Long) = new ResponseMetadata(sendTimeMs, receivedTimeMs = time.hiResClockMs())
|
||||||
|
|
||||||
val sendTimeMs = time.hiResClockMs()
|
val sendTimeMs = time.hiResClockMs()
|
||||||
request match {
|
|
||||||
|
// Cast to AsyncRequest to workaround a scalac bug that results in an false exhaustiveness warning
|
||||||
|
// with -Xlint:strict-unsealed-patmat
|
||||||
|
(request: AsyncRequest) match {
|
||||||
case ExistsRequest(path, ctx) =>
|
case ExistsRequest(path, ctx) =>
|
||||||
zooKeeper.exists(path, shouldWatch(request), new StatCallback {
|
zooKeeper.exists(path, shouldWatch(request), new StatCallback {
|
||||||
def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
|
def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
|
||||||
|
|
|
@ -38,16 +38,13 @@ object SaslPlainSslEndToEndAuthorizationTest {
|
||||||
class TestPrincipalBuilder extends KafkaPrincipalBuilder {
|
class TestPrincipalBuilder extends KafkaPrincipalBuilder {
|
||||||
|
|
||||||
override def build(context: AuthenticationContext): KafkaPrincipal = {
|
override def build(context: AuthenticationContext): KafkaPrincipal = {
|
||||||
context match {
|
context.asInstanceOf[SaslAuthenticationContext].server.getAuthorizationID match {
|
||||||
case ctx: SaslAuthenticationContext =>
|
case KafkaPlainAdmin =>
|
||||||
ctx.server.getAuthorizationID match {
|
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin")
|
||||||
case KafkaPlainAdmin =>
|
case KafkaPlainUser =>
|
||||||
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin")
|
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
|
||||||
case KafkaPlainUser =>
|
case _ =>
|
||||||
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
|
KafkaPrincipal.ANONYMOUS
|
||||||
case _ =>
|
|
||||||
KafkaPrincipal.ANONYMOUS
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,16 +34,13 @@ object SslEndToEndAuthorizationTest {
|
||||||
// Use full DN as client principal to test special characters in principal
|
// Use full DN as client principal to test special characters in principal
|
||||||
// Use field from DN as server principal to test custom PrincipalBuilder
|
// Use field from DN as server principal to test custom PrincipalBuilder
|
||||||
override def build(context: AuthenticationContext): KafkaPrincipal = {
|
override def build(context: AuthenticationContext): KafkaPrincipal = {
|
||||||
context match {
|
val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName
|
||||||
case ctx: SslAuthenticationContext =>
|
peerPrincipal match {
|
||||||
val peerPrincipal = ctx.session.getPeerPrincipal.getName
|
case Pattern(name, _) =>
|
||||||
peerPrincipal match {
|
val principal = if (name == "server") name else peerPrincipal
|
||||||
case Pattern(name, _) =>
|
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
|
||||||
val principal = if (name == "server") name else peerPrincipal
|
case _ =>
|
||||||
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
|
KafkaPrincipal.ANONYMOUS
|
||||||
case _ =>
|
|
||||||
KafkaPrincipal.ANONYMOUS
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -207,7 +207,7 @@ class KafkaNetworkChannelTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def extractError(response: ApiMessage): Errors = {
|
private def extractError(response: ApiMessage): Errors = {
|
||||||
val code = response match {
|
val code = (response: @unchecked) match {
|
||||||
case res: BeginQuorumEpochResponseData => res.errorCode
|
case res: BeginQuorumEpochResponseData => res.errorCode
|
||||||
case res: EndQuorumEpochResponseData => res.errorCode
|
case res: EndQuorumEpochResponseData => res.errorCode
|
||||||
case res: FetchResponseData => res.errorCode
|
case res: FetchResponseData => res.errorCode
|
||||||
|
|
|
@ -53,7 +53,7 @@ class BaseClientQuotaManagerTest {
|
||||||
|
|
||||||
protected def callback(response: RequestChannel.Response): Unit = {
|
protected def callback(response: RequestChannel.Response): Unit = {
|
||||||
// Count how many times this callback is called for notifyThrottlingDone().
|
// Count how many times this callback is called for notifyThrottlingDone().
|
||||||
response match {
|
(response: @unchecked) match {
|
||||||
case _: StartThrottlingResponse =>
|
case _: StartThrottlingResponse =>
|
||||||
case _: EndThrottlingResponse => numCallbacks += 1
|
case _: EndThrottlingResponse => numCallbacks += 1
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ class ThrottledChannelExpirationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
def callback(response: Response): Unit = {
|
def callback(response: Response): Unit = {
|
||||||
response match {
|
(response: @unchecked) match {
|
||||||
case _: StartThrottlingResponse => numCallbacksForStartThrottling += 1
|
case _: StartThrottlingResponse => numCallbacksForStartThrottling += 1
|
||||||
case _: EndThrottlingResponse => numCallbacksForEndThrottling += 1
|
case _: EndThrottlingResponse => numCallbacksForEndThrottling += 1
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,6 @@ group=org.apache.kafka
|
||||||
# - tests/kafkatest/version.py (variable DEV_VERSION)
|
# - tests/kafkatest/version.py (variable DEV_VERSION)
|
||||||
# - kafka-merge-pr.py
|
# - kafka-merge-pr.py
|
||||||
version=2.8.0-SNAPSHOT
|
version=2.8.0-SNAPSHOT
|
||||||
scalaVersion=2.13.3
|
scalaVersion=2.13.4
|
||||||
task=build
|
task=build
|
||||||
org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
|
org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
|
||||||
|
|
|
@ -28,7 +28,7 @@ ext {
|
||||||
|
|
||||||
// Add Scala version
|
// Add Scala version
|
||||||
def defaultScala212Version = '2.12.12'
|
def defaultScala212Version = '2.12.12'
|
||||||
def defaultScala213Version = '2.13.3'
|
def defaultScala213Version = '2.13.4'
|
||||||
if (hasProperty('scalaVersion')) {
|
if (hasProperty('scalaVersion')) {
|
||||||
if (scalaVersion == '2.12') {
|
if (scalaVersion == '2.12') {
|
||||||
versions["scala"] = defaultScala212Version
|
versions["scala"] = defaultScala212Version
|
||||||
|
@ -101,7 +101,7 @@ versions += [
|
||||||
powermock: "2.0.9",
|
powermock: "2.0.9",
|
||||||
reflections: "0.9.12",
|
reflections: "0.9.12",
|
||||||
rocksDB: "5.18.4",
|
rocksDB: "5.18.4",
|
||||||
scalaCollectionCompat: "2.2.0",
|
scalaCollectionCompat: "2.3.0",
|
||||||
scalafmt: "1.5.1",
|
scalafmt: "1.5.1",
|
||||||
scalaJava8Compat : "0.9.1",
|
scalaJava8Compat : "0.9.1",
|
||||||
scalatest: "3.0.8",
|
scalatest: "3.0.8",
|
||||||
|
|
|
@ -124,6 +124,14 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
|
||||||
<Bug pattern="NP_NULL_ON_SOME_PATH_EXCEPTION"/>
|
<Bug pattern="NP_NULL_ON_SOME_PATH_EXCEPTION"/>
|
||||||
</Match>
|
</Match>
|
||||||
|
|
||||||
|
<Match>
|
||||||
|
<!-- Scala doesn't have checked exceptions so one cannot catch RuntimeException and rely
|
||||||
|
on the compiler to fail if the code is changed to call a method that throws Exception.
|
||||||
|
Given that, this bug pattern doesn't make sense for Scala code. -->
|
||||||
|
<Class name="kafka.log.Log"/>
|
||||||
|
<Bug pattern="REC_CATCH_EXCEPTION"/>
|
||||||
|
</Match>
|
||||||
|
|
||||||
<Match>
|
<Match>
|
||||||
<!-- A spurious null check after inlining by the scalac optimizer confuses spotBugs -->
|
<!-- A spurious null check after inlining by the scalac optimizer confuses spotBugs -->
|
||||||
<Class name="kafka.tools.StateChangeLogMerger$"/>
|
<Class name="kafka.tools.StateChangeLogMerger$"/>
|
||||||
|
|
Loading…
Reference in New Issue