From af2e6fb548cf3b5778455a7b5791bbebe3b74b44 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sat, 10 Nov 2018 13:58:18 -0800 Subject: [PATCH] MINOR: Update zstd, easymock, powermock, zkclient and build plugins (#5846) EasyMock 4.0.x includes a change that relies on the caller for inferring the return type of mock creator methods. Updated a number of Scala tests for compilation and execution to succeed. The versions of EasyMock and PowerMock in this PR include full support for Java 11. Reviewers: Manikumar Reddy --- build.gradle | 13 +- .../kafka/server/DelayedFetchTest.scala | 6 +- .../common/InterBrokerSendThreadTest.scala | 2 +- .../scala/unit/kafka/admin/AdminTest.scala | 2 +- .../unit/kafka/admin/ConfigCommandTest.scala | 6 +- .../admin/ReassignPartitionsCommandTest.scala | 12 +- .../unit/kafka/cluster/PartitionTest.scala | 10 +- .../controller/ControllerTestUtils.scala | 2 +- .../PartitionStateMachineTest.scala | 4 +- .../group/GroupCoordinatorTest.scala | 6 +- .../group/GroupMetadataManagerTest.scala | 12 +- .../transaction/ProducerIdManagerTest.scala | 2 +- ...ransactionCoordinatorConcurrencyTest.scala | 8 +- .../TransactionMarkerChannelManagerTest.scala | 6 +- ...onMarkerRequestCompletionHandlerTest.scala | 5 +- .../TransactionStateManagerTest.scala | 4 +- .../test/scala/unit/kafka/log/LogTest.scala | 10 +- .../kafka/log/ProducerStateManagerTest.scala | 2 +- .../server/AbstractFetcherManagerTest.scala | 2 +- .../kafka/server/ClientQuotaManagerTest.scala | 2 +- .../server/DynamicBrokerConfigTest.scala | 7 +- .../server/DynamicConfigChangeTest.scala | 2 +- .../server/HighwatermarkPersistenceTest.scala | 2 +- .../unit/kafka/server/ISRExpirationTest.scala | 8 +- .../unit/kafka/server/KafkaApisTest.scala | 25 ++- .../unit/kafka/server/LogOffsetTest.scala | 8 +- .../ReplicaAlterLogDirsThreadTest.scala | 112 +++++++------- .../server/ReplicaFetcherThreadTest.scala | 146 +++++++++--------- .../server/ReplicaManagerQuotasTest.scala | 16 +- .../kafka/server/ReplicaManagerTest.scala | 10 +- .../unit/kafka/server/ServerStartupTest.scala | 2 +- .../unit/kafka/server/SimpleFetchTest.scala | 10 +- .../ThrottledChannelExpirationTest.scala | 4 +- .../epoch/OffsetsForLeaderEpochTest.scala | 11 +- .../kafka/utils/ReplicationUtilsTest.scala | 9 +- .../unit/kafka/zk/AdminZkClientTest.scala | 2 +- gradle/dependencies.gradle | 11 +- 37 files changed, 251 insertions(+), 250 deletions(-) diff --git a/build.gradle b/build.gradle index e78d2cea6c2..bba3d839671 100644 --- a/build.gradle +++ b/build.gradle @@ -29,11 +29,11 @@ buildscript { // For Apache Rat plugin to ignore non-Git files classpath "org.ajoberstar:grgit:1.9.3" classpath 'com.github.ben-manes:gradle-versions-plugin:0.20.0' - classpath 'org.scoverage:gradle-scoverage:2.4.0' - classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.0' - classpath 'org.owasp:dependency-check-gradle:3.3.2' - classpath "com.diffplug.spotless:spotless-plugin-gradle:3.15.0" - classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.4" + classpath 'org.scoverage:gradle-scoverage:2.5.0' + classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2' + classpath 'org.owasp:dependency-check-gradle:3.3.4' + classpath "com.diffplug.spotless:spotless-plugin-gradle:3.16.0" + classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.5" } } @@ -369,8 +369,7 @@ subprojects { if (!JavaVersion.current().isJava11Compatible()) { spotbugs { - // 3.1.6 has a regression that breaks our build, seems to be https://github.com/spotbugs/spotbugs/pull/688 - toolVersion = '3.1.5' + toolVersion = '3.1.8' excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml") ignoreFailures = false } diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 9ed227464a6..6a5a33d2702 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -30,8 +30,8 @@ import org.junit.Assert._ class DelayedFetchTest extends EasyMockSupport { private val maxBytes = 1024 - private val replicaManager = mock(classOf[ReplicaManager]) - private val replicaQuota = mock(classOf[ReplicaQuota]) + private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + private val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota]) @Test def testFetchWithFencedEpoch(): Unit = { @@ -58,7 +58,7 @@ class DelayedFetchTest extends EasyMockSupport { quota = replicaQuota, responseCallback = callback) - val partition = mock(classOf[Partition]) + val partition: Partition = mock(classOf[Partition]) EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true)) .andReturn(partition) diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala index 6838653ad13..5c0ea2d8d48 100644 --- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala +++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala @@ -30,7 +30,7 @@ import scala.collection.mutable class InterBrokerSendThreadTest { private val time = new MockTime() - private val networkClient = EasyMock.createMock(classOf[NetworkClient]) + private val networkClient: NetworkClient = EasyMock.createMock(classOf[NetworkClient]) private val completionHandler = new StubCompletionHandler private val requestTimeoutMs = 1000 diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 88aff62c5c3..2306a921898 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -142,7 +142,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { val topic = "test.topic" // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes - val zkMock = EasyMock.createNiceMock(classOf[ZkUtils]) + val zkMock: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) EasyMock.expect(zkMock.pathExists(s"/brokers/topics/$topic")).andReturn(false) EasyMock.expect(zkMock.getAllTopics).andReturn(Seq("some.topic", topic, "some.other.topic")) EasyMock.replay(zkMock) diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index cb261f62c7d..ee4a6ef8421 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -245,12 +245,12 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { val configEntries = util.Collections.singletonList(new ConfigEntry("num.io.threads", "5")) val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] future.complete(util.Collections.singletonMap(resource, new Config(configEntries))) - val describeResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult]) + val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult]) EasyMock.expect(describeResult.all()).andReturn(future).once() val alterFuture = new KafkaFutureImpl[Void] alterFuture.complete(null) - val alterResult = EasyMock.createNiceMock(classOf[AlterConfigsResult]) + val alterResult: AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult]) EasyMock.expect(alterResult.all()).andReturn(alterFuture) val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { @@ -622,7 +622,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { @Test def testQuotaDescribeEntities() { - val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) + val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) def checkEntities(opts: Array[String], expectedFetches: Map[String, Seq[String]], expectedEntityNames: Seq[String]) { val entity = ConfigCommand.parseEntity(new ConfigCommandOptions(opts :+ "--describe")) diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala index f94abae0e7f..128919f83eb 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala @@ -268,7 +268,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging { //Setup val zk = stubZKClient(existing) - val admin = createMock(classOf[AdminZkClient]) + val admin: AdminZkClient = createMock(classOf[AdminZkClient]) val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL) val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin) expect(admin.fetchEntityConfig(anyString(), anyString())).andStubReturn(new Properties) @@ -294,7 +294,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging { //Setup val zk = stubZKClient(existing) - val admin = createMock(classOf[AdminZkClient]) + val admin: AdminZkClient = createMock(classOf[AdminZkClient]) val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL) val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin) expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes() @@ -328,7 +328,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging { //Setup val zk = stubZKClient(existing) - val admin = createMock(classOf[AdminZkClient]) + val admin: AdminZkClient = createMock(classOf[AdminZkClient]) val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL) val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin) expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes() @@ -364,7 +364,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging { //Setup val zk = stubZKClient(brokers = brokers) - val admin = createMock(classOf[AdminZkClient]) + val admin: AdminZkClient = createMock(classOf[AdminZkClient]) val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL) expect(admin.fetchEntityConfig(is(ConfigType.Topic), anyString())).andStubReturn(new Properties) expect(admin.changeBrokerConfig(anyObject().asInstanceOf[Seq[Int]], capture(propsCapture))).anyTimes() @@ -399,7 +399,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging { //Setup val zk = stubZKClient(brokers = Seq(100, 101)) - val admin = createMock(classOf[AdminZkClient]) + val admin: AdminZkClient = createMock(classOf[AdminZkClient]) val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL) expect(admin.fetchEntityConfig(is(ConfigType.Broker), anyString())).andStubReturn(new Properties) expect(admin.fetchEntityConfig(is(ConfigType.Topic), is("topic1"))).andStubReturn(copyOf(existingConfigs)) @@ -567,7 +567,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging { def stubZKClient(existingAssignment: Map[TopicPartition, Seq[Int]] = Map[TopicPartition, Seq[Int]](), brokers: Seq[Int] = Seq[Int]()): KafkaZkClient = { - val zkClient = createMock(classOf[KafkaZkClient]) + val zkClient: KafkaZkClient = createMock(classOf[KafkaZkClient]) expect(zkClient.getReplicaAssignmentForTopics(anyObject().asInstanceOf[Set[String]])).andStubReturn(existingAssignment) expect(zkClient.getAllBrokersInCluster).andStubReturn(brokers.map(TestUtils.createBroker(_, "", 1))) replay(zkClient) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index a075bd05220..6e38ca9575b 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -76,7 +76,7 @@ class PartitionTest { val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect) brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(",")) val brokerConfig = KafkaConfig.fromProps(brokerProps) - val kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient]) + val kafkaZkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient]) replicaManager = new ReplicaManager( config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time), logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""), @@ -386,8 +386,8 @@ class PartitionTest { isLeader: Boolean, log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = { val replica = new Replica(brokerId, topicPartition, time, log = Some(log)) - val replicaManager = EasyMock.mock(classOf[ReplicaManager]) - val zkClient = EasyMock.mock(classOf[KafkaZkClient]) + val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager]) + val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient]) val partition = new Partition(topicPartition, isOffline = false, @@ -481,8 +481,8 @@ class PartitionTest { def testListOffsetIsolationLevels(): Unit = { val log = logManager.getOrCreateLog(topicPartition, logConfig) val replica = new Replica(brokerId, topicPartition, time, log = Some(log)) - val replicaManager = EasyMock.mock(classOf[ReplicaManager]) - val zkClient = EasyMock.mock(classOf[KafkaZkClient]) + val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager]) + val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient]) val partition = new Partition(topicPartition, isOffline = false, diff --git a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala index b0413a71af6..84b956df79b 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala @@ -22,7 +22,7 @@ object ControllerTestUtils { /** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */ def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = { - val mockEvent = EasyMock.createNiceMock(classOf[ControllerEvent]) + val mockEvent: ControllerEvent = EasyMock.createNiceMock(classOf[ControllerEvent]) EasyMock.expect(mockEvent.state).andReturn(controllerState) EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() { def answer(): Unit = { diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala index 3370b5410aa..0e8f98e8d8f 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala @@ -433,7 +433,7 @@ class PartitionStateMachineTest extends JUnitSuite { mockZkClient, partitionState, mockControllerBrokerRequestBatch) def createMockController() = { - val mockController = EasyMock.createMock(classOf[KafkaController]) + val mockController: KafkaController = EasyMock.createMock(classOf[KafkaController]) EasyMock.expect(mockController.controllerContext).andReturn(controllerContext).anyTimes() EasyMock.expect(mockController.config).andReturn(customConfig).anyTimes() EasyMock.expect(mockController.partitionStateMachine).andReturn(partitionStateMachine).anyTimes() @@ -444,7 +444,7 @@ class PartitionStateMachineTest extends JUnitSuite { } val mockController = createMockController() - val mockEventManager = EasyMock.createMock(classOf[ControllerEventManager]) + val mockEventManager: ControllerEventManager = EasyMock.createMock(classOf[ControllerEventManager]) EasyMock.replay(mockController, replicaStateMachine, mockEventManager) val topicDeletionManager = new TopicDeletionManager(mockController, mockEventManager, mockZkClient) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index c2c0841534c..c1623427f0f 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -928,7 +928,7 @@ class GroupCoordinatorTest extends JUnitSuite { assertEquals(Empty.toString, summary.state) val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) - val partition = EasyMock.niceMock(classOf[Partition]) + val partition: Partition = EasyMock.niceMock(classOf[Partition]) EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) @@ -1425,7 +1425,7 @@ class GroupCoordinatorTest extends JUnitSuite { assertEquals(Errors.NONE, leaveGroupResult) val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) - val partition = EasyMock.niceMock(classOf[Partition]) + val partition: Partition = EasyMock.niceMock(classOf[Partition]) EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) @@ -1466,7 +1466,7 @@ class GroupCoordinatorTest extends JUnitSuite { assertEquals(Errors.NONE, leaveGroupResult) val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) - val partition = EasyMock.niceMock(classOf[Partition]) + val partition: Partition = EasyMock.niceMock(classOf[Partition]) EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 9ab97051dd7..3ab4a13cc6f 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -729,7 +729,7 @@ class GroupMetadataManagerTest { val tp2 = new TopicPartition("bar", 0) val tp3 = new TopicPartition("xxx", 0) - val logMock = EasyMock.mock(classOf[Log]) + val logMock: Log = EasyMock.mock(classOf[Log]) EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock)) val segment1MemberId = "a" @@ -1785,19 +1785,19 @@ class GroupMetadataManagerTest { offsetCommitRecords ++ Seq(groupMetadataRecord): _*) // Prepend empty control batch to valid records - val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch]) + val mockBatch: MutableRecordBatch = EasyMock.createMock(classOf[MutableRecordBatch]) EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record]) EasyMock.expect(mockBatch.isControlBatch).andReturn(true) EasyMock.expect(mockBatch.isTransactional).andReturn(true) EasyMock.expect(mockBatch.nextOffset).andReturn(16L) EasyMock.replay(mockBatch) - val mockRecords = EasyMock.createMock(classOf[MemoryRecords]) + val mockRecords: MemoryRecords = EasyMock.createMock(classOf[MemoryRecords]) EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch) ++ records.batches.asScala).asJava).anyTimes() EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes() EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + records.sizeInBytes()).anyTimes() EasyMock.replay(mockRecords) - val logMock = EasyMock.mock(classOf[Log]) + val logMock: Log = EasyMock.mock(classOf[Log]) EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes() EasyMock.expect(logMock.read(EasyMock.eq(startOffset), maxLength = EasyMock.anyInt(), @@ -1886,7 +1886,7 @@ class GroupMetadataManagerTest { private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition, startOffset: Long, records: MemoryRecords): Unit = { - val logMock = EasyMock.mock(classOf[Log]) + val logMock: Log = EasyMock.mock(classOf[Log]) EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock)) val endOffset = expectGroupMetadataLoad(logMock, startOffset, records) EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset)) @@ -1902,7 +1902,7 @@ class GroupMetadataManagerTest { startOffset: Long, records: MemoryRecords): Long = { val endOffset = startOffset + records.records.asScala.size - val fileRecordsMock = EasyMock.mock(classOf[FileRecords]) + val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords]) EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset) EasyMock.expect(logMock.read(EasyMock.eq(startOffset), diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala index 660e62309e7..b2cc4a51a59 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala @@ -24,7 +24,7 @@ import org.junit.Assert._ class ProducerIdManagerTest { - private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) + private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) @After def tearDown(): Unit = { diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index b2a67333d9c..3cf956629b2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -81,13 +81,13 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren new MockTimer, reaperEnabled = false) val brokerNode = new Node(0, "host", 10) - val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache]) + val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache]) EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( EasyMock.anyString(), EasyMock.anyInt(), EasyMock.anyObject()) ).andReturn(Some(brokerNode)).anyTimes() - val networkClient = EasyMock.createNiceMock(classOf[NetworkClient]) + val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient]) txnMarkerChannelManager = new TransactionMarkerChannelManager( KafkaConfig.fromProps(serverProps), metadataCache, @@ -246,8 +246,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren private def prepareTxnLog(partitionId: Int): Unit = { - val logMock = EasyMock.mock(classOf[Log]) - val fileRecordsMock = EasyMock.mock(classOf[FileRecords]) + val logMock: Log = EasyMock.mock(classOf[Log]) + val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords]) val topicPartition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) val startOffset = replicaManager.getLogEndOffset(topicPartition).getOrElse(20L) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 454b3617328..44d5c5f4b75 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -36,9 +36,9 @@ import scala.collection.JavaConverters._ import scala.collection.mutable class TransactionMarkerChannelManagerTest { - private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache]) - private val networkClient = EasyMock.createNiceMock(classOf[NetworkClient]) - private val txnStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager]) + private val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache]) + private val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient]) + private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager]) private val partition1 = new TopicPartition("topic1", 0) private val partition2 = new TopicPartition("topic1", 1) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala index 3ca6c1b00b4..85159c31b29 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala @@ -47,9 +47,10 @@ class TransactionMarkerRequestCompletionHandlerTest { private val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerEpoch, txnTimeoutMs, PrepareCommit, mutable.Set[TopicPartition](topicPartition), 0L, 0L) - private val markerChannelManager = EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager]) + private val markerChannelManager: TransactionMarkerChannelManager = + EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager]) - private val txnStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager]) + private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager]) private val handler = new TransactionMarkerRequestCompletionHandler(brokerId, txnStateManager, markerChannelManager, txnIdAndMarkers) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 6004cc01229..34a37bebe4d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -573,8 +573,8 @@ class TransactionStateManagerTest { records: MemoryRecords): Unit = { EasyMock.reset(replicaManager) - val logMock = EasyMock.mock(classOf[Log]) - val fileRecordsMock = EasyMock.mock(classOf[FileRecords]) + val logMock: Log = EasyMock.mock(classOf[Log]) + val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords]) val endOffset = startOffset + records.records.asScala.size diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index c37da515901..fe2820cba1c 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -327,7 +327,7 @@ class LogTest { @Test def testSizeForLargeLogs(): Unit = { val largeSize = Int.MaxValue.toLong * 2 - val logSegment = EasyMock.createMock(classOf[LogSegment]) + val logSegment: LogSegment = EasyMock.createMock(classOf[LogSegment]) EasyMock.expect(logSegment.size).andReturn(Int.MaxValue).anyTimes EasyMock.replay(logSegment) @@ -349,7 +349,7 @@ class LogTest { @Test def testSkipLoadingIfEmptyProducerStateBeforeTruncation(): Unit = { - val stateManager = EasyMock.mock(classOf[ProducerStateManager]) + val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager]) // Load the log EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None) @@ -428,7 +428,7 @@ class LogTest { @Test def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = { - val stateManager = EasyMock.mock(classOf[ProducerStateManager]) + val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager]) stateManager.updateMapEndOffset(0L) EasyMock.expectLastCall().anyTimes() @@ -465,7 +465,7 @@ class LogTest { @Test def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = { - val stateManager = EasyMock.mock(classOf[ProducerStateManager]) + val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager]) stateManager.updateMapEndOffset(0L) EasyMock.expectLastCall().anyTimes() @@ -505,7 +505,7 @@ class LogTest { @Test def testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown(): Unit = { - val stateManager = EasyMock.mock(classOf[ProducerStateManager]) + val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager]) EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None) diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 9afb145c4c6..b49b5e15d87 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -754,7 +754,7 @@ class ProducerStateManagerTest extends JUnitSuite { val producerEpoch = 145.toShort val baseOffset = 15 - val batch = EasyMock.createMock(classOf[RecordBatch]) + val batch: RecordBatch = EasyMock.createMock(classOf[RecordBatch]) EasyMock.expect(batch.isControlBatch).andReturn(true).once EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once EasyMock.replay(batch) diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala index cd00ff166d5..0a4d7c11df3 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala @@ -26,7 +26,7 @@ class AbstractFetcherManagerTest { @Test def testAddAndRemovePartition(): Unit = { - val fetcher = EasyMock.mock(classOf[AbstractFetcherThread]) + val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread]) val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) { override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { fetcher diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 6f75174706c..e10d4b2d7c8 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -59,7 +59,7 @@ class ClientQuotaManagerTest { val request = builder.build() val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0)) - val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics]) + val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics]) // read the header from the buffer first so that the body can be read next from the Request constructor val header = RequestHeader.parse(buffer) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 41b90557dba..45ef18f5187 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -21,6 +21,7 @@ import java.util import java.util.Properties import kafka.utils.TestUtils +import kafka.zk.KafkaZkClient import org.apache.kafka.common.Reconfigurable import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.config.{ConfigException, SslConfigs} @@ -303,7 +304,7 @@ class DynamicBrokerConfigTest extends JUnitSuite { def testDynamicListenerConfig(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) val oldConfig = KafkaConfig.fromProps(props) - val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer]) + val kafkaServer: KafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer]) EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes() EasyMock.replay(kafkaServer) @@ -328,7 +329,7 @@ class DynamicBrokerConfigTest extends JUnitSuite { @Test def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = { - val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient]) + val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient]) EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes() EasyMock.replay(zkClient) @@ -362,4 +363,4 @@ class TestDynamicThreadPool() extends BrokerReconfigurable { assertEquals(10, newConfig.numIoThreads) assertEquals(100, newConfig.backgroundThreads) } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index cabe0a984e2..789dbaeb8f7 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -210,7 +210,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Create a mock ConfigHandler to record config changes it is asked to process val entityArgument = EasyMock.newCapture[String] val propertiesArgument = EasyMock.newCapture[Properties] - val handler = EasyMock.createNiceMock(classOf[ConfigHandler]) + val handler: ConfigHandler = EasyMock.createNiceMock(classOf[ConfigHandler]) handler.processConfigChanges( EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])), EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties]))) diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 74512347e26..61cbd2c94f4 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -35,7 +35,7 @@ class HighwatermarkPersistenceTest { val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps) val topic = "foo" - val zkClient = EasyMock.createMock(classOf[KafkaZkClient]) + val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient]) val logManagers = configs map { config => TestUtils.createLogManager( logDirs = config.logDirs.map(new File(_)), diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 3dff709bef6..d5bdf14917d 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -21,7 +21,7 @@ import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean import kafka.cluster.{Partition, Replica} -import kafka.log.Log +import kafka.log.{Log, LogManager} import kafka.server.epoch.LeaderEpochFileCache import kafka.utils._ import org.apache.kafka.common.TopicPartition @@ -55,7 +55,7 @@ class IsrExpirationTest { @Before def setUp() { - val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) + val logManager: LogManager = EasyMock.createMock(classOf[LogManager]) EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes() EasyMock.replay(logManager) @@ -252,8 +252,8 @@ class IsrExpirationTest { } private def logMock: Log = { - val log = EasyMock.createMock(classOf[kafka.log.Log]) - val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache]) + val log: Log = EasyMock.createMock(classOf[Log]) + val cache: LeaderEpochFileCache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache]) EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes() EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes() EasyMock.expect(log.onHighWatermarkIncremented(0L)) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index dd016f470b0..b18511e7ce9 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -25,7 +25,6 @@ import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0} import kafka.controller.KafkaController import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator -import kafka.log.TimestampOffset import kafka.network.RequestChannel import kafka.network.RequestChannel.SendResponse import kafka.security.auth.Authorizer @@ -55,24 +54,24 @@ import scala.collection.Map class KafkaApisTest { - private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel]) - private val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics]) - private val replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) - private val groupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator]) - private val adminManager = EasyMock.createNiceMock(classOf[AdminManager]) - private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator]) - private val controller = EasyMock.createNiceMock(classOf[KafkaController]) - private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) + private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel]) + private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics]) + private val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) + private val groupCoordinator: GroupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator]) + private val adminManager: AdminManager = EasyMock.createNiceMock(classOf[AdminManager]) + private val txnCoordinator: TransactionCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator]) + private val controller: KafkaController = EasyMock.createNiceMock(classOf[KafkaController]) + private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) private val metrics = new Metrics() private val brokerId = 1 private val metadataCache = new MetadataCache(brokerId) private val authorizer: Option[Authorizer] = None - private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager]) - private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager]) - private val replicaQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager]) + private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager]) + private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager]) + private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager]) private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None) - private val fetchManager = EasyMock.createNiceMock(classOf[FetchManager]) + private val fetchManager: FetchManager = EasyMock.createNiceMock(classOf[FetchManager]) private val brokerTopicStats = new BrokerTopicStats private val clusterId = "clusterId" private val time = new MockTime diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 740b28e11cf..50449dc3e36 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -211,8 +211,8 @@ class LogOffsetTest extends BaseRequestTest { * a race condition) */ @Test def testFetchOffsetsBeforeWithChangingSegmentSize() { - val log = EasyMock.niceMock(classOf[Log]) - val logSegment = EasyMock.niceMock(classOf[LogSegment]) + val log: Log = EasyMock.niceMock(classOf[Log]) + val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment]) EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Int] { private val value = new AtomicInteger(0) def answer: Int = value.getAndIncrement() @@ -228,8 +228,8 @@ class LogOffsetTest extends BaseRequestTest { * different (simulating a race condition) */ @Test def testFetchOffsetsBeforeWithChangingSegments() { - val log = EasyMock.niceMock(classOf[Log]) - val logSegment = EasyMock.niceMock(classOf[LogSegment]) + val log: Log = EasyMock.niceMock(classOf[Log]) + val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment]) EasyMock.expect(log.logSegments).andStubAnswer { new IAnswer[Iterable[LogSegment]] { def answer = new Iterable[LogSegment] { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index 5d92b61c959..a4fbaf28564 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -50,9 +50,9 @@ class ReplicaAlterLogDirsThreadTest { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) //Setup all dependencies - val partitionT1p0 = createMock(classOf[Partition]) - val partitionT1p1 = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val partitionT1p0: Partition = createMock(classOf[Partition]) + val partitionT1p1: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val leaderEpochT1p0 = 2 val leaderEpochT1p1 = 5 @@ -101,8 +101,8 @@ class ReplicaAlterLogDirsThreadTest { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) //Setup all dependencies - val partitionT1p0 = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val partitionT1p0: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val leaderEpoch = 2 val leo = 13 @@ -149,18 +149,18 @@ class ReplicaAlterLogDirsThreadTest { // Setup all the dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) - val quotaManager = createNiceMock(classOf[ReplicationQuotaManager]) - val futureReplicaLeaderEpochsT1p0 = createMock(classOf[LeaderEpochFileCache]) - val futureReplicaLeaderEpochsT1p1 = createMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[LogManager]) - val replicaT1p0 = createNiceMock(classOf[Replica]) - val replicaT1p1 = createNiceMock(classOf[Replica]) + val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val futureReplicaLeaderEpochsT1p0: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache]) + val futureReplicaLeaderEpochsT1p1: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replicaT1p0: Replica = createNiceMock(classOf[Replica]) + val replicaT1p1: Replica = createNiceMock(classOf[Replica]) // one future replica mock because our mocking methods return same values for both future replicas - val futureReplicaT1p0 = createNiceMock(classOf[Replica]) - val futureReplicaT1p1 = createNiceMock(classOf[Replica]) - val partitionT1p0 = createMock(classOf[Partition]) - val partitionT1p1 = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val futureReplicaT1p0: Replica = createNiceMock(classOf[Replica]) + val futureReplicaT1p1: Replica = createNiceMock(classOf[Replica]) + val partitionT1p0: Partition = createMock(classOf[Partition]) + val partitionT1p1: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture() val leaderEpoch = 2 @@ -228,14 +228,14 @@ class ReplicaAlterLogDirsThreadTest { // Setup all the dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) - val quotaManager = createNiceMock(classOf[ReplicationQuotaManager]) - val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[LogManager]) - val replica = createNiceMock(classOf[Replica]) + val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replica: Replica = createNiceMock(classOf[Replica]) // one future replica mock because our mocking methods return same values for both future replicas - val futureReplica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val futureReplica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture() val leaderEpoch = 5 @@ -301,13 +301,13 @@ class ReplicaAlterLogDirsThreadTest { // Setup all the dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) - val quotaManager = createNiceMock(classOf[ReplicationQuotaManager]) - val logManager = createMock(classOf[LogManager]) - val replica = createNiceMock(classOf[Replica]) - val futureReplica = createNiceMock(classOf[Replica]) - val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val futureReplica: Replica = createNiceMock(classOf[Replica]) + val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture() val initialFetchOffset = 100 @@ -356,13 +356,13 @@ class ReplicaAlterLogDirsThreadTest { // Setup all the dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) - val quotaManager = createNiceMock(classOf[kafka.server.ReplicationQuotaManager]) - val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[kafka.log.LogManager]) - val replica = createNiceMock(classOf[Replica]) - val futureReplica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[kafka.server.ReplicaManager]) + val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val futureReplica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture() val futureReplicaLeaderEpoch = 1 @@ -439,13 +439,13 @@ class ReplicaAlterLogDirsThreadTest { //Setup all dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) - val quotaManager = createNiceMock(classOf[ReplicationQuotaManager]) - val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[LogManager]) - val replica = createNiceMock(classOf[Replica]) - val futureReplica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val futureReplica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture() val leaderEpoch = 5 @@ -494,12 +494,12 @@ class ReplicaAlterLogDirsThreadTest { //Setup all dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) - val quotaManager = createNiceMock(classOf[ReplicationQuotaManager]) - val logManager = createMock(classOf[LogManager]) - val replica = createNiceMock(classOf[Replica]) - val futureReplica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val futureReplica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) //Stubs expect(futureReplica.logStartOffset).andReturn(123).anyTimes() @@ -543,12 +543,12 @@ class ReplicaAlterLogDirsThreadTest { //Setup all dependencies val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) - val quotaManager = createNiceMock(classOf[ReplicationQuotaManager]) - val logManager = createMock(classOf[LogManager]) - val replica = createNiceMock(classOf[Replica]) - val futureReplica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val futureReplica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) //Stubs expect(futureReplica.logStartOffset).andReturn(123).anyTimes() diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index c9d9b966964..93a7a1e6aaa 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -80,7 +80,7 @@ class ReplicaFetcherThreadTest { props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2") props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2") val config = KafkaConfig.fromProps(props) - val leaderEndpoint = createMock(classOf[BlockingSend]) + val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend]) expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] { override def answer(): ClientResponse = { toFail = true // assert no leader request is sent @@ -124,7 +124,7 @@ class ReplicaFetcherThreadTest { val props = TestUtils.createBrokerConfig(1, "localhost:1234") props.put(KafkaConfig.InterBrokerProtocolVersionProp, "1.0.0") val config = KafkaConfig.fromProps(props) - val leaderEndpoint = createMock(classOf[BlockingSend]) + val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend]) expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] { override def answer(): ClientResponse = { @@ -163,13 +163,13 @@ class ReplicaFetcherThreadTest { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) //Setup all dependencies - val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[LogManager]) - val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) - val replica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val leaderEpoch = 5 @@ -262,7 +262,7 @@ class ReplicaFetcherThreadTest { def shouldHandleExceptionFromBlockingSend(): Unit = { val props = TestUtils.createBrokerConfig(1, "localhost:1234") val config = KafkaConfig.fromProps(props) - val mockBlockingSend = createMock(classOf[BlockingSend]) + val mockBlockingSend: BlockingSend = createMock(classOf[BlockingSend]) expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new NullPointerException).once() replay(mockBlockingSend) @@ -296,12 +296,12 @@ class ReplicaFetcherThreadTest { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) //Setup all dependencies - val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[LogManager]) - val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) - val replica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val leaderEpoch = 5 @@ -356,13 +356,13 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) - val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[LogManager]) - val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) - val replica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val leaderEpoch = 5 val initialLEO = 200 @@ -405,13 +405,13 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) - val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[LogManager]) - val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) - val replica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val leaderEpochAtFollower = 5 val leaderEpochAtLeader = 4 @@ -459,13 +459,13 @@ class ReplicaFetcherThreadTest { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) // Setup all dependencies - val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[LogManager]) - val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) - val replica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val initialLEO = 200 @@ -530,13 +530,13 @@ class ReplicaFetcherThreadTest { val config = KafkaConfig.fromProps(props) // Setup all dependencies - val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[LogManager]) - val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) - val replica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val initialLEO = 200 @@ -591,13 +591,13 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) - val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[LogManager]) - val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) - val replica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[ReplicaManager]) + val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val initialFetchOffset = 100 val initialLeo = 300 @@ -636,13 +636,13 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) - val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) - val logManager = createMock(classOf[kafka.log.LogManager]) - val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) - val replica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createMock(classOf[kafka.server.ReplicaManager]) + val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createMock(classOf[LogManager]) + val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]) val leaderEpoch = 5 val highWaterMark = 100 @@ -694,13 +694,13 @@ class ReplicaFetcherThreadTest { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) //Setup all stubs - val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) - val logManager = createNiceMock(classOf[LogManager]) - val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) - val replica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createNiceMock(classOf[ReplicaManager]) + val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createNiceMock(classOf[LogManager]) + val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager]) val leaderEpoch = 4 @@ -747,13 +747,13 @@ class ReplicaFetcherThreadTest { val initialLEO = 100 //Setup all stubs - val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) - val logManager = createNiceMock(classOf[LogManager]) - val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) - val replica = createNiceMock(classOf[Replica]) - val partition = createMock(classOf[Partition]) - val replicaManager = createNiceMock(classOf[ReplicaManager]) + val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createNiceMock(classOf[LogManager]) + val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica: Replica = createNiceMock(classOf[Replica]) + val partition: Partition = createMock(classOf[Partition]) + val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager]) //Stub return values expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).once @@ -797,7 +797,7 @@ class ReplicaFetcherThreadTest { def shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread(): Unit = { val props = TestUtils.createBrokerConfig(1, "localhost:1234") val config = KafkaConfig.fromProps(props) - val mockBlockingSend = createMock(classOf[BlockingSend]) + val mockBlockingSend: BlockingSend = createMock(classOf[BlockingSend]) expect(mockBlockingSend.close()).andThrow(new IllegalArgumentException()).once() replay(mockBlockingSend) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 0c6d09c2fc1..5b2f2aed77b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -21,7 +21,7 @@ import java.util.{Optional, Properties} import java.util.concurrent.atomic.AtomicBoolean import kafka.cluster.{Partition, Replica} -import kafka.log.{Log, LogOffsetSnapshot} +import kafka.log.{Log, LogManager, LogOffsetSnapshot} import kafka.utils._ import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition @@ -150,7 +150,7 @@ class ReplicaManagerQuotasTest { // Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = { val endOffsetMetadata = new LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500) - val partition = EasyMock.createMock(classOf[Partition]) + val partition: Partition = EasyMock.createMock(classOf[Partition]) val offsetSnapshot = LogOffsetSnapshot( logStartOffset = 0L, @@ -160,7 +160,7 @@ class ReplicaManagerQuotasTest { EasyMock.expect(partition.fetchOffsetSnapshot(Optional.empty(), fetchOnlyFromLeader = true)) .andReturn(offsetSnapshot) - val replicaManager = EasyMock.createMock(classOf[ReplicaManager]) + val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager]) EasyMock.expect(replicaManager.getPartitionOrException( EasyMock.anyObject[TopicPartition], EasyMock.anyBoolean())) .andReturn(partition).anyTimes() @@ -191,11 +191,11 @@ class ReplicaManagerQuotasTest { } def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) { - val zkClient = EasyMock.createMock(classOf[KafkaZkClient]) - val scheduler = createNiceMock(classOf[KafkaScheduler]) + val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient]) + val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler]) //Create log which handles both a regular read and a 0 bytes read - val log = createNiceMock(classOf[Log]) + val log: Log = createNiceMock(classOf[Log]) expect(log.logStartOffset).andReturn(0L).anyTimes() expect(log.logEndOffset).andReturn(20L).anyTimes() expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes() @@ -224,7 +224,7 @@ class ReplicaManagerQuotasTest { replay(log) //Create log manager - val logManager = createMock(classOf[kafka.log.LogManager]) + val logManager: LogManager = createMock(classOf[LogManager]) //Return the same log for each partition as it doesn't matter expect(logManager.getLog(anyObject(), anyBoolean())).andReturn(Some(log)).anyTimes() @@ -262,7 +262,7 @@ class ReplicaManagerQuotasTest { } def mockQuota(bound: Long): ReplicaQuota = { - val quota = createMock(classOf[ReplicaQuota]) + val quota: ReplicaQuota = createMock(classOf[ReplicaQuota]) expect(quota.isThrottled(anyObject())).andReturn(true).anyTimes() quota } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 44017481d4c..c4ca2cbce98 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -148,7 +148,7 @@ class ReplicaManagerTest { val logProps = new Properties() val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps)) val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1)) - val metadataCache = EasyMock.createMock(classOf[MetadataCache]) + val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache]) EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() EasyMock.replay(metadataCache) val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr, @@ -594,7 +594,7 @@ class ReplicaManagerTest { val mockScheduler = new MockScheduler(time) val mockBrokerTopicStats = new BrokerTopicStats val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) - val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache]) + val mockLeaderEpochCache: LeaderEpochFileCache = EasyMock.createMock(classOf[LeaderEpochFileCache]) EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader) EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader)) .andReturn((leaderEpochFromLeader, localLogOffset)) @@ -620,7 +620,7 @@ class ReplicaManagerTest { } // Expect to call LogManager.truncateTo exactly once - val mockLogMgr = EasyMock.createMock(classOf[LogManager]) + val mockLogMgr: LogManager = EasyMock.createMock(classOf[LogManager]) EasyMock.expect(mockLogMgr.liveLogDirs).andReturn(config.logDirs.map(new File(_).getAbsoluteFile)).anyTimes EasyMock.expect(mockLogMgr.currentDefaultConfig).andReturn(LogConfig()) EasyMock.expect(mockLogMgr.getOrCreateLog(new TopicPartition(topic, topicPartition), @@ -634,7 +634,7 @@ class ReplicaManagerTest { val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId) val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId)) - val metadataCache = EasyMock.createMock(classOf[MetadataCache]) + val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache]) EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes aliveBrokerIds.foreach { brokerId => EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes @@ -793,7 +793,7 @@ class ReplicaManagerTest { val logProps = new Properties() val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps)) val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId)) - val metadataCache = EasyMock.createMock(classOf[MetadataCache]) + val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache]) EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() aliveBrokerIds.foreach { brokerId => EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes() diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 67d083c7ab9..1bc02578636 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -105,7 +105,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { @Test def testBrokerStateRunningAfterZK(): Unit = { val brokerId = 0 - val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState]) + val mockBrokerState: BrokerState = EasyMock.niceMock(classOf[BrokerState]) class BrokerStateInterceptor() extends BrokerState { override def newState(newState: BrokerStates): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 1bb0e20ed6d..94f9a163413 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -21,7 +21,7 @@ import java.io.File import kafka.api._ import kafka.utils._ import kafka.cluster.Replica -import kafka.log.Log +import kafka.log.{Log, LogManager} import kafka.server.QuotaFactory.UnboundedQuota import kafka.zk.KafkaZkClient import org.apache.kafka.common.metrics.Metrics @@ -70,15 +70,15 @@ class SimpleFetchTest { @Before def setUp() { // create nice mock since we don't particularly care about zkclient calls - val kafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) + val kafkaZkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) EasyMock.replay(kafkaZkClient) // create nice mock since we don't particularly care about scheduler calls - val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler]) + val scheduler: KafkaScheduler = EasyMock.createNiceMock(classOf[KafkaScheduler]) EasyMock.replay(scheduler) // create the log which takes read with either HW max offset or none max offset - val log = EasyMock.createNiceMock(classOf[Log]) + val log: Log = EasyMock.createNiceMock(classOf[Log]) EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes() EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes() EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes() @@ -106,7 +106,7 @@ class SimpleFetchTest { EasyMock.replay(log) // create the log manager that is aware of this mock log - val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) + val logManager: LogManager = EasyMock.createMock(classOf[LogManager]) EasyMock.expect(logManager.getLog(topicPartition, false)).andReturn(Some(log)).anyTimes() EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes() EasyMock.replay(logManager) diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala index ff781a2e159..c46404addca 100644 --- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala @@ -50,7 +50,7 @@ class ThrottledChannelExpirationTest { val request = builder.build() val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0)) - val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics]) + val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics]) // read the header from the buffer first so that the body can be read next from the Request constructor val header = RequestHeader.parse(buffer) @@ -122,4 +122,4 @@ class ThrottledChannelExpirationTest { time.sleep(10) } } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index 86a087b480d..7adc2047515 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -21,6 +21,7 @@ import java.util.Optional import java.util.concurrent.atomic.AtomicBoolean import kafka.cluster.Replica +import kafka.log.{Log, LogManager} import kafka.server._ import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.TopicPartition @@ -46,9 +47,9 @@ class OffsetsForLeaderEpochTest { val request = Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), epochRequested)) //Stubs - val mockLog = createNiceMock(classOf[kafka.log.Log]) - val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache]) - val logManager = createNiceMock(classOf[kafka.log.LogManager]) + val mockLog: Log = createNiceMock(classOf[Log]) + val mockCache: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache]) + val logManager: LogManager = createNiceMock(classOf[LogManager]) expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset) expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes() expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes() @@ -72,7 +73,7 @@ class OffsetsForLeaderEpochTest { @Test def shouldReturnNoLeaderForPartitionIfThrown(): Unit = { - val logManager = createNiceMock(classOf[kafka.log.LogManager]) + val logManager: LogManager = createNiceMock(classOf[LogManager]) expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes() replay(logManager) @@ -95,7 +96,7 @@ class OffsetsForLeaderEpochTest { @Test def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = { - val logManager = createNiceMock(classOf[kafka.log.LogManager]) + val logManager: LogManager = createNiceMock(classOf[LogManager]) expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes() replay(logManager) diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala index 65273eb01a5..4bf74713684 100644 --- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala @@ -17,9 +17,10 @@ package kafka.utils -import kafka.server.{KafkaConfig, ReplicaFetcherManager} +import kafka.server.{KafkaConfig, ReplicaFetcherManager, ReplicaManager} import kafka.api.LeaderAndIsr import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.log.{Log, LogManager} import kafka.zk._ import org.apache.kafka.common.TopicPartition import org.junit.Assert._ @@ -48,16 +49,16 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness { @Test def testUpdateLeaderAndIsr() { val configs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps) - val log = EasyMock.createMock(classOf[kafka.log.Log]) + val log: Log = EasyMock.createMock(classOf[Log]) EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes() EasyMock.expect(log) EasyMock.replay(log) - val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) + val logManager: LogManager = EasyMock.createMock(classOf[LogManager]) EasyMock.expect(logManager.getLog(new TopicPartition(topic, partition), false)).andReturn(Some(log)).anyTimes() EasyMock.replay(logManager) - val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) + val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager]) EasyMock.expect(replicaManager.config).andReturn(configs.head) EasyMock.expect(replicaManager.logManager).andReturn(logManager) EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala index 9f81c18c7b1..ec4eed29536 100644 --- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala @@ -141,7 +141,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware val topic = "test.topic" // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes - val zkMock = EasyMock.createNiceMock(classOf[KafkaZkClient]) + val zkMock: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) EasyMock.expect(zkMock.topicExists(topic)).andReturn(false) EasyMock.expect(zkMock.getAllTopicsInCluster).andReturn(Seq("some.topic", topic, "some.other.topic")) EasyMock.replay(zkMock) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index e11ded1d94a..7dd3604db08 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -52,7 +52,7 @@ versions += [ apacheds: "2.0.0-M24", argparse4j: "0.7.0", bcpkix: "1.60", - easymock: "3.6", + easymock: "4.0.1", jackson: "2.9.7", jetty: "9.4.12.v20180830", jersey: "2.27", @@ -73,20 +73,19 @@ versions += [ kafka_11: "1.1.1", kafka_20: "2.0.0", lz4: "1.5.0", - mavenArtifact: "3.5.4", + mavenArtifact: "3.6.0", metrics: "2.2.0", mockito: "2.23.0", - // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta - powermock: "2.0.0-beta.5", + powermock: "2.0.0-RC.3", reflections: "0.9.11", rocksDB: "5.14.2", scalatest: "3.0.5", scoverage: "1.3.1", slf4j: "1.7.25", snappy: "1.1.7.2", - zkclient: "0.10", + zkclient: "0.11", zookeeper: "3.4.13", - zstd: "1.3.5-4" + zstd: "1.3.7-1" ] libs += [