From ac757eb19ec50b79ab2b5336fa4d7411cd99456d Mon Sep 17 00:00:00 2001 From: Manikumar reddy O Date: Thu, 24 Sep 2015 12:38:27 -0700 Subject: [PATCH] KAFKA-2554: change 0.8.3 to 0.9.0 in ApiVersion and other files Updated the version from 0.8.3 to 0.9.0. in ApiVersion. Also updated in gradle.propeties. Author: Manikumar reddy O Reviewers: Ismael Juma, Gwen Shapira Closes #237 from omkreddy/KAFKA-2554 --- core/src/main/scala/kafka/api/ApiVersion.scala | 8 ++++---- .../scala/kafka/controller/ControllerChannelManager.scala | 4 ++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 6 +++--- .../main/scala/kafka/server/ReplicaFetcherThread.scala | 4 ++-- gradle.properties | 2 +- tests/setup.py | 2 +- 7 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index a7c15f3781a..c9c19761ec2 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -33,7 +33,7 @@ object ApiVersion { "0.8.0" -> KAFKA_080, "0.8.1" -> KAFKA_081, "0.8.2" -> KAFKA_082, - "0.8.3" -> KAFKA_083 + "0.9.0" -> KAFKA_090 ) def apply(version: String): ApiVersion = versionNameMap(version.split("\\.").slice(0,3).mkString(".")) @@ -72,7 +72,7 @@ case object KAFKA_082 extends ApiVersion { val id: Int = 2 } -case object KAFKA_083 extends ApiVersion { - val version: String = "0.8.3.X" +case object KAFKA_090 extends ApiVersion { + val version: String = "0.9.0.X" val id: Int = 3 -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 0d62c966635..1220c3829da 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -16,7 +16,7 @@ */ package kafka.controller -import kafka.api.{LeaderAndIsr, KAFKA_083, PartitionStateInfo} +import kafka.api.{LeaderAndIsr, KAFKA_090, PartitionStateInfo} import kafka.utils._ import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient} import org.apache.kafka.common.{TopicPartition, Node} @@ -379,7 +379,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging topicPartition -> partitionState } - val version = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) (1: Short) else (0: Short) + val version = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_090)) (1: Short) else (0: Short) val updateMetadataRequest = if (version == 0) { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1e8b2331486..1c594f6fa0d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -431,7 +431,7 @@ object KafkaConfig { val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Defaults to plain text." val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol will be used.\n" + " This is typically bumped after all brokers were upgraded to a new version.\n" + - " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.8.3, 0.8.3.0. Check ApiVersion for the full list." + " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list." /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens" val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying." diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index ba3333ddee2..0a60efbfd8d 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -21,7 +21,7 @@ import java.net.{SocketTimeoutException} import java.util import kafka.admin._ -import kafka.api.{KAFKA_083, ApiVersion} +import kafka.api.{KAFKA_090, ApiVersion} import kafka.log.LogConfig import kafka.log.CleanerConfig import kafka.log.LogManager @@ -481,9 +481,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr brokerState.newState(PendingControlledShutdown) val shutdownSucceeded = - // Before 0.8.3, `ControlledShutdownRequest` did not contain `client_id` and it's a mandatory field in + // Before 0.9.0.0, `ControlledShutdownRequest` did not contain `client_id` and it's a mandatory field in // `RequestHeader`, which is used by `NetworkClient` - if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) + if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_090)) networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue) else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 6c85e52e172..12698c7bb50 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -23,7 +23,7 @@ import kafka.admin.AdminUtils import kafka.cluster.BrokerEndPoint import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet -import kafka.api.KAFKA_083 +import kafka.api.KAFKA_090 import kafka.common.{KafkaStorageException, TopicAndPartition} import ReplicaFetcherThread._ import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse} @@ -54,7 +54,7 @@ class ReplicaFetcherThread(name: String, type REQ = FetchRequest type PD = PartitionData - private val fetchRequestVersion: Short = if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0 + private val fetchRequestVersion: Short = if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_090)) 1 else 0 private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs private val replicaId = brokerConfig.brokerId private val maxWait = brokerConfig.replicaFetchWaitMaxMs diff --git a/gradle.properties b/gradle.properties index 6f18a50902e..faeaf885efe 100644 --- a/gradle.properties +++ b/gradle.properties @@ -14,7 +14,7 @@ # limitations under the License. group=org.apache.kafka -version=0.8.3-SNAPSHOT +version=0.9.0.0-SNAPSHOT scalaVersion=2.10.5 task=build org.gradle.jvmargs=-XX:MaxPermSize=512m -Xmx1024m -Xss2m diff --git a/tests/setup.py b/tests/setup.py index 00e58dd21c2..d637eb8dd0b 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -17,7 +17,7 @@ from setuptools import find_packages, setup setup(name="kafkatest", - version="0.8.3.dev0", + version="0.9.0.dev0", description="Apache Kafka System Tests", author="Apache Kafka", platforms=["any"],