mirror of https://github.com/apache/kafka.git
MINOR: Update zstd, easymock, powermock, zkclient and build plugins (#5846)
EasyMock 4.0.x includes a change that relies on the caller to infer the return type of mock creator methods. A number of Scala tests were updated so that they compile and run correctly. The EasyMock and PowerMock versions in this PR include full support for Java 11. Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
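To illustrate why the Scala tests below gained explicit type annotations, here is a minimal, hypothetical Scala sketch; the `PartitionLookup` trait is invented for illustration and is not part of Kafka, and the EasyMock signatures in the comments are paraphrased rather than quoted.

import org.easymock.EasyMock

// Hypothetical service, used only to show the EasyMock 4.0 inference change.
trait PartitionLookup {
  def leaderFor(topic: String): Int
}

object EasyMockTypingExample {
  def main(args: Array[String]): Unit = {
    // EasyMock 3.x declared createMock roughly as <T> T createMock(Class<T>), so the mock's
    // type was inferred from the classOf argument and an untyped val compiled fine:
    //   val lookup = EasyMock.createMock(classOf[PartitionLookup])
    //
    // EasyMock 4.0.x declares it roughly as <T, R> R createMock(Class<T>), so the result type
    // is inferred from the expected type at the call site. Without an annotation Scala has no
    // useful expected type, which is why the tests in this PR annotate the vals explicitly:
    val lookup: PartitionLookup = EasyMock.createMock(classOf[PartitionLookup])

    EasyMock.expect(lookup.leaderFor("topic1")).andReturn(0)
    EasyMock.replay(lookup)
    assert(lookup.leaderFor("topic1") == 0)
    EasyMock.verify(lookup)
  }
}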
parent ecb71cf471
commit af2e6fb548
build.gradle (13 lines changed)
@@ -29,11 +29,11 @@ buildscript {
     // For Apache Rat plugin to ignore non-Git files
     classpath "org.ajoberstar:grgit:1.9.3"
     classpath 'com.github.ben-manes:gradle-versions-plugin:0.20.0'
-    classpath 'org.scoverage:gradle-scoverage:2.4.0'
-    classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.0'
-    classpath 'org.owasp:dependency-check-gradle:3.3.2'
-    classpath "com.diffplug.spotless:spotless-plugin-gradle:3.15.0"
-    classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.4"
+    classpath 'org.scoverage:gradle-scoverage:2.5.0'
+    classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
+    classpath 'org.owasp:dependency-check-gradle:3.3.4'
+    classpath "com.diffplug.spotless:spotless-plugin-gradle:3.16.0"
+    classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.5"
   }
 }

@@ -369,8 +369,7 @@ subprojects {

   if (!JavaVersion.current().isJava11Compatible()) {
     spotbugs {
-      // 3.1.6 has a regression that breaks our build, seems to be https://github.com/spotbugs/spotbugs/pull/688
-      toolVersion = '3.1.5'
+      toolVersion = '3.1.8'
       excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
       ignoreFailures = false
     }

@@ -30,8 +30,8 @@ import org.junit.Assert._

 class DelayedFetchTest extends EasyMockSupport {
   private val maxBytes = 1024
-  private val replicaManager = mock(classOf[ReplicaManager])
-  private val replicaQuota = mock(classOf[ReplicaQuota])
+  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+  private val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota])

   @Test
   def testFetchWithFencedEpoch(): Unit = {

@@ -58,7 +58,7 @@ class DelayedFetchTest extends EasyMockSupport {
       quota = replicaQuota,
       responseCallback = callback)

-    val partition = mock(classOf[Partition])
+    val partition: Partition = mock(classOf[Partition])

     EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
       .andReturn(partition)

@@ -30,7 +30,7 @@ import scala.collection.mutable

 class InterBrokerSendThreadTest {
   private val time = new MockTime()
-  private val networkClient = EasyMock.createMock(classOf[NetworkClient])
+  private val networkClient: NetworkClient = EasyMock.createMock(classOf[NetworkClient])
   private val completionHandler = new StubCompletionHandler
   private val requestTimeoutMs = 1000

@@ -142,7 +142,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val topic = "test.topic"

     // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
-    val zkMock = EasyMock.createNiceMock(classOf[ZkUtils])
+    val zkMock: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
     EasyMock.expect(zkMock.pathExists(s"/brokers/topics/$topic")).andReturn(false)
     EasyMock.expect(zkMock.getAllTopics).andReturn(Seq("some.topic", topic, "some.other.topic"))
     EasyMock.replay(zkMock)

@@ -245,12 +245,12 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     val configEntries = util.Collections.singletonList(new ConfigEntry("num.io.threads", "5"))
     val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
     future.complete(util.Collections.singletonMap(resource, new Config(configEntries)))
-    val describeResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+    val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
     EasyMock.expect(describeResult.all()).andReturn(future).once()

     val alterFuture = new KafkaFutureImpl[Void]
     alterFuture.complete(null)
-    val alterResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
+    val alterResult: AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
     EasyMock.expect(alterResult.all()).andReturn(alterFuture)

     val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {

@@ -622,7 +622,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {

   @Test
   def testQuotaDescribeEntities() {
-    val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])

     def checkEntities(opts: Array[String], expectedFetches: Map[String, Seq[String]], expectedEntityNames: Seq[String]) {
       val entity = ConfigCommand.parseEntity(new ConfigCommandOptions(opts :+ "--describe"))

@@ -268,7 +268,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {

     //Setup
     val zk = stubZKClient(existing)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.fetchEntityConfig(anyString(), anyString())).andStubReturn(new Properties)

@@ -294,7 +294,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {

     //Setup
     val zk = stubZKClient(existing)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()

@@ -328,7 +328,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {

     //Setup
     val zk = stubZKClient(existing)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()

@@ -364,7 +364,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {

     //Setup
     val zk = stubZKClient(brokers = brokers)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     expect(admin.fetchEntityConfig(is(ConfigType.Topic), anyString())).andStubReturn(new Properties)
     expect(admin.changeBrokerConfig(anyObject().asInstanceOf[Seq[Int]], capture(propsCapture))).anyTimes()

@@ -399,7 +399,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {

     //Setup
     val zk = stubZKClient(brokers = Seq(100, 101))
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     expect(admin.fetchEntityConfig(is(ConfigType.Broker), anyString())).andStubReturn(new Properties)
     expect(admin.fetchEntityConfig(is(ConfigType.Topic), is("topic1"))).andStubReturn(copyOf(existingConfigs))

@@ -567,7 +567,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {

   def stubZKClient(existingAssignment: Map[TopicPartition, Seq[Int]] = Map[TopicPartition, Seq[Int]](),
                    brokers: Seq[Int] = Seq[Int]()): KafkaZkClient = {
-    val zkClient = createMock(classOf[KafkaZkClient])
+    val zkClient: KafkaZkClient = createMock(classOf[KafkaZkClient])
     expect(zkClient.getReplicaAssignmentForTopics(anyObject().asInstanceOf[Set[String]])).andStubReturn(existingAssignment)
     expect(zkClient.getAllBrokersInCluster).andStubReturn(brokers.map(TestUtils.createBroker(_, "", 1)))
     replay(zkClient)

@@ -76,7 +76,7 @@ class PartitionTest {
     val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
     val brokerConfig = KafkaConfig.fromProps(brokerProps)
-    val kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
+    val kafkaZkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     replicaManager = new ReplicaManager(
       config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time),
       logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),

@@ -386,8 +386,8 @@ class PartitionTest {
                       isLeader: Boolean,
                       log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
     val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
-    val replicaManager = EasyMock.mock(classOf[ReplicaManager])
-    val zkClient = EasyMock.mock(classOf[KafkaZkClient])
+    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])

     val partition = new Partition(topicPartition,
       isOffline = false,

@@ -481,8 +481,8 @@ class PartitionTest {
   def testListOffsetIsolationLevels(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
-    val replicaManager = EasyMock.mock(classOf[ReplicaManager])
-    val zkClient = EasyMock.mock(classOf[KafkaZkClient])
+    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])

     val partition = new Partition(topicPartition,
       isOffline = false,

@@ -22,7 +22,7 @@ object ControllerTestUtils {

   /** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */
   def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = {
-    val mockEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
+    val mockEvent: ControllerEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
     EasyMock.expect(mockEvent.state).andReturn(controllerState)
     EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() {
       def answer(): Unit = {

@@ -433,7 +433,7 @@ class PartitionStateMachineTest extends JUnitSuite {
       mockZkClient, partitionState, mockControllerBrokerRequestBatch)

     def createMockController() = {
-      val mockController = EasyMock.createMock(classOf[KafkaController])
+      val mockController: KafkaController = EasyMock.createMock(classOf[KafkaController])
       EasyMock.expect(mockController.controllerContext).andReturn(controllerContext).anyTimes()
       EasyMock.expect(mockController.config).andReturn(customConfig).anyTimes()
       EasyMock.expect(mockController.partitionStateMachine).andReturn(partitionStateMachine).anyTimes()

@@ -444,7 +444,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     }

     val mockController = createMockController()
-    val mockEventManager = EasyMock.createMock(classOf[ControllerEventManager])
+    val mockEventManager: ControllerEventManager = EasyMock.createMock(classOf[ControllerEventManager])
     EasyMock.replay(mockController, replicaStateMachine, mockEventManager)

     val topicDeletionManager = new TopicDeletionManager(mockController, mockEventManager, mockZkClient)

@@ -928,7 +928,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Empty.toString, summary.state)

     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    val partition = EasyMock.niceMock(classOf[Partition])
+    val partition: Partition = EasyMock.niceMock(classOf[Partition])

     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

@@ -1425,7 +1425,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, leaveGroupResult)

     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    val partition = EasyMock.niceMock(classOf[Partition])
+    val partition: Partition = EasyMock.niceMock(classOf[Partition])

     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

@@ -1466,7 +1466,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, leaveGroupResult)

     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    val partition = EasyMock.niceMock(classOf[Partition])
+    val partition: Partition = EasyMock.niceMock(classOf[Partition])

     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

@@ -729,7 +729,7 @@ class GroupMetadataManagerTest {
     val tp2 = new TopicPartition("bar", 0)
     val tp3 = new TopicPartition("xxx", 0)

-    val logMock = EasyMock.mock(classOf[Log])
+    val logMock: Log = EasyMock.mock(classOf[Log])
     EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))

     val segment1MemberId = "a"

@@ -1785,19 +1785,19 @@ class GroupMetadataManagerTest {
       offsetCommitRecords ++ Seq(groupMetadataRecord): _*)

     // Prepend empty control batch to valid records
-    val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch])
+    val mockBatch: MutableRecordBatch = EasyMock.createMock(classOf[MutableRecordBatch])
     EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record])
     EasyMock.expect(mockBatch.isControlBatch).andReturn(true)
     EasyMock.expect(mockBatch.isTransactional).andReturn(true)
     EasyMock.expect(mockBatch.nextOffset).andReturn(16L)
     EasyMock.replay(mockBatch)
-    val mockRecords = EasyMock.createMock(classOf[MemoryRecords])
+    val mockRecords: MemoryRecords = EasyMock.createMock(classOf[MemoryRecords])
     EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch) ++ records.batches.asScala).asJava).anyTimes()
     EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes()
     EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + records.sizeInBytes()).anyTimes()
     EasyMock.replay(mockRecords)

-    val logMock = EasyMock.mock(classOf[Log])
+    val logMock: Log = EasyMock.mock(classOf[Log])
     EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
       maxLength = EasyMock.anyInt(),

@@ -1886,7 +1886,7 @@ class GroupMetadataManagerTest {
   private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
                                       startOffset: Long,
                                       records: MemoryRecords): Unit = {
-    val logMock = EasyMock.mock(classOf[Log])
+    val logMock: Log = EasyMock.mock(classOf[Log])
     EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
     val endOffset = expectGroupMetadataLoad(logMock, startOffset, records)
     EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))

@@ -1902,7 +1902,7 @@ class GroupMetadataManagerTest {
                                       startOffset: Long,
                                       records: MemoryRecords): Long = {
     val endOffset = startOffset + records.records.asScala.size
-    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+    val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])

     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),

@@ -24,7 +24,7 @@ import org.junit.Assert._

 class ProducerIdManagerTest {

-  private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+  private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])

   @After
   def tearDown(): Unit = {

@@ -81,13 +81,13 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
       new MockTimer,
       reaperEnabled = false)
     val brokerNode = new Node(0, "host", 10)
-    val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
+    val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.anyString(),
       EasyMock.anyInt(),
       EasyMock.anyObject())
     ).andReturn(Some(brokerNode)).anyTimes()
-    val networkClient = EasyMock.createNiceMock(classOf[NetworkClient])
+    val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient])
     txnMarkerChannelManager = new TransactionMarkerChannelManager(
       KafkaConfig.fromProps(serverProps),
       metadataCache,

@@ -246,8 +246,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren

   private def prepareTxnLog(partitionId: Int): Unit = {

-    val logMock = EasyMock.mock(classOf[Log])
-    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])

     val topicPartition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId)
     val startOffset = replicaManager.getLogEndOffset(topicPartition).getOrElse(20L)

@@ -36,9 +36,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable

 class TransactionMarkerChannelManagerTest {
-  private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
-  private val networkClient = EasyMock.createNiceMock(classOf[NetworkClient])
-  private val txnStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
+  private val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
+  private val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient])
+  private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])

   private val partition1 = new TopicPartition("topic1", 0)
   private val partition2 = new TopicPartition("topic1", 1)

@@ -47,9 +47,10 @@ class TransactionMarkerRequestCompletionHandlerTest {
   private val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerEpoch, txnTimeoutMs,
     PrepareCommit, mutable.Set[TopicPartition](topicPartition), 0L, 0L)

-  private val markerChannelManager = EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager])
+  private val markerChannelManager: TransactionMarkerChannelManager =
+    EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager])

-  private val txnStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
+  private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])

   private val handler = new TransactionMarkerRequestCompletionHandler(brokerId, txnStateManager, markerChannelManager, txnIdAndMarkers)

@@ -573,8 +573,8 @@ class TransactionStateManagerTest {
                             records: MemoryRecords): Unit = {
     EasyMock.reset(replicaManager)

-    val logMock = EasyMock.mock(classOf[Log])
-    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])

     val endOffset = startOffset + records.records.asScala.size

@@ -327,7 +327,7 @@ class LogTest {
   @Test
   def testSizeForLargeLogs(): Unit = {
     val largeSize = Int.MaxValue.toLong * 2
-    val logSegment = EasyMock.createMock(classOf[LogSegment])
+    val logSegment: LogSegment = EasyMock.createMock(classOf[LogSegment])

     EasyMock.expect(logSegment.size).andReturn(Int.MaxValue).anyTimes
     EasyMock.replay(logSegment)

@@ -349,7 +349,7 @@ class LogTest {

   @Test
   def testSkipLoadingIfEmptyProducerStateBeforeTruncation(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])

     // Load the log
     EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)

@@ -428,7 +428,7 @@ class LogTest {

   @Test
   def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])

     stateManager.updateMapEndOffset(0L)
     EasyMock.expectLastCall().anyTimes()

@@ -465,7 +465,7 @@ class LogTest {

   @Test
   def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])

     stateManager.updateMapEndOffset(0L)
     EasyMock.expectLastCall().anyTimes()

@@ -505,7 +505,7 @@ class LogTest {

   @Test
   def testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])

     EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)

@@ -754,7 +754,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 145.toShort
     val baseOffset = 15

-    val batch = EasyMock.createMock(classOf[RecordBatch])
+    val batch: RecordBatch = EasyMock.createMock(classOf[RecordBatch])
     EasyMock.expect(batch.isControlBatch).andReturn(true).once
     EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once
     EasyMock.replay(batch)

@@ -26,7 +26,7 @@ class AbstractFetcherManagerTest {

   @Test
   def testAddAndRemovePartition(): Unit = {
-    val fetcher = EasyMock.mock(classOf[AbstractFetcherThread])
+    val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread])
     val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) {
       override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
         fetcher

@@ -59,7 +59,7 @@ class ClientQuotaManagerTest {

     val request = builder.build()
     val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
-    val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+    val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])

     // read the header from the buffer first so that the body can be read next from the Request constructor
     val header = RequestHeader.parse(buffer)

@@ -21,6 +21,7 @@ import java.util
 import java.util.Properties

 import kafka.utils.TestUtils
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigException, SslConfigs}

@@ -303,7 +304,7 @@ class DynamicBrokerConfigTest extends JUnitSuite {
   def testDynamicListenerConfig(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
     val oldConfig = KafkaConfig.fromProps(props)
-    val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+    val kafkaServer: KafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
     EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
     EasyMock.replay(kafkaServer)

@@ -328,7 +329,7 @@ class DynamicBrokerConfigTest extends JUnitSuite {

   @Test
   def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
-    val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+    val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()
     EasyMock.replay(zkClient)

@@ -362,4 +363,4 @@ class TestDynamicThreadPool() extends BrokerReconfigurable {
     assertEquals(10, newConfig.numIoThreads)
     assertEquals(100, newConfig.backgroundThreads)
   }
-}
+}

@@ -210,7 +210,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // Create a mock ConfigHandler to record config changes it is asked to process
     val entityArgument = EasyMock.newCapture[String]
     val propertiesArgument = EasyMock.newCapture[Properties]
-    val handler = EasyMock.createNiceMock(classOf[ConfigHandler])
+    val handler: ConfigHandler = EasyMock.createNiceMock(classOf[ConfigHandler])
     handler.processConfigChanges(
       EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])),
       EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties])))

@@ -35,7 +35,7 @@ class HighwatermarkPersistenceTest {

   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
   val topic = "foo"
-  val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
+  val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
   val logManagers = configs map { config =>
     TestUtils.createLogManager(
       logDirs = config.logDirs.map(new File(_)),

@@ -21,7 +21,7 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean

 import kafka.cluster.{Partition, Replica}
-import kafka.log.Log
+import kafka.log.{Log, LogManager}
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition

@@ -55,7 +55,7 @@ class IsrExpirationTest {

   @Before
   def setUp() {
-    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     EasyMock.replay(logManager)

@@ -252,8 +252,8 @@ class IsrExpirationTest {
   }

   private def logMock: Log = {
-    val log = EasyMock.createMock(classOf[kafka.log.Log])
-    val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
+    val log: Log = EasyMock.createMock(classOf[Log])
+    val cache: LeaderEpochFileCache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))

@@ -25,7 +25,6 @@ import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.TimestampOffset
 import kafka.network.RequestChannel
 import kafka.network.RequestChannel.SendResponse
 import kafka.security.auth.Authorizer

@@ -55,24 +54,24 @@ import scala.collection.Map

 class KafkaApisTest {

-  private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
-  private val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
-  private val replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
-  private val groupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
-  private val adminManager = EasyMock.createNiceMock(classOf[AdminManager])
-  private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
-  private val controller = EasyMock.createNiceMock(classOf[KafkaController])
-  private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+  private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
+  private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+  private val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
+  private val groupCoordinator: GroupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
+  private val adminManager: AdminManager = EasyMock.createNiceMock(classOf[AdminManager])
+  private val txnCoordinator: TransactionCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
+  private val controller: KafkaController = EasyMock.createNiceMock(classOf[KafkaController])
+  private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
   private val metrics = new Metrics()
   private val brokerId = 1
   private val metadataCache = new MetadataCache(brokerId)
   private val authorizer: Option[Authorizer] = None
-  private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
-  private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
-  private val replicaQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
+  private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
+  private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
+  private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
   private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager,
     replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None)
-  private val fetchManager = EasyMock.createNiceMock(classOf[FetchManager])
+  private val fetchManager: FetchManager = EasyMock.createNiceMock(classOf[FetchManager])
   private val brokerTopicStats = new BrokerTopicStats
   private val clusterId = "clusterId"
   private val time = new MockTime

@@ -211,8 +211,8 @@ class LogOffsetTest extends BaseRequestTest {
    * a race condition) */
   @Test
   def testFetchOffsetsBeforeWithChangingSegmentSize() {
-    val log = EasyMock.niceMock(classOf[Log])
-    val logSegment = EasyMock.niceMock(classOf[LogSegment])
+    val log: Log = EasyMock.niceMock(classOf[Log])
+    val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment])
     EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Int] {
       private val value = new AtomicInteger(0)
       def answer: Int = value.getAndIncrement()

@@ -228,8 +228,8 @@ class LogOffsetTest extends BaseRequestTest {
    * different (simulating a race condition) */
   @Test
   def testFetchOffsetsBeforeWithChangingSegments() {
-    val log = EasyMock.niceMock(classOf[Log])
-    val logSegment = EasyMock.niceMock(classOf[LogSegment])
+    val log: Log = EasyMock.niceMock(classOf[Log])
+    val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment])
     EasyMock.expect(log.logSegments).andStubAnswer {
       new IAnswer[Iterable[LogSegment]] {
         def answer = new Iterable[LogSegment] {

@@ -50,9 +50,9 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))

     //Setup all dependencies
-    val partitionT1p0 = createMock(classOf[Partition])
-    val partitionT1p1 = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val partitionT1p0: Partition = createMock(classOf[Partition])
+    val partitionT1p1: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     val leaderEpochT1p0 = 2
     val leaderEpochT1p1 = 5

@@ -101,8 +101,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))

     //Setup all dependencies
-    val partitionT1p0 = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val partitionT1p0: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     val leaderEpoch = 2
     val leo = 13

@@ -149,18 +149,18 @@ class ReplicaAlterLogDirsThreadTest {

     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochsT1p0 = createMock(classOf[LeaderEpochFileCache])
-    val futureReplicaLeaderEpochsT1p1 = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaT1p0 = createNiceMock(classOf[Replica])
-    val replicaT1p1 = createNiceMock(classOf[Replica])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochsT1p0: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochsT1p1: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaT1p0: Replica = createNiceMock(classOf[Replica])
+    val replicaT1p1: Replica = createNiceMock(classOf[Replica])
     // one future replica mock because our mocking methods return same values for both future replicas
-    val futureReplicaT1p0 = createNiceMock(classOf[Replica])
-    val futureReplicaT1p1 = createNiceMock(classOf[Replica])
-    val partitionT1p0 = createMock(classOf[Partition])
-    val partitionT1p1 = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val futureReplicaT1p0: Replica = createNiceMock(classOf[Replica])
+    val futureReplicaT1p1: Replica = createNiceMock(classOf[Replica])
+    val partitionT1p0: Partition = createMock(classOf[Partition])
+    val partitionT1p1: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()

     val leaderEpoch = 2

@@ -228,14 +228,14 @@ class ReplicaAlterLogDirsThreadTest {

     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
     // one future replica mock because our mocking methods return same values for both future replicas
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()

     val leaderEpoch = 5

@@ -301,13 +301,13 @@ class ReplicaAlterLogDirsThreadTest {

     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()

     val initialFetchOffset = 100

@@ -356,13 +356,13 @@ class ReplicaAlterLogDirsThreadTest {

     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[kafka.log.LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()

     val futureReplicaLeaderEpoch = 1

@@ -439,13 +439,13 @@ class ReplicaAlterLogDirsThreadTest {

     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()

     val leaderEpoch = 5

@@ -494,12 +494,12 @@ class ReplicaAlterLogDirsThreadTest {

     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     //Stubs
     expect(futureReplica.logStartOffset).andReturn(123).anyTimes()

@@ -543,12 +543,12 @@ class ReplicaAlterLogDirsThreadTest {

     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     //Stubs
     expect(futureReplica.logStartOffset).andReturn(123).anyTimes()

@@ -80,7 +80,7 @@ class ReplicaFetcherThreadTest {
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
     props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
     val config = KafkaConfig.fromProps(props)
-    val leaderEndpoint = createMock(classOf[BlockingSend])
+    val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend])
     expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
       override def answer(): ClientResponse = {
         toFail = true // assert no leader request is sent

@@ -124,7 +124,7 @@ class ReplicaFetcherThreadTest {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "1.0.0")
     val config = KafkaConfig.fromProps(props)
-    val leaderEndpoint = createMock(classOf[BlockingSend])
+    val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend])

     expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
       override def answer(): ClientResponse = {

@@ -163,13 +163,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))

     //Setup all dependencies
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     val leaderEpoch = 5

@@ -262,7 +262,7 @@ class ReplicaFetcherThreadTest {
   def shouldHandleExceptionFromBlockingSend(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     val config = KafkaConfig.fromProps(props)
-    val mockBlockingSend = createMock(classOf[BlockingSend])
+    val mockBlockingSend: BlockingSend = createMock(classOf[BlockingSend])

     expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new NullPointerException).once()
     replay(mockBlockingSend)

@@ -296,12 +296,12 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))

     //Setup all dependencies
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     val leaderEpoch = 5

@@ -356,13 +356,13 @@ class ReplicaFetcherThreadTest {

     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     val leaderEpoch = 5
     val initialLEO = 200

@@ -405,13 +405,13 @@ class ReplicaFetcherThreadTest {

     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     val leaderEpochAtFollower = 5
     val leaderEpochAtLeader = 4

@@ -459,13 +459,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))

     // Setup all dependencies
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     val initialLEO = 200

@@ -530,13 +530,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(props)

     // Setup all dependencies
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     val initialLEO = 200

@@ -591,13 +591,13 @@ class ReplicaFetcherThreadTest {

     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     val initialFetchOffset = 100
     val initialLeo = 300

@@ -636,13 +636,13 @@ class ReplicaFetcherThreadTest {

     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[kafka.log.LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])

     val leaderEpoch = 5
     val highWaterMark = 100

@@ -694,13 +694,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))

     //Setup all stubs
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createNiceMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createNiceMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])

     val leaderEpoch = 4

@ -747,13 +747,13 @@ class ReplicaFetcherThreadTest {
|
|||
val initialLEO = 100
|
||||
|
||||
//Setup all stubs
|
||||
val quota = createNiceMock(classOf[ReplicationQuotaManager])
|
||||
val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
|
||||
val logManager = createNiceMock(classOf[LogManager])
|
||||
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
|
||||
val replica = createNiceMock(classOf[Replica])
|
||||
val partition = createMock(classOf[Partition])
|
||||
val replicaManager = createNiceMock(classOf[ReplicaManager])
|
||||
val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
|
||||
val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
|
||||
val logManager: LogManager = createNiceMock(classOf[LogManager])
|
||||
val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
|
||||
val replica: Replica = createNiceMock(classOf[Replica])
|
||||
val partition: Partition = createMock(classOf[Partition])
|
||||
val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
|
||||
|
||||
//Stub return values
|
||||
expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).once
|
||||
|
@ -797,7 +797,7 @@ class ReplicaFetcherThreadTest {
|
|||
def shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread(): Unit = {
|
||||
val props = TestUtils.createBrokerConfig(1, "localhost:1234")
|
||||
val config = KafkaConfig.fromProps(props)
|
||||
val mockBlockingSend = createMock(classOf[BlockingSend])
|
||||
val mockBlockingSend: BlockingSend = createMock(classOf[BlockingSend])
|
||||
|
||||
expect(mockBlockingSend.close()).andThrow(new IllegalArgumentException()).once()
|
||||
replay(mockBlockingSend)
|
||||
|
|
|
@@ -21,7 +21,7 @@ import java.util.{Optional, Properties}
import java.util.concurrent.atomic.AtomicBoolean

import kafka.cluster.{Partition, Replica}
import kafka.log.{Log, LogOffsetSnapshot}
import kafka.log.{Log, LogManager, LogOffsetSnapshot}
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition

@@ -150,7 +150,7 @@ class ReplicaManagerQuotasTest {
// Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync
def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = {
val endOffsetMetadata = new LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
val partition = EasyMock.createMock(classOf[Partition])
val partition: Partition = EasyMock.createMock(classOf[Partition])

val offsetSnapshot = LogOffsetSnapshot(
logStartOffset = 0L,

@@ -160,7 +160,7 @@ class ReplicaManagerQuotasTest {
EasyMock.expect(partition.fetchOffsetSnapshot(Optional.empty(), fetchOnlyFromLeader = true))
.andReturn(offsetSnapshot)

val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
EasyMock.expect(replicaManager.getPartitionOrException(
EasyMock.anyObject[TopicPartition], EasyMock.anyBoolean()))
.andReturn(partition).anyTimes()

@@ -191,11 +191,11 @@ class ReplicaManagerQuotasTest {
}

def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
val scheduler = createNiceMock(classOf[KafkaScheduler])
val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler])

//Create log which handles both a regular read and a 0 bytes read
val log = createNiceMock(classOf[Log])
val log: Log = createNiceMock(classOf[Log])
expect(log.logStartOffset).andReturn(0L).anyTimes()
expect(log.logEndOffset).andReturn(20L).anyTimes()
expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()

@@ -224,7 +224,7 @@ class ReplicaManagerQuotasTest {
replay(log)

//Create log manager
val logManager = createMock(classOf[kafka.log.LogManager])
val logManager: LogManager = createMock(classOf[LogManager])

//Return the same log for each partition as it doesn't matter
expect(logManager.getLog(anyObject(), anyBoolean())).andReturn(Some(log)).anyTimes()

@@ -262,7 +262,7 @@ class ReplicaManagerQuotasTest {
}

def mockQuota(bound: Long): ReplicaQuota = {
val quota = createMock(classOf[ReplicaQuota])
val quota: ReplicaQuota = createMock(classOf[ReplicaQuota])
expect(quota.isThrottled(anyObject())).andReturn(true).anyTimes()
quota
}
@@ -148,7 +148,7 @@ class ReplicaManagerTest {
val logProps = new Properties()
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
val metadataCache = EasyMock.createMock(classOf[MetadataCache])
val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
EasyMock.replay(metadataCache)
val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,

@@ -594,7 +594,7 @@ class ReplicaManagerTest {
val mockScheduler = new MockScheduler(time)
val mockBrokerTopicStats = new BrokerTopicStats
val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
val mockLeaderEpochCache: LeaderEpochFileCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
.andReturn((leaderEpochFromLeader, localLogOffset))

@@ -620,7 +620,7 @@ class ReplicaManagerTest {
}

// Expect to call LogManager.truncateTo exactly once
val mockLogMgr = EasyMock.createMock(classOf[LogManager])
val mockLogMgr: LogManager = EasyMock.createMock(classOf[LogManager])
EasyMock.expect(mockLogMgr.liveLogDirs).andReturn(config.logDirs.map(new File(_).getAbsoluteFile)).anyTimes
EasyMock.expect(mockLogMgr.currentDefaultConfig).andReturn(LogConfig())
EasyMock.expect(mockLogMgr.getOrCreateLog(new TopicPartition(topic, topicPartition),

@@ -634,7 +634,7 @@ class ReplicaManagerTest {
val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))

val metadataCache = EasyMock.createMock(classOf[MetadataCache])
val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes
aliveBrokerIds.foreach { brokerId =>
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes

@@ -793,7 +793,7 @@ class ReplicaManagerTest {
val logProps = new Properties()
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
val metadataCache = EasyMock.createMock(classOf[MetadataCache])
val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
aliveBrokerIds.foreach { brokerId =>
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes()
@@ -105,7 +105,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
@Test
def testBrokerStateRunningAfterZK(): Unit = {
val brokerId = 0
val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState])
val mockBrokerState: BrokerState = EasyMock.niceMock(classOf[BrokerState])

class BrokerStateInterceptor() extends BrokerState {
override def newState(newState: BrokerStates): Unit = {
@@ -21,7 +21,7 @@ import java.io.File
import kafka.api._
import kafka.utils._
import kafka.cluster.Replica
import kafka.log.Log
import kafka.log.{Log, LogManager}
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.metrics.Metrics

@@ -70,15 +70,15 @@ class SimpleFetchTest {
@Before
def setUp() {
// create nice mock since we don't particularly care about zkclient calls
val kafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
val kafkaZkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
EasyMock.replay(kafkaZkClient)

// create nice mock since we don't particularly care about scheduler calls
val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
val scheduler: KafkaScheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
EasyMock.replay(scheduler)

// create the log which takes read with either HW max offset or none max offset
val log = EasyMock.createNiceMock(classOf[Log])
val log: Log = EasyMock.createNiceMock(classOf[Log])
EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()

@@ -106,7 +106,7 @@ class SimpleFetchTest {
EasyMock.replay(log)

// create the log manager that is aware of this mock log
val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
EasyMock.expect(logManager.getLog(topicPartition, false)).andReturn(Some(log)).anyTimes()
EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
EasyMock.replay(logManager)
@@ -50,7 +50,7 @@ class ThrottledChannelExpirationTest {

val request = builder.build()
val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])

// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)

@@ -122,4 +122,4 @@ class ThrottledChannelExpirationTest {
time.sleep(10)
}
}
}
}
@@ -21,6 +21,7 @@ import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean

import kafka.cluster.Replica
import kafka.log.{Log, LogManager}
import kafka.server._
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition

@@ -46,9 +47,9 @@ class OffsetsForLeaderEpochTest {
val request = Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), epochRequested))

//Stubs
val mockLog = createNiceMock(classOf[kafka.log.Log])
val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache])
val logManager = createNiceMock(classOf[kafka.log.LogManager])
val mockLog: Log = createNiceMock(classOf[Log])
val mockCache: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
val logManager: LogManager = createNiceMock(classOf[LogManager])
expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()

@@ -72,7 +73,7 @@ class OffsetsForLeaderEpochTest {

@Test
def shouldReturnNoLeaderForPartitionIfThrown(): Unit = {
val logManager = createNiceMock(classOf[kafka.log.LogManager])
val logManager: LogManager = createNiceMock(classOf[LogManager])
expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
replay(logManager)

@@ -95,7 +96,7 @@ class OffsetsForLeaderEpochTest {

@Test
def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = {
val logManager = createNiceMock(classOf[kafka.log.LogManager])
val logManager: LogManager = createNiceMock(classOf[LogManager])
expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
replay(logManager)
@@ -17,9 +17,10 @@

package kafka.utils

import kafka.server.{KafkaConfig, ReplicaFetcherManager}
import kafka.server.{KafkaConfig, ReplicaFetcherManager, ReplicaManager}
import kafka.api.LeaderAndIsr
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log.{Log, LogManager}
import kafka.zk._
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._

@@ -48,16 +49,16 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
@Test
def testUpdateLeaderAndIsr() {
val configs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
val log = EasyMock.createMock(classOf[kafka.log.Log])
val log: Log = EasyMock.createMock(classOf[Log])
EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
EasyMock.expect(log)
EasyMock.replay(log)

val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
EasyMock.expect(logManager.getLog(new TopicPartition(topic, partition), false)).andReturn(Some(log)).anyTimes()
EasyMock.replay(logManager)

val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
EasyMock.expect(replicaManager.config).andReturn(configs.head)
EasyMock.expect(replicaManager.logManager).andReturn(logManager)
EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
@@ -141,7 +141,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
val topic = "test.topic"

// simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
val zkMock = EasyMock.createNiceMock(classOf[KafkaZkClient])
val zkMock: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
EasyMock.expect(zkMock.topicExists(topic)).andReturn(false)
EasyMock.expect(zkMock.getAllTopicsInCluster).andReturn(Seq("some.topic", topic, "some.other.topic"))
EasyMock.replay(zkMock)
@@ -52,7 +52,7 @@ versions += [
apacheds: "2.0.0-M24",
argparse4j: "0.7.0",
bcpkix: "1.60",
easymock: "3.6",
easymock: "4.0.1",
jackson: "2.9.7",
jetty: "9.4.12.v20180830",
jersey: "2.27",

@@ -73,20 +73,19 @@ versions += [
kafka_11: "1.1.1",
kafka_20: "2.0.0",
lz4: "1.5.0",
mavenArtifact: "3.5.4",
mavenArtifact: "3.6.0",
metrics: "2.2.0",
mockito: "2.23.0",
// PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
powermock: "2.0.0-beta.5",
powermock: "2.0.0-RC.3",
reflections: "0.9.11",
rocksDB: "5.14.2",
scalatest: "3.0.5",
scoverage: "1.3.1",
slf4j: "1.7.25",
snappy: "1.1.7.2",
zkclient: "0.10",
zkclient: "0.11",
zookeeper: "3.4.13",
zstd: "1.3.5-4"
zstd: "1.3.7-1"
]

libs += [
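Note on the recurring pattern in the test changes above: EasyMock 4.0.x lets the caller's expected type drive the return type of the mock creator methods, so Scala vals holding mocks now need an explicit type annotation; without one the compiler may infer an unusable type such as Nothing and the test no longer compiles or runs cleanly. A minimal sketch of the idiom follows; the Greeter trait and ExplicitMockTypeExample object are illustrative only and not part of this change.

import org.easymock.EasyMock.{createMock, expect, replay, verify}

// Illustrative trait; any interface-like type is mocked the same way.
trait Greeter {
  def greet(name: String): String
}

object ExplicitMockTypeExample {
  def main(args: Array[String]): Unit = {
    // The explicit ": Greeter" annotation is what tells EasyMock 4.0.x
    // (via Scala type inference) which type the mock creator should return.
    val greeter: Greeter = createMock(classOf[Greeter])

    // Record an expectation, switch the mock to replay mode, then use it.
    expect(greeter.greet("kafka")).andReturn("hello, kafka")
    replay(greeter)

    assert(greeter.greet("kafka") == "hello, kafka")
    verify(greeter)
  }
}

The same idiom appears throughout the diff, e.g. val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager]).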