KAFKA-3264; Deprecate the old Scala consumer (KIP-109)

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

This patch had conflicts when merged, resolved by
Committer: Ismael Juma <ismael@juma.me.uk>

Closes #2328 from vahidhashemian/KAFKA-3264
Authored by Vahid Hashemian on 2017-06-02 11:28:13 +01:00; committed by Ismael Juma
parent 5d46348619
commit f85c18032b
45 changed files with 187 additions and 70 deletions

View File

@@ -352,6 +352,7 @@ object AdminUtils extends Logging with AdminUtilities {
    }
  }
+ @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
  def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
    zkUtils.getConsumersInGroup(group).nonEmpty
  }
@@ -363,6 +364,7 @@ object AdminUtils extends Logging with AdminUtilities {
   * @param group Consumer group
   * @return whether or not we deleted the consumer group information
   */
+ @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
  def deleteConsumerGroupInZK(zkUtils: ZkUtils, group: String) = {
    if (!isConsumerGroupActive(zkUtils, group)) {
      val dir = new ZKGroupDirs(group)
@@ -381,6 +383,7 @@ object AdminUtils extends Logging with AdminUtilities {
   * @param topic Topic of the consumer group information we wish to delete
   * @return whether or not we deleted the consumer group information for the given topic
   */
+ @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
  def deleteConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, group: String, topic: String) = {
    val topics = zkUtils.getTopicsByConsumerGroup(group)
    if (topics == Seq(topic)) {
@@ -401,6 +404,7 @@ object AdminUtils extends Logging with AdminUtilities {
   * @param zkUtils Zookeeper utilities
   * @param topic Topic of the consumer group information we wish to delete
   */
+ @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
  def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
    val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
    groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
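
Each @deprecated above surfaces as a compiler warning at call sites. A minimal sketch of what a caller now sees, assuming a hypothetical DeprecationDemo object compiled with scalac -deprecation (ZkUtils.apply mirrors its usage elsewhere in this patch):

  import kafka.admin.AdminUtils
  import kafka.utils.ZkUtils
  import org.apache.kafka.common.security.JaasUtils

  object DeprecationDemo {
    def main(args: Array[String]): Unit = {
      val zkUtils = ZkUtils("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled())
      // warning: method deleteConsumerGroupInZK in object AdminUtils is deprecated:
      //   This method has been deprecated and will be removed in a future release.
      AdminUtils.deleteConsumerGroupInZK(zkUtils, "group1")
      zkUtils.close()
    }
  }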

View File

@@ -231,6 +231,7 @@ object ConsumerGroupCommand extends Logging {
    def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = throw new UnsupportedOperationException
  }
+ @deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
  class ZkConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
    private val zkUtils = {
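
ZkConsumerGroupService is the backend behind kafka-consumer-groups.sh when the ZooKeeper option is given; the maintained path is the broker-based service. Usage for comparison (hosts illustrative):

  # legacy, ZooKeeper-backed path (deprecated by this commit)
  bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
  # replacement, broker-backed path
  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list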

View File

@@ -37,6 +37,7 @@ import scala.util.Random
case class PartitionFetchInfo(offset: Long, fetchSize: Int)
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
object FetchRequest {
  private val random = new Random
@@ -90,6 +91,7 @@ object FetchRequest {
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                        correlationId: Int = FetchRequest.DefaultCorrelationId,
                        clientId: String = ConsumerConfig.DefaultClientId,
@@ -227,6 +229,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
  }
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
@nonthreadsafe
class FetchRequestBuilder() {
  private val correlationId = new AtomicInteger(0)
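
For context, this is the request builder being retired; a hedged sketch of typical old low-level usage (host, topic, and sizes illustrative):

  import kafka.api.FetchRequestBuilder
  import kafka.consumer.SimpleConsumer

  val consumer = new SimpleConsumer("localhost", 9092, 30000, 64 * 1024, "demo-client")
  val request = new FetchRequestBuilder()
    .clientId("demo-client")
    .addFetch("topic1", 0, 0L, 100000) // topic, partition, offset, fetchSize
    .build()
  val response = consumer.fetch(request)
  consumer.close()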

View File

@@ -34,7 +34,8 @@ import java.io.IOException
/**
 * Helper functions common to clients (producer, consumer, or admin)
 */
-object ClientUtils extends Logging{
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
+object ClientUtils extends Logging {
  /**
   * Used by the producer to send a metadata request since it has access to the ProducerConfig

View File

@@ -34,6 +34,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders
 * this class should be removed (along with BaseProducer)
 * once we deprecate old consumer
 */
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
trait BaseConsumer {
  def receive(): BaseConsumerRecord
  def stop()
@@ -41,6 +43,8 @@ trait BaseConsumer {
  def commit()
}
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.ConsumerRecord instead.", "0.11.0.0")
case class BaseConsumerRecord(topic: String,
                              partition: Int,
                              offset: Long,
@@ -50,6 +54,8 @@ case class BaseConsumerRecord(topic: String,
                              value: Array[Byte],
                              headers: Headers = new RecordHeaders())
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
  import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -117,6 +123,8 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
  }
}
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends BaseConsumer {
  import kafka.serializer.DefaultDecoder
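
The deprecation messages point at the new Java consumer. A minimal sketch of the replacement path, assuming illustrative topic and group names (poll(long) is the signature available in 0.11.0.0):

  import java.util.{Collections, Properties}
  import org.apache.kafka.clients.consumer.KafkaConsumer

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "console-group")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  consumer.subscribe(Collections.singletonList("topic1"))
  val records = consumer.poll(1000L) // ConsumerRecords, the analogue of BaseConsumerRecord batches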

View File

@@ -5,7 +5,7 @@
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
@@ -23,6 +23,8 @@ import kafka.utils._
import kafka.common.{InvalidConfigException, Config}
import java.util.Locale
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.ConsumerConfig instead.", "0.11.0.0")
object ConsumerConfig extends Config {
  val RefreshMetadataBackoffMs = 200
  val SocketTimeout = 30 * 1000
@@ -99,6 +101,8 @@ object ConsumerConfig extends Config {
  }
}
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.ConsumerConfig instead.", "0.11.0.0")
class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
  import ConsumerConfig._
@@ -116,19 +120,19 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
  /** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */
  val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
  /** the socket receive buffer for network requests */
  val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
  /** the number of bytes of messages to attempt to fetch from each partition */
  val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
  /** the number threads used to fetch data */
  val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)
  /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
  val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
  /** the frequency in ms that the consumer offsets are committed to zookeeper */
  val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
@@ -137,10 +141,10 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
  /** max number of retries during rebalance */
  val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)
  /** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
  val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
  /** the maximum amount of data the server should return for a fetch request */
  val fetchMaxBytes = props.getInt("fetch.max.bytes", MaxFetchBytes)
@@ -148,7 +152,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
  val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
  require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
    " to prevent unnecessary socket timeouts")
  /** backoff time between retries during rebalance */
  val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
@@ -195,7 +199,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
  /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
  val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
  validate(this)
}
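
Most of the keys read above have approximate counterparts in the new consumer; a hedged mapping sketch (values illustrative, names taken from the new consumer's documented configs):

  import java.util.Properties

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")  // replaces zookeeper.connect
  props.put("max.partition.fetch.bytes", "1048576") // ~ fetch.message.max.bytes
  props.put("enable.auto.commit", "true")           // ~ auto.commit.enable
  props.put("auto.commit.interval.ms", "60000")     // same name; commits go to Kafka, not ZK
  props.put("fetch.min.bytes", "1")                 // same name
  props.put("fetch.max.wait.ms", "100")             // ~ fetch.wait.max.ms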

View File

@@ -14,6 +14,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.consumer
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
@@ -26,8 +27,9 @@ import kafka.serializer._
/**
 * Main interface for consumer
 */
+@deprecated("This trait has been deprecated and will be removed in a future release.", "0.11.0.0")
trait ConsumerConnector {
  /**
   * Create a list of MessageStreams for each topic.
   *
@@ -37,7 +39,7 @@ trait ConsumerConnector {
   * an iterator over message/metadata pairs.
   */
  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
  /**
   * Create a list of MessageStreams for each topic.
   *
@@ -52,7 +54,7 @@ trait ConsumerConnector {
                           keyDecoder: Decoder[K],
                           valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]]
  /**
   * Create a list of message streams for all topics that match a given filter.
   *
@@ -73,7 +75,7 @@ trait ConsumerConnector {
   * Commit the offsets of all broker partitions connected by this connector.
   */
  def commitOffsets(retryOnFailure: Boolean)
  /**
   * KAFKA-1743: This method added for backward compatibility.
   */
@@ -90,13 +92,15 @@ trait ConsumerConnector {
   * @param listener The consumer rebalance listener to wire in
   */
  def setConsumerRebalanceListener(listener: ConsumerRebalanceListener)
  /**
   * Shut down the connector
   */
  def shutdown()
}
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.Consumer instead.", "0.11.0.0")
object Consumer extends Logging {
  /**
   * Create a ConsumerConnector
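
For orientation, the high-level API being deprecated here was typically driven as below; a sketch with illustrative properties and topic name:

  import java.util.Properties
  import kafka.consumer.{Consumer, ConsumerConfig}

  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")
  props.put("group.id", "demo-group")
  val connector = Consumer.create(new ConsumerConfig(props))
  val streams = connector.createMessageStreams(Map("topic1" -> 1))
  for (stream <- streams("topic1"); msg <- stream) // each msg is a MessageAndMetadata
    println(new String(msg.message()))
  connector.shutdown()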

View File

@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger
 * Once ConsumerFetcherManager is created, startConnections() and stopAllConnections() can be called repeatedly
 * until shutdown() is called.
 */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class ConsumerFetcherManager(private val consumerIdString: String,
                             private val config: ConsumerConfig,
                             private val zkUtils : ZkUtils)

View File

@@ -30,6 +30,8 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.EpochEndOffset
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0")
class ConsumerFetcherThread(name: String,
                            val config: ConsumerConfig,
                            sourceBroker: BrokerEndPoint,
@@ -120,6 +122,8 @@ class ConsumerFetcherThread(name: String,
  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { Map() }
}
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0")
object ConsumerFetcherThread {
  class FetchRequest(val underlying: kafka.api.FetchRequest) extends AbstractFetcherThread.FetchRequest {

View File

@@ -30,6 +30,7 @@ import kafka.common.{KafkaException, MessageSizeTooLargeException}
 * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
 *
 */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
                             consumerTimeoutMs: Int,
                             private val keyDecoder: Decoder[K],
@@ -116,5 +117,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
  }
}
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.common.errors.TimeoutException instead.", "0.11.0.0")
class ConsumerTimeoutException() extends RuntimeException()

View File

@@ -23,6 +23,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
@threadsafe
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
  val tags = metricId match {
    case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic)
@@ -37,6 +38,7 @@ class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
 * Tracks metrics for each topic the given consumer client has consumed data from.
 * @param clientId The clientId of the given consumer client.
 */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class ConsumerTopicStats(clientId: String) extends Logging {
  private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
  private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
@@ -52,6 +54,7 @@ class ConsumerTopicStats(clientId: String) extends Logging {
/**
 * Stores the topic stats information of each consumer client in a (clientId -> ConsumerTopicStats) map.
 */
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
object ConsumerTopicStatsRegistry {
  private val valueFactory = (k: String) => new ConsumerTopicStats(k)
  private val globalStats = new Pool[String, ConsumerTopicStats](Some(valueFactory))

View File

@@ -23,6 +23,7 @@ import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.utils.Pool
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
  val tags = metricId match {
    case ClientIdAndBroker(clientId, brokerHost, brokerPort) =>
@@ -41,6 +42,7 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetr
 * Tracks metrics of the requests made by a given consumer client to all brokers, and the responses obtained from the brokers.
 * @param clientId ClientId of the given consumer
 */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class FetchRequestAndResponseStats(clientId: String) {
  private val valueFactory = (k: ClientIdBroker) => new FetchRequestAndResponseMetrics(k)
  private val stats = new Pool[ClientIdBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
@@ -56,6 +58,7 @@ class FetchRequestAndResponseStats(clientId: String) {
/**
 * Stores the fetch request and response stats information of each consumer client in a (clientId -> FetchRequestAndResponseStats) map.
 */
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
object FetchRequestAndResponseStatsRegistry {
  private val valueFactory = (k: String) => new FetchRequestAndResponseStats(k)
  private val globalStats = new Pool[String, FetchRequestAndResponseStats](Some(valueFactory))

View File

@@ -5,7 +5,7 @@
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
@@ -19,6 +19,7 @@ package kafka.consumer
import kafka.message.ByteBufferMessageSet
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
case class FetchedDataChunk(messages: ByteBufferMessageSet,
                            topicInfo: PartitionTopicInfo,
                            fetchOffset: Long)

View File

@@ -22,6 +22,8 @@ import java.util.concurrent.BlockingQueue
import kafka.serializer.Decoder
import kafka.message.MessageAndMetadata
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.streams.KafkaStreams instead.", "0.11.0.0")
class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
                       consumerTimeoutMs: Int,
                       private val keyDecoder: Decoder[K],

View File

@@ -23,6 +23,8 @@ import kafka.utils.{Pool, CoreUtils, ZkUtils, Logging}
import scala.collection.mutable
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.internals.PartitionAssignor instead.", "0.11.0.0")
trait PartitionAssignor {
  /**
@@ -34,6 +36,8 @@ trait PartitionAssignor {
}
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.internals.PartitionAssignor instead.", "0.11.0.0")
object PartitionAssignor {
  def createInstance(assignmentStrategy: String) = assignmentStrategy match {
    case "roundrobin" => new RoundRobinAssignor()
@@ -41,6 +45,7 @@ object PartitionAssignor {
  }
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkUtils: ZkUtils) {
  val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
    val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkUtils, excludeInternalTopics)
@@ -63,6 +68,8 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo
 * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
 * will be within a delta of exactly one across all consumer threads.)
 */
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.RoundRobinAssignor instead.", "0.11.0.0")
class RoundRobinAssignor() extends PartitionAssignor with Logging {
  def assign(ctx: AssignmentContext) = {
@@ -117,6 +124,8 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
 * will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be:
 * p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1
 */
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.RangeAssignor instead.", "0.11.0.0")
class RangeAssignor() extends PartitionAssignor with Logging {
  def assign(ctx: AssignmentContext) = {
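
The replacements named in these messages plug into the new consumer through configuration; a minimal sketch (assumes a props object as in the earlier sketches):

  import org.apache.kafka.clients.consumer.ConsumerConfig

  props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    "org.apache.kafka.clients.consumer.RoundRobinAssignor") // or RangeAssignor, the default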

View File

@@ -5,7 +5,7 @@
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic._
import kafka.message._
import kafka.utils.Logging
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class PartitionTopicInfo(val topic: String,
                         val partitionId: Int,
                         private val chunkQueue: BlockingQueue[FetchedDataChunk],
@@ -66,11 +67,12 @@ class PartitionTopicInfo(val topic: String,
      chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
    }
  }
  override def toString: String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
    ": consumed offset = " + consumedOffset.get
}
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
object PartitionTopicInfo {
  val InvalidOffset = -1L

View File

@@ -32,6 +32,8 @@ import org.apache.kafka.common.utils.Utils._
/**
 * A consumer of kafka messages
 */
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
@threadsafe
class SimpleConsumer(val host: String,
                     val port: Int,

View File

@@ -5,7 +5,7 @@
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
@@ -21,6 +21,7 @@ import scala.collection._
import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, CoreUtils}
import kafka.common.KafkaException
+@deprecated("This trait has been deprecated and will be removed in a future release.", "0.11.0.0")
private[kafka] trait TopicCount {
  def getConsumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]]
@@ -29,12 +30,14 @@ private[kafka] trait TopicCount {
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
case class ConsumerThreadId(consumer: String, threadId: Int) extends Ordered[ConsumerThreadId] {
  override def toString = "%s-%d".format(consumer, threadId)
  def compare(that: ConsumerThreadId) = toString.compare(that.toString)
}
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
private[kafka] object TopicCount extends Logging {
  val whiteListPattern = "white_list"
  val blackListPattern = "black_list"
@@ -105,6 +108,7 @@ private[kafka] object TopicCount extends Logging {
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
private[kafka] class StaticTopicCount(val consumerIdString: String,
                                      val topicCountMap: Map[String, Int])
  extends TopicCount {
@@ -116,6 +120,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
  def pattern = TopicCount.staticPattern
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
private[kafka] class WildcardTopicCount(zkUtils: ZkUtils,
                                        consumerIdString: String,
                                        topicFilter: TopicFilter,

View File

@@ -17,6 +17,7 @@
package kafka.consumer
+@deprecated("This trait has been deprecated and will be removed in a future release.", "0.11.0.0")
trait TopicEventHandler[T] {
  def handleTopicEvent(allTopics: Seq[T])

View File

@@ -22,6 +22,7 @@ import java.util.regex.{Pattern, PatternSyntaxException}
import org.apache.kafka.common.internals.Topic
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
sealed abstract class TopicFilter(rawRegex: String) extends Logging {
  val regex = rawRegex
@@ -44,6 +45,7 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
  def isTopicAllowed(topic: String, excludeInternalTopics: Boolean): Boolean
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
  override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
    val allowed = topic.matches(regex) && !(Topic.isInternal(topic) && excludeInternalTopics)
@@ -53,10 +55,9 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
    allowed
  }
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
  override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
    val allowed = (!topic.matches(regex)) && !(Topic.isInternal(topic) && excludeInternalTopics)
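
The Whitelist case maps onto the new consumer's pattern subscription; there is no direct Blacklist counterpart, though a negated regex can approximate one. A sketch against the 0.11 signature, assuming a consumer as in the earlier sketches:

  import java.util.regex.Pattern
  import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener

  consumer.subscribe(Pattern.compile("topic.*"), new NoOpConsumerRebalanceListener())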

View File

@@ -80,10 +80,12 @@ import scala.collection.JavaConverters._
 * Each consumer tracks the offset of the latest message consumed for each partition.
 *
 */
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
private[kafka] object ZookeeperConsumerConnector {
  val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
}
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                val enableFetcher: Boolean) // for testing only
  extends ConsumerConnector with Logging with KafkaMetricsGroup {

View File

@@ -22,6 +22,7 @@ import kafka.utils.{ZkUtils, Logging}
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener}
import org.apache.zookeeper.Watcher.Event.KeeperState
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
                                 val eventHandler: TopicEventHandler[String]) extends Logging {

View File

@@ -25,6 +25,10 @@ import kafka.serializer.Decoder;
import java.util.List;
import java.util.Map;
+/**
+ * @deprecated since 0.11.0.0, this interface will be removed in a future release.
+ */
+@Deprecated
public interface ConsumerConnector {
  /**
   * Create a list of MessageStreams of type T for each topic.

View File

@@ -25,6 +25,10 @@ import java.util.Set;
 * This listener is used for execution of tasks defined by user when a consumer rebalance
 * occurs in {@link kafka.consumer.ZookeeperConsumerConnector}
 */
+/**
+ * @deprecated since 0.11.0.0, this interface will be removed in a future release.
+ */
+@Deprecated
public interface ConsumerRebalanceListener {
  /**

View File

@@ -24,6 +24,8 @@ import kafka.javaapi.OffsetRequest
/**
 * A consumer of kafka messages
 */
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+  "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
@threadsafe
class SimpleConsumer(val host: String,
                     val port: Int,
@@ -46,7 +48,7 @@ class SimpleConsumer(val host: String,
    import kafka.javaapi.Implicits._
    underlying.fetch(request)
  }
  /**
   * Fetch a set of messages from a topic.
   *
@@ -59,7 +61,7 @@ class SimpleConsumer(val host: String,
  /**
   * Fetch metadata for a sequence of topics.
   *
   * @param request specifies the versionId, clientId, sequence of topics.
   * @return metadata for each topic in the request.
   */

View File

@@ -59,6 +59,7 @@ import scala.collection.JavaConverters._
 *
 */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                val enableFetcher: Boolean) // for testing only
  extends ConsumerConnector {

View File

@@ -181,6 +181,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
    else None
  }
+ @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
  def removeAllConsumerMetrics(clientId: String) {
    FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
    ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)

View File

@@ -25,6 +25,7 @@ import kafka.utils.{Logging, nonthreadsafe}
import org.apache.kafka.common.network.NetworkReceive
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
object BlockingChannel{
  val UseDefaultBufferSize = -1
}
@@ -34,6 +35,7 @@ object BlockingChannel{
 *
 */
@nonthreadsafe
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class BlockingChannel( val host: String,
                       val port: Int,
                       val readBufferSize: Int,

View File

@@ -241,7 +241,7 @@ abstract class AbstractFetcherThread(name: String,
      !partitionStates.contains(tp)
    }.map { case (tp, offset) =>
      val fetchState =
-       if (PartitionTopicInfo.isOffsetInvalid(offset))
+       if (offset < 0)
          new PartitionFetchState(handleOffsetOutOfRange(tp), includeLogTruncation)
        else
          new PartitionFetchState(offset, includeLogTruncation)

View File

@@ -34,6 +34,7 @@ import kafka.api.PartitionOffsetRequestInfo
import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.kafka.common.network.ListenerName
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
object ConsumerOffsetChecker extends Logging {
  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
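
The same information this tool read from ZooKeeper is available through the group command against the brokers; usage for reference (host and group illustrative):

  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group1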

View File

@@ -5,7 +5,7 @@
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
@@ -30,23 +30,25 @@ import scala.collection.JavaConverters._
/**
 * A utility that retrieves the offset of broker partitions in ZK and
 * prints to an output file in the following format:
 *
 * /consumers/group1/offsets/topic1/1-0:286894308
 * /consumers/group1/offsets/topic1/2-0:284803985
 *
 * This utility expects 3 arguments:
 * 1. Zk host:port string
 * 2. group name (all groups implied if omitted)
 * 3. output filename
 *
 * To print debug message, add the following line to log4j.properties:
 * log4j.logger.kafka.tools.ExportZkOffsets$=DEBUG
 * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
 */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
object ExportZkOffsets extends Logging {
  def main(args: Array[String]) {
    val parser = new OptionParser(false)
+   warn("WARNING: ExportZkOffsets is deprecated and will be dropped in a future release following 0.11.0.0.")
    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
      .withRequiredArg()
@@ -59,19 +61,19 @@ object ExportZkOffsets extends Logging {
      .withRequiredArg()
      .ofType(classOf[String])
    parser.accepts("help", "Print this message.")
    if(args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.")
    val options = parser.parse(args : _*)
    if (options.has("help")) {
      parser.printHelpOn(System.out)
      Exit.exit(0)
    }
    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt)
    val zkConnect = options.valueOf(zkConnectOpt)
    val groups = options.valuesOf(groupOpt)
    val outfile = options.valueOf(outFileOpt)
@@ -79,13 +81,13 @@ object ExportZkOffsets extends Logging {
    var zkUtils : ZkUtils = null
    val fileWriter : OutputStreamWriter =
      new OutputStreamWriter(new FileOutputStream(outfile), StandardCharsets.UTF_8)
    try {
      zkUtils = ZkUtils(zkConnect,
                        30000,
                        30000,
                        JaasUtils.isZkSecurityEnabled())
      var consumerGroups: Seq[String] = null
      if (groups.size == 0) {
@@ -94,13 +96,13 @@ object ExportZkOffsets extends Logging {
      else {
        consumerGroups = groups.asScala
      }
      for (consumerGrp <- consumerGroups) {
        val topicsList = getTopicsList(zkUtils, consumerGrp)
        for (topic <- topicsList) {
          val bidPidList = getBrokeridPartition(zkUtils, consumerGrp, topic)
          for (bidPid <- bidPidList) {
            val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
            val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
@@ -113,9 +115,9 @@ object ExportZkOffsets extends Logging {
          }
        }
      }
      }
    }
    finally {
      fileWriter.flush()
      fileWriter.close()
    }
@@ -123,7 +125,7 @@ object ExportZkOffsets extends Logging {
  private def getBrokeridPartition(zkUtils: ZkUtils, consumerGroup: String, topic: String): List[String] =
    zkUtils.getChildrenParentMayNotExist("/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
  private def getTopicsList(zkUtils: ZkUtils, consumerGroup: String): List[String] =
    zkUtils.getChildren("/consumers/%s/offsets".format(consumerGroup)).toList
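
With offsets kept by the group coordinator instead of ZK paths, committed positions are read through the consumer itself; a hedged sketch (topic and partition illustrative, consumer as in the earlier sketches):

  import org.apache.kafka.common.TopicPartition

  val tp = new TopicPartition("topic1", 0)
  val committed = consumer.committed(tp) // null if the group has no committed offset for tp
  if (committed != null) println(s"$tp: ${committed.offset}")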

View File

@@ -31,11 +31,14 @@ import org.apache.kafka.common.utils.Time
/**
 * Performance test for the simple consumer
 */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
object SimpleConsumerPerformance {
  private val logger = Logger.getLogger(getClass())
  def main(args: Array[String]) {
+   logger.warn("WARNING: SimpleConsumerPerformance is deprecated and will be dropped in a future release following 0.11.0.0.")
    val config = new ConsumerPerfConfig(args)
    logger.info("Starting SimpleConsumer...")

View File

@@ -5,7 +5,7 @@
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
@@ -32,11 +32,13 @@ import org.apache.kafka.common.utils.Utils
/**
 * Command line program to dump out messages to standard out using the simple consumer
 */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
object SimpleConsumerShell extends Logging {
  def UseLeaderReplica = -1
  def main(args: Array[String]): Unit = {
+   warn("WARNING: SimpleConsumerShell is deprecated and will be dropped in a future release following 0.11.0.0.")
    val parser = new OptionParser(false)
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
@@ -96,7 +98,7 @@ object SimpleConsumerShell extends Logging {
      "skip it instead of halt.")
    val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
      "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
    if(args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.")

View File

@@ -5,7 +5,7 @@
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
@@ -17,11 +17,10 @@
package kafka.tools
-import org.I0Itec.zkclient.ZkClient
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.{KafkaException, TopicAndPartition}
-import kafka.utils.{CoreUtils, Exit, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.security.JaasUtils
@@ -30,11 +29,14 @@ import org.apache.kafka.common.utils.Utils
/**
 * A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.
 */
-object UpdateOffsetsInZK {
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
+object UpdateOffsetsInZK extends Logging {
  val Earliest = "earliest"
  val Latest = "latest"
  def main(args: Array[String]) {
+   warn("WARNING: UpdateOffsetsInZK is deprecated and will be dropped in releases following 0.11.0.0.")
    if(args.length < 3)
      usage
    val config = new ConsumerConfig(Utils.loadProps(args(1)))
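
The new consumer covers the same reset-to-earliest/latest task with its seek APIs; a minimal sketch (partition illustrative, consumer as in the earlier sketches):

  import java.util.Collections
  import org.apache.kafka.common.TopicPartition

  val tp = new TopicPartition("topic1", 0)
  consumer.assign(Collections.singletonList(tp))
  consumer.seekToBeginning(Collections.singletonList(tp)) // or seekToEnd for "latest"
  val pos = consumer.position(tp) // forces the lazy seek to resolve
  consumer.commitSync() // persists the new position as the group's committed offset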

View File

@@ -21,9 +21,11 @@ import joptsimple.OptionParser
 import org.apache.kafka.common.security._
 import kafka.utils.{CommandLineUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}

+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 object VerifyConsumerRebalance extends Logging {
   def main(args: Array[String]) {
     val parser = new OptionParser(false)
+    warn("WARNING: VerifyConsumerRebalance is deprecated and will be dropped in a future release following 0.11.0.0.")
     val zkConnectOpt = parser.accepts("zookeeper.connect", "ZooKeeper connect string.").
       withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
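With the new consumer there is no ZooKeeper state left to cross-check: the group coordinator computes assignments, and a ConsumerRebalanceListener reports them to each member directly. A minimal sketch, assuming consumer is a new-consumer instance and "test" a hypothetical topic:

    import java.util.Collections
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    import org.apache.kafka.common.TopicPartition

    consumer.subscribe(Collections.singletonList("test"), new ConsumerRebalanceListener {
      // invoked after every completed rebalance with the partitions this member now owns
      override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit =
        println("Assigned: " + partitions)
      // invoked before a rebalance with the partitions being given up
      override def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit =
        println("Revoked: " + partitions)
    })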

View File

@@ -421,6 +421,7 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }

+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
     val topicDirs = new ZKGroupTopicDirs(group, topic)
     topicDirs.consumerOwnerDir + "/" + partition
@@ -823,11 +824,13 @@ class ZkUtils(val zkClient: ZkClient,
     zkClient.delete(brokerPartTopicPath)
   }

+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getConsumersInGroup(group: String): Seq[String] = {
     val dirs = new ZKGroupDirs(group)
     getChildren(dirs.consumerRegistryDir)
   }

+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = {
     val dirs = new ZKGroupDirs(group)
     val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
@@ -847,6 +850,7 @@ class ZkUtils(val zkClient: ZkClient,
     consumersPerTopicMap
   }

+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getTopicsPerMemberId(group: String, excludeInternalTopics: Boolean = true): Map[String, List[String]] = {
     val dirs = new ZKGroupDirs(group)
     val memberIds = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
@@ -916,14 +920,17 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }

+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getConsumerGroups() = {
     getChildren(ConsumersPath)
   }

+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getTopicsByConsumerGroup(consumerGroup:String) = {
     getChildrenParentMayNotExist(new ZKGroupDirs(consumerGroup).consumerGroupOwnersDir)
   }

+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getAllConsumerGroupsForTopic(topic: String): Set[String] = {
     val groups = getChildrenParentMayNotExist(ConsumersPath)
     if (groups == null) Set.empty
@@ -957,6 +964,7 @@ private object ZKStringSerializer extends ZkSerializer {
   }
 }

+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ZKGroupDirs(val group: String) {
   def consumerDir = ConsumersPath
   def consumerGroupDir = consumerDir + "/" + group
@@ -965,6 +973,7 @@ class ZKGroupDirs(val group: String) {
   def consumerGroupOwnersDir = consumerGroupDir + "/owners"
 }

+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) {
   def consumerOffsetDir = consumerGroupOffsetsDir + "/" + topic
   def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic
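These two classes exist only to spell out the old consumer's ZooKeeper layout, which is why they are deprecated along with everything that reads it. A sketch of the paths involved, with hypothetical group and topic names (the "/ids" registry suffix is quoted from memory, not from this diff):

    val dirs = new ZKGroupTopicDirs("my-group", "test")
    dirs.consumerGroupDir    // /consumers/my-group
    dirs.consumerRegistryDir // /consumers/my-group/ids (one ephemeral node per live consumer)
    dirs.consumerOffsetDir   // /consumers/my-group/offsets/test (one child znode per partition)
    dirs.consumerOwnerDir    // /consumers/my-group/owners/test (maps partitions to owner threads)

The new consumer keeps none of this in ZooKeeper: group membership lives in the coordinator and offsets in the internal __consumer_offsets topic.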

View File

@@ -33,16 +33,16 @@ import scala.collection.JavaConverters._

 /**
  * This is a torture test that runs against an existing broker. Here is how it works:
  *
  * It produces a series of specially formatted messages to one or more partitions. Each message it produces
  * it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space.
  *
  * The broker will clean its log as the test runs.
  *
  * When the specified number of messages have been produced we create a consumer and consume all the messages in the topic
  * and write that out to another text file.
  *
  * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key.
  * Then we compare the final message in both logs for each key. If this final message is not the same for all keys we
  * print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0.
  */
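The final paragraph above is the heart of the test: after a stable sort by key, only the last message per key matters, because that is precisely what a compacted log must preserve. A minimal sketch of that comparison, assuming tab-separated "topic key value" lines and hypothetical producedLines/consumedLines iterators:

    // toMap keeps the last binding for each key, mirroring what log compaction retains
    def lastValuePerKey(lines: Iterator[String]): Map[(String, String), String] =
      lines.map(_.split("\t")).collect { case Array(topic, key, value) => (topic, key) -> value }.toMap

    if (lastValuePerKey(producedLines) != lastValuePerKey(consumedLines))
      sys.exit(1)  // mismatch: compaction lost or reordered a final value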
@@ -129,13 +129,13 @@ object TestLogCleaning {
     val consumedLines = lineCount(consumedDataFile)
     val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
     println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))

     println("De-duplicating and validating output files...")
     validateOutput(producedDataFile, consumedDataFile)
     producedDataFile.delete()
     consumedDataFile.delete()
   }

   def dumpLog(dir: File) {
     require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
     for (file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
@@ -151,9 +151,9 @@ object TestLogCleaning {
       }
     }
   }

   def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size

   def validateOutput(producedDataFile: File, consumedDataFile: File) {
     val producedReader = externalSort(producedDataFile)
     val consumedReader = externalSort(consumedDataFile)
@@ -186,7 +186,7 @@ object TestLogCleaning {
     producedDedupedFile.delete()
     consumedDedupedFile.delete()
   }

   def valuesIterator(reader: BufferedReader) = {
     new IteratorTemplate[TestRecord] {
       def makeNext(): TestRecord = {
@@ -200,7 +200,7 @@ object TestLogCleaning {
       }
     }
   }

   def readNext(reader: BufferedReader): TestRecord = {
     var line = reader.readLine()
     if(line == null)
@@ -218,14 +218,14 @@ object TestLogCleaning {
     }
     null
   }

   def peekLine(reader: BufferedReader) = {
     reader.mark(4096)
     val line = reader.readLine
     reader.reset()
     line
   }

   def externalSort(file: File): BufferedReader = {
     val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + System.getProperty("java.io.tmpdir"), file.getAbsolutePath)
     val process = builder.start()
@@ -265,7 +265,7 @@ object TestLogCleaning {
       val topic = topics((i % topics.length).toInt)
       val key = rand.nextInt(keyCount)
       val delete = i % 100 < percentDeletes
       val msg =
         if(delete)
           new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), null)
         else
@@ -278,7 +278,8 @@ object TestLogCleaning {
     producer.close()
     producedFile
   }

+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
     val consumerProps = new Properties
     consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
@@ -287,7 +288,7 @@ object TestLogCleaning {
     consumerProps.setProperty("auto.offset.reset", "smallest")
     new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
   }

   def consumeMessages(zkUrl: String, topics: Array[String]): File = {
     val connector = makeConsumer(zkUrl, topics)
     val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
@@ -311,7 +312,7 @@ object TestLogCleaning {
     connector.shutdown()
     consumedFile
   }
 }

 case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
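Only makeConsumer carries the annotation because it is the one piece tied to ZookeeperConsumerConnector. A rough new-consumer replacement for the makeConsumer/consumeMessages pair, taking a broker list instead of a ZK URL (broker address and topic name are hypothetical):

    import java.util.{Properties, Random}
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer

    def makeNewConsumer(brokerUrl: String, topics: Array[String]): KafkaConsumer[String, String] = {
      val props = new Properties
      props.setProperty("bootstrap.servers", brokerUrl)
      props.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
      props.setProperty("auto.offset.reset", "earliest")  // the new spelling of "smallest"
      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(topics.toList.asJava)
      consumer
    }

    val consumer = makeNewConsumer("localhost:9092", Array("log-cleaner-topic"))
    consumer.poll(1000).asScala.foreach(r => println(r.key + "\t" + r.value))  // records replace KafkaStream iterators
    consumer.close()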

View File

@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import kafka.integration.KafkaServerTestHarness

+@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class DeleteConsumerGroupTest extends KafkaServerTestHarness {
   def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)

View File

@@ -61,6 +61,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }

   @Test
+  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeNonExistingGroup() {
     // mocks
     props.setProperty("zookeeper.connect", zkConnect)
@@ -82,6 +83,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }

   @Test
+  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeExistingGroup() {
     // mocks
     props.setProperty("zookeeper.connect", zkConnect)
@@ -108,6 +110,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }

   @Test
+  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeExistingGroupWithNoMembers() {
     // mocks
     props.setProperty("zookeeper.connect", zkConnect)
@@ -141,6 +144,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }

   @Test
+  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeConsumersWithNoAssignedPartitions() {
     // mocks
     props.setProperty("zookeeper.connect", zkConnect)

View File

@@ -33,6 +33,7 @@ import org.junit.{Before, Test}
 import kafka.serializer._
 import kafka.integration.KafkaServerTestHarness

+@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ConsumerIteratorTest extends KafkaServerTestHarness {

   val numNodes = 1
@@ -72,9 +73,9 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
     assertEquals(1, queue.size)
     queue.put(ZookeeperConsumerConnector.shutdownCommand)
     val iter = new ConsumerIterator[String, String](queue,
                                                     consumerConfig.consumerTimeoutMs,
                                                     new StringDecoder(),
                                                     new StringDecoder(),
                                                     clientId = "")
     val receivedMessages = (0 until 5).map(_ => iter.next.message)
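The queue-plus-shutdownCommand machinery this test exercises has a much simpler analogue in the new consumer: wakeup(). A sketch of the usual shutdown idiom, where consumer and process() are hypothetical:

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.errors.WakeupException

    try {
      while (true) {
        // poll blocks until records arrive or another thread calls consumer.wakeup()
        consumer.poll(Long.MaxValue).asScala.foreach(process)
      }
    } catch {
      case _: WakeupException => // expected on shutdown; nothing to do
    } finally {
      consumer.close()
    }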

View File

@@ -28,6 +28,7 @@ import kafka.consumer.PartitionAssignorTest.Scenario
 import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo
 import org.junit.Test

+@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class PartitionAssignorTest extends Logging {

   @Test
@@ -42,7 +43,7 @@ class PartitionAssignorTest extends Logging {
     val topicPartitionCounts = Map((1 to topicCount).map(topic => {
       ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
     }):_*)

     val subscriptions = Map((1 to consumerCount).map { consumer =>
       val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
       ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))

View File

@@ -23,6 +23,7 @@ import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test

+@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class TopicFilterTest extends JUnitSuite {

   @Test
@@ -45,7 +46,7 @@ class TopicFilterTest extends JUnitSuite {
     val topicFilter4 = Whitelist("test-(?!bad\\b)[\\w]+")
     assertTrue(topicFilter4.isTopicAllowed("test-good", excludeInternalTopics = true))
     assertFalse(topicFilter4.isTopicAllowed("test-bad", excludeInternalTopics = true))
   }

   @Test
@@ -80,5 +81,5 @@ class TopicFilterTest extends JUnitSuite {
     assertEquals("-\\\\u001f-", getTopicCountMapKey("-\\u001f-"))
     assertEquals("-\\\\u007f-", getTopicCountMapKey("-\\u007f-"))
     assertEquals("-\\\\u009f-", getTopicCountMapKey("-\\u009f-"))
   }
 }
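The whitelist/blacklist filters checked here map onto the new consumer's regex subscription. A minimal sketch reusing the pattern from topicFilter4 above, with consumer hypothetical and an empty listener as a placeholder:

    import java.util.regex.Pattern
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    import org.apache.kafka.common.TopicPartition

    consumer.subscribe(Pattern.compile("test-(?!bad\\b)[\\w]+"), new ConsumerRebalanceListener {
      override def onPartitionsAssigned(ps: java.util.Collection[TopicPartition]): Unit = ()
      override def onPartitionsRevoked(ps: java.util.Collection[TopicPartition]): Unit = ()
    })
    // internal topics stay excluded unless exclude.internal.topics=false is set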

View File

@@ -29,6 +29,7 @@ import kafka.server._
 import kafka.consumer._
 import kafka.utils.TestUtils

+@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class FetcherTest extends KafkaServerTestHarness {
   val numNodes = 1
   def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)

View File

@@ -653,6 +653,8 @@ object TestUtils extends Logging {
     props
   }

+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
     val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     zkUtils.updatePersistentPath(path, offset.toString)
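The new-consumer counterpart of writing an offset to a ZK path is an explicit commit to the group coordinator. A sketch, with the topic, partition, and offset all hypothetical and consumer a KafkaConsumer in the target group:

    import java.util.Collections
    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.common.TopicPartition

    consumer.commitSync(Collections.singletonMap(
      new TopicPartition("test", 0), new OffsetAndMetadata(42L)))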
@@ -1051,7 +1053,7 @@ object TestUtils extends Logging {
       case -1 => s"test-$x".getBytes
       case _ => new Array[Byte](valueBytes)
     })

     val futures = values.map { value =>
       producer.send(new ProducerRecord(topic, value))
     }
@@ -1082,6 +1084,7 @@ object TestUtils extends Logging {
    * If not specified, then all available messages will be consumed, and no exception is thrown.
    * @return the list of messages consumed.
    */
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]],
                   nMessagesPerThread: Int = -1): List[String] = {
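A rough new-consumer analogue of getMessages, assuming an already-subscribed consumer: without per-stream iterators to count against, the natural shape is to poll until the requested count (or a deadline) is reached.

    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer

    def getMessages(consumer: KafkaConsumer[String, String], nMessages: Int, timeoutMs: Long = 30000): List[String] = {
      val buffer = scala.collection.mutable.ListBuffer[String]()
      val deadline = System.currentTimeMillis + timeoutMs
      while (buffer.size < nMessages && System.currentTimeMillis < deadline)
        consumer.poll(100).asScala.foreach(r => buffer += r.value)
      buffer.toList.take(nMessages)  // a poll may have drained a few records past the limit
    }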

View File

@@ -30,6 +30,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

+/**
+ * @deprecated since 0.11.0.0. This class will be removed in a future release.
+ */
+@Deprecated
 public class SimpleConsumerDemo {

     private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {