mirror of https://github.com/apache/kafka.git
				
				
				
			MINOR: Enable fatal warnings with scala 2.13 (#8429)
* Upgrade to Scala 2.13.2 which introduces the ability to suppress warnings. * Upgrade to scala-collection-compat 2.1.6 as it introduces the @nowarn annotation for Scala 2.12. * While at it, also update scala-java8-compat to 0.9.1. * Fix compiler warnings and add @nowarn for the unfixed ones. Scala 2.13.2 highlights (besides @nowarn): * Rewrite Vector (using "radix-balanced finger tree vectors"), for performance. Small vectors are now more compactly represented. Some operations are now drastically faster on large vectors. A few operations may be a little slower. * Matching strings makes switches in bytecode. https://github.com/scala/scala/releases/tag/v2.13.2 Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
		
							parent
							
								
									039774038c
								
							
						
					
					
						commit
						c5ae154a3f
					
				|  | @ -484,6 +484,12 @@ subprojects { | |||
|     scalaCompileOptions.additionalParameters += ["-opt:l:inline"] | ||||
|     scalaCompileOptions.additionalParameters += inlineFrom | ||||
| 
 | ||||
|     if (versions.baseScala != '2.12') { | ||||
|       scalaCompileOptions.additionalParameters += ["-opt-warnings"] | ||||
|       // Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings | ||||
|       scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"] | ||||
|     } | ||||
| 
 | ||||
|     // these options are valid for Scala versions < 2.13 only | ||||
|     // Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969 | ||||
|     if (versions.baseScala == '2.12') { | ||||
|  |  | |||
|  | @ -39,6 +39,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S | |||
| import org.apache.kafka.common.utils.{Sanitizer, Time, Utils} | ||||
| import org.apache.zookeeper.client.ZKClientConfig | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.collection._ | ||||
| 
 | ||||
|  | @ -297,6 +298,7 @@ object ConfigCommand extends Config { | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = { | ||||
|     val entityTypes = opts.entityTypes | ||||
|     val entityNames = opts.entityNames | ||||
|  |  | |||
|  | @ -498,7 +498,7 @@ class Log(@volatile private var _dir: File, | |||
| 
 | ||||
|     def newLeaderEpochFileCache(): LeaderEpochFileCache = { | ||||
|       val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) | ||||
|       new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile) | ||||
|       new LeaderEpochFileCache(topicPartition, () => logEndOffset, checkpointFile) | ||||
|     } | ||||
| 
 | ||||
|     if (recordVersion.precedes(RecordVersion.V2)) { | ||||
|  |  | |||
|  | @ -38,6 +38,7 @@ import org.apache.kafka.common.requests._ | |||
| import org.apache.kafka.common.security.auth.KafkaPrincipal | ||||
| import org.apache.kafka.common.utils.{Sanitizer, Time} | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.collection.mutable | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.reflect.ClassTag | ||||
|  | @ -106,7 +107,7 @@ object RequestChannel extends Logging { | |||
| 
 | ||||
|     def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}" | ||||
| 
 | ||||
|     def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = { | ||||
|     def body[T <: AbstractRequest](implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = { | ||||
|       bodyAndSize.request match { | ||||
|         case r: T => r | ||||
|         case r => | ||||
|  |  | |||
|  | @ -700,7 +700,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint, | |||
|    * Wakeup the thread for selection. | ||||
|    */ | ||||
|   @Override | ||||
|   def wakeup = nioSelector.wakeup() | ||||
|   def wakeup(): Unit = nioSelector.wakeup() | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -39,7 +39,8 @@ import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult | |||
| import org.apache.kafka.server.authorizer._ | ||||
| import org.apache.zookeeper.client.ZKClientConfig | ||||
| 
 | ||||
| import scala.collection.{mutable, Seq} | ||||
| import scala.annotation.nowarn | ||||
| import scala.collection.{Seq, mutable} | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.util.{Failure, Random, Success, Try} | ||||
| 
 | ||||
|  | @ -249,7 +250,7 @@ class AclAuthorizer extends Authorizer with Logging { | |||
|           } | ||||
|         } catch { | ||||
|           case e: Exception => | ||||
|             resourceBindingsBeingDeleted.foreach { case (binding, index) => | ||||
|             resourceBindingsBeingDeleted.keys.foreach { binding => | ||||
|                 deleteExceptions.getOrElseUpdate(binding, apiException(e)) | ||||
|             } | ||||
|         } | ||||
|  | @ -263,6 +264,7 @@ class AclAuthorizer extends Authorizer with Logging { | |||
|     }.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=optimizer") | ||||
|   override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = { | ||||
|       val aclBindings = new util.ArrayList[AclBinding]() | ||||
|       aclCache.foreach { case (resource, versionedAcls) => | ||||
|  | @ -342,6 +344,7 @@ class AclAuthorizer extends Authorizer with Logging { | |||
|     } else false | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = { | ||||
|     // save aclCache reference to a local val to get a consistent view of the cache during acl updates. | ||||
|     val aclCacheSnapshot = aclCache | ||||
|  |  | |||
|  | @ -28,9 +28,12 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | |||
| import org.apache.kafka.common.utils.Utils | ||||
| import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer} | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| 
 | ||||
| 
 | ||||
| object AuthorizerUtils { | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   def createAuthorizer(className: String): Authorizer = { | ||||
|     Utils.newInstance(className, classOf[Object]) match { | ||||
|       case auth: Authorizer => auth | ||||
|  |  | |||
|  | @ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger | |||
| 
 | ||||
| import kafka.admin.{AdminUtils, RackAwareMode} | ||||
| import kafka.api.ElectLeadersRequestOps | ||||
| import kafka.api.LeaderAndIsr | ||||
| import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0} | ||||
| import kafka.cluster.Partition | ||||
| import kafka.common.OffsetAndMetadata | ||||
|  |  | |||
|  | @ -133,7 +133,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig, | |||
|     * @param value | ||||
|     */ | ||||
|   def record(value: Long): Unit = { | ||||
|     sensor().record(value, time.milliseconds(), false) | ||||
|     sensor().record(value.toDouble, time.milliseconds(), false) | ||||
|   } | ||||
| 
 | ||||
|   /** | ||||
|  |  | |||
|  | @ -37,6 +37,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest | |||
| import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer} | ||||
| import org.apache.kafka.common.utils.Utils | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.jdk.CollectionConverters._ | ||||
| 
 | ||||
| /** | ||||
|  | @ -575,6 +576,7 @@ class ChecksumMessageFormatter extends MessageFormatter { | |||
|       topicStr = "" | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { | ||||
|     output.println(topicStr + "checksum:" + consumerRecord.checksum) | ||||
|   } | ||||
|  |  | |||
|  | @ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe | |||
| import org.apache.kafka.common.utils.{Time, Utils} | ||||
| import org.apache.kafka.common.{KafkaException, TopicPartition} | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.collection.mutable.HashMap | ||||
| import scala.util.control.ControlThrowable | ||||
|  | @ -190,6 +191,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { | |||
| 
 | ||||
|     setName(threadName) | ||||
| 
 | ||||
|     @nowarn("cat=deprecation") | ||||
|     private def toBaseConsumerRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord = | ||||
|       BaseConsumerRecord(record.topic, | ||||
|         record.partition, | ||||
|  | @ -412,10 +414,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { | |||
|    * If message.handler.args is specified. A constructor that takes in a String as argument must exist. | ||||
|    */ | ||||
|   trait MirrorMakerMessageHandler { | ||||
|     @nowarn("cat=deprecation") | ||||
|     def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] | ||||
|   } | ||||
| 
 | ||||
|   private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler { | ||||
|     @nowarn("cat=deprecation") | ||||
|     override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = { | ||||
|       val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp | ||||
|       Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, record.headers)) | ||||
|  |  | |||
|  | @ -83,7 +83,7 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { | |||
|      | ||||
|     def hasNext: Boolean = iter.hasNext | ||||
|      | ||||
|     def next: (K, V) = { | ||||
|     def next(): (K, V) = { | ||||
|       val n = iter.next | ||||
|       (n.getKey, n.getValue) | ||||
|     } | ||||
|  |  | |||
|  | @ -198,7 +198,7 @@ class ZooKeeperClient(connectString: String, | |||
|       case GetDataRequest(path, ctx) => | ||||
|         zooKeeper.getData(path, shouldWatch(request), new DataCallback { | ||||
|           def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit = | ||||
|             callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs))), | ||||
|             callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs))) | ||||
|         }, ctx.orNull) | ||||
|       case GetChildrenRequest(path, _, ctx) => | ||||
|         zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback { | ||||
|  |  | |||
|  | @ -31,6 +31,7 @@ import org.junit.{After, Before, Rule, Test} | |||
| import org.junit.rules.Timeout | ||||
| import org.scalatest.Assertions.intercept | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.jdk.CollectionConverters._ | ||||
| 
 | ||||
| /** | ||||
|  | @ -92,6 +93,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with | |||
|     PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(zkClient, servers, client) | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   @Test | ||||
|   def testInvalidAlterConfigsDueToPolicy(): Unit = { | ||||
|     client = Admin.create(createConfig) | ||||
|  |  | |||
|  | @ -59,6 +59,7 @@ import org.junit.Assert._ | |||
| import org.junit.{After, Before, Test} | ||||
| import org.scalatest.Assertions.intercept | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.collection.mutable | ||||
| import scala.collection.mutable.Buffer | ||||
|  | @ -949,6 +950,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { | |||
|     consumeRecords(consumer) | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   @Test | ||||
|   def testPatternSubscriptionWithNoTopicAccess(): Unit = { | ||||
|     createTopic(topic) | ||||
|  | @ -985,6 +987,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { | |||
|     assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   @Test | ||||
|   def testPatternSubscriptionWithTopicAndGroupRead(): Unit = { | ||||
|     createTopic(topic) | ||||
|  | @ -1016,6 +1019,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { | |||
|     assertTrue(consumer.assignment().isEmpty) | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   @Test | ||||
|   def testPatternSubscriptionMatchingInternalTopic(): Unit = { | ||||
|     createTopic(topic) | ||||
|  |  | |||
|  | @ -36,6 +36,7 @@ import org.junit.Assert._ | |||
| import org.junit.{After, Before, Test} | ||||
| import org.scalatest.Assertions.fail | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.collection.mutable.Buffer | ||||
| import scala.concurrent.ExecutionException | ||||
|  | @ -102,6 +103,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { | |||
|    * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected. | ||||
|    * 2. Last message of the non-blocking send should return the correct offset metadata | ||||
|    */ | ||||
|   @nowarn("cat=deprecation") | ||||
|   @Test | ||||
|   def testSendOffset(): Unit = { | ||||
|     val producer = createProducer(brokerList) | ||||
|  |  | |||
|  | @ -29,6 +29,7 @@ import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinator | |||
| import org.junit.Assert._ | ||||
| import org.junit.{After, Ignore, Test} | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.collection.{Seq, mutable} | ||||
| 
 | ||||
|  | @ -83,6 +84,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { | |||
|    * 1. Produce a bunch of messages | ||||
|    * 2. Then consume the messages while killing and restarting brokers at random | ||||
|    */ | ||||
|   @nowarn("cat=deprecation") | ||||
|   def consumeWithBrokerFailures(numIters: Int): Unit = { | ||||
|     val numRecords = 1000 | ||||
|     val producer = createProducer() | ||||
|  | @ -379,6 +381,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { | |||
|     checkCloseDuringRebalance("group1", topic, executor, true) | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   private def checkCloseDuringRebalance(groupId: String, topic: String, executor: ExecutorService, brokersAvailableDuringClose: Boolean): Unit = { | ||||
| 
 | ||||
|     def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], revokeSemaphore: Option[Semaphore] = None): Future[Any] = { | ||||
|  |  | |||
|  | @ -45,6 +45,7 @@ import org.junit.Assert._ | |||
| import org.junit.{After, Before, Ignore, Test} | ||||
| import org.scalatest.Assertions.intercept | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.collection.Seq | ||||
| import scala.concurrent.duration.Duration | ||||
|  | @ -2176,6 +2177,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { | |||
|   /** | ||||
|     * The AlterConfigs API is deprecated and should not support altering log levels | ||||
|     */ | ||||
|   @nowarn("cat=deprecation") | ||||
|   @Test | ||||
|   @Ignore // To be re-enabled once KAFKA-8779 is resolved | ||||
|   def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = { | ||||
|  | @ -2227,6 +2229,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { | |||
| 
 | ||||
| object PlaintextAdminIntegrationTest { | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   def checkValidAlterConfigs(client: Admin, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = { | ||||
|     // Alter topics | ||||
|     var topicConfigEntries1 = Seq( | ||||
|  | @ -2289,6 +2292,7 @@ object PlaintextAdminIntegrationTest { | |||
|     assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: Seq[KafkaServer], client: Admin): Unit = { | ||||
|     // Create topics | ||||
|     val topic1 = "invalid-alter-configs-topic-1" | ||||
|  | @ -2356,12 +2360,12 @@ object PlaintextAdminIntegrationTest { | |||
| 
 | ||||
|     assertEquals(Defaults.LogCleanerMinCleanRatio.toString, | ||||
|       configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value) | ||||
|     assertEquals(Defaults.CompressionType.toString, | ||||
|     assertEquals(Defaults.CompressionType, | ||||
|       configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value) | ||||
| 
 | ||||
|     assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value) | ||||
| 
 | ||||
|     assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value) | ||||
|     assertEquals(Defaults.CompressionType, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value) | ||||
|   } | ||||
| 
 | ||||
| } | ||||
|  |  | |||
|  | @ -30,6 +30,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | |||
| import org.junit.Assert.{assertEquals, assertTrue} | ||||
| import org.junit.{After, Assert, Before, Test} | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import scala.collection.Seq | ||||
| import scala.compat.java8.OptionConverters._ | ||||
|  | @ -46,6 +47,7 @@ abstract class AuthorizationAdmin { | |||
| // Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage | ||||
| // It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed | ||||
| class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup { | ||||
|   @nowarn("cat=deprecation") | ||||
|   val authorizationAdmin: AuthorizationAdmin = new LegacyAuthorizationAdmin | ||||
|   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") | ||||
| 
 | ||||
|  |  | |||
|  | @ -63,6 +63,7 @@ import org.junit.Assert._ | |||
| import org.junit.{After, Before, Ignore, Test} | ||||
| import org.scalatest.Assertions.intercept | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.collection._ | ||||
| import scala.collection.mutable.ArrayBuffer | ||||
| import scala.jdk.CollectionConverters._ | ||||
|  | @ -1324,6 +1325,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|     }.mkString(",") | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = { | ||||
|     val configs = servers.map { server => | ||||
|       val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) | ||||
|  | @ -1350,6 +1352,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|     assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered) | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = { | ||||
|     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava | ||||
|     val newConfig = new Config(configEntries) | ||||
|  | @ -1358,6 +1361,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet | |||
|     props.asScala.foreach { case (k, v) => waitForConfigOnServer(server, k, v) } | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties, | ||||
|                    perBrokerConfig: Boolean): AlterConfigsResult = { | ||||
|     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava | ||||
|  |  | |||
|  | @ -53,7 +53,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with | |||
|     .map(KafkaConfig.fromProps(_, overridingProps)) | ||||
| 
 | ||||
|   @Before | ||||
|   override def setUp: Unit = { | ||||
|   override def setUp(): Unit = { | ||||
|     // Do some Metrics Registry cleanup by removing the metrics that this test checks. | ||||
|     // This is a test workaround to the issue that prior harness runs may have left a populated registry. | ||||
|     // see https://issues.apache.org/jira/browse/KAFKA-4605 | ||||
|  |  | |||
|  | @ -40,6 +40,8 @@ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsRes | |||
| import org.junit.Assert._ | ||||
| import org.scalatest.Assertions.intercept | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| 
 | ||||
| class UncleanLeaderElectionTest extends ZooKeeperTestHarness { | ||||
|   val brokerId1 = 0 | ||||
|   val brokerId2 = 1 | ||||
|  | @ -347,6 +349,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { | |||
|     assertEquals(List("first", "third"), consumeAllMessages(topic, 2)) | ||||
|   } | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   private def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = { | ||||
|     val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava | ||||
|     val newConfig = new Config(configEntries) | ||||
|  |  | |||
|  | @ -31,6 +31,8 @@ import scala.util.Random | |||
| import kafka.utils.TestUtils | ||||
| import org.apache.kafka.common.errors.InvalidOffsetException | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| 
 | ||||
| class OffsetIndexTest { | ||||
|    | ||||
|   var idx: OffsetIndex = null | ||||
|  | @ -47,7 +49,8 @@ class OffsetIndexTest { | |||
|     if(this.idx != null) | ||||
|       this.idx.file.delete() | ||||
|   } | ||||
|    | ||||
| 
 | ||||
|   @nowarn("cat=deprecation") | ||||
|   @Test | ||||
|   def randomLookupTest(): Unit = { | ||||
|     assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset, 0), idx.lookup(92L)) | ||||
|  |  | |||
|  | @ -29,6 +29,7 @@ import org.apache.kafka.common.network.ListenerName | |||
| import org.apache.kafka.common.protocol.ApiKeys | ||||
| import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader} | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| import scala.collection.Seq | ||||
| import scala.reflect.ClassTag | ||||
| 
 | ||||
|  | @ -86,7 +87,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness { | |||
|   } | ||||
| 
 | ||||
|   def receive[T <: AbstractResponse](socket: Socket, apiKey: ApiKeys, version: Short) | ||||
|                                     (implicit classTag: ClassTag[T], nn: NotNothing[T]): T = { | ||||
|                                     (implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = { | ||||
|     val incoming = new DataInputStream(socket.getInputStream) | ||||
|     val len = incoming.readInt() | ||||
| 
 | ||||
|  |  | |||
|  | @ -35,7 +35,7 @@ import kafka.utils.timer.MockTimer | |||
| import kafka.utils.{MockScheduler, MockTime, TestUtils} | ||||
| import kafka.zk.KafkaZkClient | ||||
| import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState | ||||
| import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} | ||||
| import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState | ||||
| import org.apache.kafka.common.metrics.Metrics | ||||
| import org.apache.kafka.common.protocol.{ApiKeys, Errors} | ||||
| import org.apache.kafka.common.record._ | ||||
|  |  | |||
|  | @ -16,7 +16,7 @@ package kafka.server | |||
| 
 | ||||
| import java.util | ||||
| import java.util.concurrent.{Executors, Future, TimeUnit} | ||||
| import java.util.{Collections, LinkedHashMap, Optional, Properties} | ||||
| import java.util.{Collections, Optional, Properties} | ||||
| 
 | ||||
| import kafka.api.LeaderAndIsr | ||||
| import kafka.log.LogConfig | ||||
|  |  | |||
|  | @ -40,7 +40,7 @@ class LeaderEpochFileCacheTest { | |||
|     override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs | ||||
|     override def read(): Seq[EpochEntry] = this.epochs | ||||
|   } | ||||
|   private val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint) | ||||
|   private val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint) | ||||
| 
 | ||||
|   @Test | ||||
|   def shouldAddEpochAndMessageOffsetToCache() = { | ||||
|  | @ -231,12 +231,12 @@ class LeaderEpochFileCacheTest { | |||
|     val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath)) | ||||
| 
 | ||||
|     //Given | ||||
|     val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint) | ||||
|     val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint) | ||||
|     cache.assign(epoch = 2, startOffset = 6) | ||||
| 
 | ||||
|     //When | ||||
|     val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath)) | ||||
|     val cache2 = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint2) | ||||
|     val cache2 = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint2) | ||||
| 
 | ||||
|     //Then | ||||
|     assertEquals(1, cache2.epochEntries.size) | ||||
|  |  | |||
|  | @ -19,10 +19,14 @@ package kafka.tools | |||
| 
 | ||||
| import kafka.consumer.BaseConsumerRecord | ||||
| import org.apache.kafka.common.record.{RecordBatch, TimestampType} | ||||
| 
 | ||||
| import scala.jdk.CollectionConverters._ | ||||
| import org.junit.Assert._ | ||||
| import org.junit.Test | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
| 
 | ||||
| @nowarn("cat=deprecation") | ||||
| class MirrorMakerTest { | ||||
| 
 | ||||
|   @Test | ||||
|  |  | |||
|  | @ -38,7 +38,7 @@ class ShutdownableThreadTest { | |||
|     } | ||||
|     val latch = new CountDownLatch(1) | ||||
|     val thread = new ShutdownableThread("shutdownable-thread-test") { | ||||
|       override def doWork: Unit = { | ||||
|       override def doWork(): Unit = { | ||||
|         latch.countDown() | ||||
|         throw new FatalExitError | ||||
|       } | ||||
|  |  | |||
|  | @ -28,7 +28,7 @@ ext { | |||
| 
 | ||||
| // Add Scala version | ||||
| def defaultScala212Version = '2.12.11' | ||||
| def defaultScala213Version = '2.13.1' | ||||
| def defaultScala213Version = '2.13.2' | ||||
| if (hasProperty('scalaVersion')) { | ||||
|   if (scalaVersion == '2.12') { | ||||
|     versions["scala"] = defaultScala212Version | ||||
|  | @ -102,9 +102,9 @@ versions += [ | |||
|   powermock: "2.0.7", | ||||
|   reflections: "0.9.12", | ||||
|   rocksDB: "5.18.4", | ||||
|   scalaCollectionCompat: "2.1.4", | ||||
|   scalaCollectionCompat: "2.1.6", | ||||
|   scalafmt: "1.5.1", | ||||
|   scalaJava8Compat : "0.9.0", | ||||
|   scalaJava8Compat : "0.9.1", | ||||
|   scalatest: "3.0.8", | ||||
|   scoverage: "1.4.1", | ||||
|   scoveragePlugin: "4.0.1", | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue