MINOR: Remove unused `ByteBoundedBlockingQueue` class and `zkSessionTimeout` parameter

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #2136 from ijuma/remove-unused-code
This commit is contained in:
Ismael Juma 2016-11-16 09:38:42 -08:00 committed by Jason Gustafson
parent 31203efcb2
commit b902ef985a
5 changed files with 4 additions and 333 deletions

View File

@ -46,8 +46,7 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.server._
import kafka.common.TopicAndPartition
class ControllerContext(val zkUtils: ZkUtils,
val zkSessionTimeout: Int) {
class ControllerContext(val zkUtils: ZkUtils) {
var controllerChannelManager: ControllerChannelManager = null
val controllerLock: ReentrantLock = new ReentrantLock()
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
@ -157,7 +156,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs)
val controllerContext = new ControllerContext(zkUtils)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,

View File

@ -1,230 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
/**
* A blocking queue that have size limits on both number of elements and number of bytes.
*/
class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int])
extends Iterable[E] {
private val queue = new LinkedBlockingQueue[E] (queueNumMessageCapacity)
private var currentByteSize = new AtomicInteger()
private val putLock = new Object
/**
* Please refer to [[java.util.concurrent.BlockingQueue#offer]]
* An element can be enqueued provided the current size (in number of elements) is within the configured
* capacity and the current size in bytes of the queue is within the configured byte capacity. i.e., the
* element may be enqueued even if adding it causes the queue's size in bytes to exceed the byte capacity.
* @param e the element to put into the queue
* @param timeout the amount of time to wait before the expire the operation
* @param unit the time unit of timeout parameter, default to millisecond
* @return true if the element is put into queue, false if it is not
* @throws NullPointerException if element is null
* @throws InterruptedException if interrupted during waiting
*/
def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = {
if (e == null) throw new NullPointerException("Putting null element into queue.")
val startTime = SystemTime.nanoseconds
val expireTime = startTime + unit.toNanos(timeout)
putLock synchronized {
var timeoutNanos = expireTime - SystemTime.nanoseconds
while (currentByteSize.get() >= queueByteCapacity && timeoutNanos > 0) {
// ensure that timeoutNanos > 0, otherwise (per javadoc) we have to wait until the next notify
putLock.wait(timeoutNanos / 1000000, (timeoutNanos % 1000000).toInt)
timeoutNanos = expireTime - SystemTime.nanoseconds
}
// only proceed if queue has capacity and not timeout
timeoutNanos = expireTime - SystemTime.nanoseconds
if (currentByteSize.get() < queueByteCapacity && timeoutNanos > 0) {
val success = queue.offer(e, timeoutNanos, TimeUnit.NANOSECONDS)
// only increase queue byte size if put succeeds
if (success)
currentByteSize.addAndGet(sizeFunction.get(e))
// wake up another thread in case multiple threads are waiting
if (currentByteSize.get() < queueByteCapacity)
putLock.notify()
success
} else {
false
}
}
}
/**
* Please refer to [[java.util.concurrent.BlockingQueue#offer]].
* Put an element to the tail of the queue, return false immediately if queue is full
* @param e The element to put into queue
* @return true on succeed, false on failure
* @throws NullPointerException if element is null
* @throws InterruptedException if interrupted during waiting
*/
def offer(e: E): Boolean = {
if (e == null) throw new NullPointerException("Putting null element into queue.")
putLock synchronized {
if (currentByteSize.get() >= queueByteCapacity) {
false
} else {
val success = queue.offer(e)
if (success)
currentByteSize.addAndGet(sizeFunction.get(e))
// wake up another thread in case multiple threads are waiting
if (currentByteSize.get() < queueByteCapacity)
putLock.notify()
success
}
}
}
/**
* Please refer to [[java.util.concurrent.BlockingQueue#put]].
* Put an element to the tail of the queue, block if queue is full
* @param e The element to put into queue
* @return true on succeed, false on failure
* @throws NullPointerException if element is null
* @throws InterruptedException if interrupted during waiting
*/
def put(e: E): Boolean = {
if (e == null) throw new NullPointerException("Putting null element into queue.")
putLock synchronized {
if (currentByteSize.get() >= queueByteCapacity)
putLock.wait()
val success = queue.offer(e)
if (success)
currentByteSize.addAndGet(sizeFunction.get(e))
// wake up another thread in case multiple threads are waiting
if (currentByteSize.get() < queueByteCapacity)
putLock.notify()
success
}
}
/**
* Please refer to [[java.util.concurrent.BlockingQueue#poll]]
* Get an element from the head of queue. Wait for some time if the queue is empty.
* @param timeout the amount of time to wait if the queue is empty
* @param unit the unit type
* @return the first element in the queue, null if queue is empty
*/
def poll(timeout: Long, unit: TimeUnit): E = {
val e = queue.poll(timeout, unit)
// only wake up waiting threads if the queue size drop under queueByteCapacity
if (e != null &&
currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
currentByteSize.get() < queueByteCapacity)
putLock.synchronized(putLock.notify())
e
}
/**
* Please refer to [[java.util.concurrent.BlockingQueue#poll]]
* Get an element from the head of queue.
* @return the first element in the queue, null if queue is empty
*/
def poll(): E = {
val e = queue.poll()
// only wake up waiting threads if the queue size drop under queueByteCapacity
if (e != null &&
currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
currentByteSize.get() < queueByteCapacity)
putLock.synchronized(putLock.notify())
e
}
/**
* Please refer to [[java.util.concurrent.BlockingQueue#take]]
* Get an element from the head of the queue, block if the queue is empty
* @return the first element in the queue, null if queue is empty
*/
def take(): E = {
val e = queue.take()
// only wake up waiting threads if the queue size drop under queueByteCapacity
if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
currentByteSize.get() < queueByteCapacity)
putLock.synchronized(putLock.notify())
e
}
/**
* Iterator for the queue
* @return Iterator for the queue
*/
override def iterator = new Iterator[E] () {
private val iter = queue.iterator()
private var curr: E = null.asInstanceOf[E]
def hasNext: Boolean = iter.hasNext
def next(): E = {
curr = iter.next()
curr
}
def remove() {
if (curr == null)
throw new IllegalStateException("Iterator does not have a current element.")
iter.remove()
if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteCapacity)
putLock.synchronized(putLock.notify())
}
}
/**
* get the number of elements in the queue
* @return number of elements in the queue
*/
override def size() = queue.size()
/**
* get the current byte size in the queue
* @return current queue size in bytes
*/
def byteSize() = {
val currSize = currentByteSize.get()
// There is a potential race where after an element is put into the queue and before the size is added to
// currentByteSize, it was taken out of the queue and the size was deducted from the currentByteSize,
// in that case, currentByteSize would become negative, in that case, just put the queue size to be 0.
if (currSize > 0) currSize else 0
}
/**
* get the number of unused slots in the queue
* @return the number of unused slots in the queue
*/
def remainingSize = queue.remainingCapacity()
/**
* get the remaining bytes capacity of the queue
* @return the remaining bytes capacity of the queue
*/
def remainingByteSize = math.max(0, queueByteCapacity - currentByteSize.get())
/**
* remove all the items in the queue
*/
def clear() {
putLock synchronized {
queue.clear()
currentByteSize.set(0)
putLock.notify()
}
}
}

View File

@ -39,7 +39,7 @@ class ControlledShutdownLeaderSelectorTest {
val firstLeader = 1
val zkUtils = EasyMock.mock(classOf[ZkUtils])
val controllerContext = new ControllerContext(zkUtils, zkSessionTimeout = 1000)
val controllerContext = new ControllerContext(zkUtils)
controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, None)).toSet
controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3)
controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment)

View File

@ -131,7 +131,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
val nodes = brokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
val controllerContext = new ControllerContext(zkUtils, 6000)
val controllerContext = new ControllerContext(zkUtils)
controllerContext.liveBrokers = brokers.toSet
val metrics = new Metrics
val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, metrics)

View File

@ -1,98 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.concurrent.TimeUnit
import org.junit.Assert._
import org.junit.Test
class ByteBoundedBlockingQueueTest {
val sizeFunction = (a: String) => a.length
val queue = new ByteBoundedBlockingQueue[String](5, 15, Some(sizeFunction))
@Test
def testByteBoundedBlockingQueue() {
assertEquals(5, queue.remainingSize)
assertEquals(15, queue.remainingByteSize)
//offer a message whose size is smaller than remaining capacity
val m0 = new String("0123456789")
assertEquals(true, queue.offer(m0))
assertEquals(1, queue.size())
assertEquals(10, queue.byteSize())
assertEquals(4, queue.remainingSize)
assertEquals(5, queue.remainingByteSize)
// offer a message where remaining capacity < message size < capacity limit
val m1 = new String("1234567890")
assertEquals(true, queue.offer(m1))
assertEquals(2, queue.size())
assertEquals(20, queue.byteSize())
assertEquals(3, queue.remainingSize)
assertEquals(0, queue.remainingByteSize)
// offer a message using timeout, should fail because no space is left
val m2 = new String("2345678901")
assertEquals(false, queue.offer(m2, 10, TimeUnit.MILLISECONDS))
assertEquals(2, queue.size())
assertEquals(20, queue.byteSize())
assertEquals(3, queue.remainingSize)
assertEquals(0, queue.remainingByteSize)
// take an element out of the queue
assertEquals("0123456789", queue.take())
assertEquals(1, queue.size())
assertEquals(10, queue.byteSize())
assertEquals(4, queue.remainingSize)
assertEquals(5, queue.remainingByteSize)
// add 5 small elements into the queue, first 4 should succeed, the 5th one should fail
// test put()
assertEquals(true, queue.put("a"))
assertEquals(true, queue.offer("b"))
assertEquals(true, queue.offer("c"))
assertEquals(4, queue.size())
assertEquals(13, queue.byteSize())
assertEquals(1, queue.remainingSize)
assertEquals(2, queue.remainingByteSize)
assertEquals(true, queue.offer("d"))
assertEquals(5, queue.size())
assertEquals(14, queue.byteSize())
assertEquals(0, queue.remainingSize)
assertEquals(1, queue.remainingByteSize)
assertEquals(false, queue.offer("e"))
assertEquals(5, queue.size())
assertEquals(14, queue.byteSize())
assertEquals(0, queue.remainingSize)
assertEquals(1, queue.remainingByteSize)
// try take 6 elements out of the queue, the last poll() should fail as there is no element anymore
// test take()
assertEquals("1234567890", queue.poll(10, TimeUnit.MILLISECONDS))
// test poll
assertEquals("a", queue.poll())
assertEquals("b", queue.poll())
assertEquals("c", queue.poll())
assertEquals("d", queue.poll())
assertEquals(null, queue.poll(10, TimeUnit.MILLISECONDS))
}
}