KAFKA-6238; Fix inter-broker protocol message format compatibility check

This patch fixes a bug in the validation of the inter-broker protocol and the message format version. We should allow the configured message format api version to be greater than the inter-broker protocol api version as long as the actual message format versions are equal. For example, if the message format version is set to 1.0, it is fine for the inter-broker protocol version to be 0.11.0 because they both use message format v2.

I have added a unit test which checks compatibility for all combinations of the message format version and the inter-broker protocol version.

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #4583 from hachikuji/KAFKA-6328-REOPENED
This commit is contained in:
Jason Gustafson 2018-02-21 09:38:39 +00:00 committed by Damian Guy
parent 75df8cc77a
commit 660c0c0aa3
9 changed files with 131 additions and 32 deletions

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
public enum RecordFormat {
V0(0), V1(1), V2(2);
public final byte value;
RecordFormat(int value) {
this.value = (byte) value;
}
public static RecordFormat lookup(byte version) {
switch (version) {
case 0: return V0;
case 1: return V1;
case 2: return V2;
default: throw new IllegalArgumentException("Unknown format version: " + version);
}
}
public static RecordFormat current() {
return V2;
}
}

View File

@ -17,7 +17,7 @@
package kafka.api
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.record.RecordFormat
/**
* This class contains the different Kafka versions.
@ -90,11 +90,23 @@ object ApiVersion {
def latestVersion = versionNameMap.values.max
def allVersions: Set[ApiVersion] = {
versionNameMap.values.toSet
}
def minVersionForMessageFormat(messageFormatVersion: RecordFormat): String = {
messageFormatVersion match {
case RecordFormat.V0 => "0.8.0"
case RecordFormat.V1 => "0.10.0"
case RecordFormat.V2 => "0.11.0"
case _ => throw new IllegalArgumentException(s"Invalid message format version $messageFormatVersion")
}
}
}
sealed trait ApiVersion extends Ordered[ApiVersion] {
val version: String
val messageFormatVersion: Byte
val messageFormatVersion: RecordFormat
val id: Int
override def compare(that: ApiVersion): Int =
@ -106,90 +118,90 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
// Keep the IDs in order of versions
case object KAFKA_0_8_0 extends ApiVersion {
val version: String = "0.8.0.X"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
val messageFormatVersion = RecordFormat.V0
val id: Int = 0
}
case object KAFKA_0_8_1 extends ApiVersion {
val version: String = "0.8.1.X"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
val messageFormatVersion = RecordFormat.V0
val id: Int = 1
}
case object KAFKA_0_8_2 extends ApiVersion {
val version: String = "0.8.2.X"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
val messageFormatVersion = RecordFormat.V0
val id: Int = 2
}
case object KAFKA_0_9_0 extends ApiVersion {
val version: String = "0.9.0.X"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
val messageFormatVersion = RecordFormat.V0
val id: Int = 3
}
case object KAFKA_0_10_0_IV0 extends ApiVersion {
val version: String = "0.10.0-IV0"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val messageFormatVersion = RecordFormat.V1
val id: Int = 4
}
case object KAFKA_0_10_0_IV1 extends ApiVersion {
val version: String = "0.10.0-IV1"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val messageFormatVersion = RecordFormat.V1
val id: Int = 5
}
case object KAFKA_0_10_1_IV0 extends ApiVersion {
val version: String = "0.10.1-IV0"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val messageFormatVersion = RecordFormat.V1
val id: Int = 6
}
case object KAFKA_0_10_1_IV1 extends ApiVersion {
val version: String = "0.10.1-IV1"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val messageFormatVersion = RecordFormat.V1
val id: Int = 7
}
case object KAFKA_0_10_1_IV2 extends ApiVersion {
val version: String = "0.10.1-IV2"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val messageFormatVersion = RecordFormat.V1
val id: Int = 8
}
case object KAFKA_0_10_2_IV0 extends ApiVersion {
val version: String = "0.10.2-IV0"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val messageFormatVersion = RecordFormat.V1
val id: Int = 9
}
case object KAFKA_0_11_0_IV0 extends ApiVersion {
val version: String = "0.11.0-IV0"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
val messageFormatVersion = RecordFormat.V2
val id: Int = 10
}
case object KAFKA_0_11_0_IV1 extends ApiVersion {
val version: String = "0.11.0-IV1"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
val messageFormatVersion = RecordFormat.V2
val id: Int = 11
}
case object KAFKA_0_11_0_IV2 extends ApiVersion {
val version: String = "0.11.0-IV2"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
val messageFormatVersion = RecordFormat.V2
val id: Int = 12
}
case object KAFKA_1_0_IV0 extends ApiVersion {
val version: String = "1.0-IV0"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
val messageFormatVersion = RecordFormat.V2
val id: Int = 13
}
case object KAFKA_1_1_IV0 extends ApiVersion {
val version: String = "1.1-IV0"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
val messageFormatVersion = RecordFormat.V2
val id: Int = 14
}

View File

@ -465,7 +465,7 @@ class Log(@volatile var dir: File,
private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
checkIfMemoryMappedBufferClosed()
val messageFormatVersion = config.messageFormatVersion.messageFormatVersion
val messageFormatVersion = config.messageFormatVersion.messageFormatVersion.value
info(s"Loading producer state from offset $lastOffset for partition $topicPartition with message " +
s"format version $messageFormatVersion")
@ -663,7 +663,7 @@ class Log(@volatile var dir: File,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
config.messageFormatVersion.messageFormatVersion,
config.messageFormatVersion.messageFormatVersion.value,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,

View File

@ -59,7 +59,7 @@ import DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import scala.collection.{mutable, _}
import scala.collection._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
@ -1347,7 +1347,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if (apiVersionRequest.hasUnsupportedRequestVersion)
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
else
ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, config.interBrokerProtocolVersion.messageFormatVersion)
ApiVersionsResponse.apiVersionsResponse(requestThrottleMs,
config.interBrokerProtocolVersion.messageFormatVersion.value)
}
sendResponseMaybeThrottle(request, createResponseCallback)
}

View File

@ -1330,8 +1330,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")
require(interBrokerProtocolVersion >= logMessageFormatVersion,
s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
val messageFormatVersion = logMessageFormatVersion.messageFormatVersion
require(interBrokerProtocolVersion.messageFormatVersion.value >= messageFormatVersion.value,
s"log.message.format.version $logMessageFormatVersionString can only be used when " +
"inter.broker.protocol.version is set to version " +
s"${ApiVersion.minVersionForMessageFormat(messageFormatVersion)} or higher")
val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")

View File

@ -1002,7 +1002,7 @@ class ReplicaManager(val config: KafkaConfig,
}
def getMagic(topicPartition: TopicPartition): Option[Byte] =
getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion.value))
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = {
replicaStateChangeLock synchronized {

View File

@ -17,6 +17,7 @@
package kafka.api
import org.apache.kafka.common.record.RecordFormat
import org.junit.Test
import org.junit.Assert._
@ -74,4 +75,16 @@ class ApiVersionTest {
assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.1"))
}
@Test
def testMinVersionForMessageFormat(): Unit = {
assertEquals("0.8.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V0))
assertEquals("0.10.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V1))
assertEquals("0.11.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V2))
// Ensure that all message format versions have a defined min version so that we remember
// to update the function
for (messageFormatVersion <- RecordFormat.values)
assertNotNull(ApiVersion.minVersionForMessageFormat(messageFormatVersion))
}
}

View File

@ -522,6 +522,30 @@ class KafkaConfigTest {
}
}
@Test
def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
def buildConfig(interBrokerProtocol: ApiVersion, messageFormat: ApiVersion): KafkaConfig = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
props.put(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
KafkaConfig.fromProps(props)
}
ApiVersion.allVersions.foreach { interBrokerVersion =>
ApiVersion.allVersions.foreach { messageFormatVersion =>
if (interBrokerVersion.messageFormatVersion.value >= messageFormatVersion.messageFormatVersion.value) {
val config = buildConfig(interBrokerVersion, messageFormatVersion)
assertEquals(messageFormatVersion, config.logMessageFormatVersion)
assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
} else {
intercept[IllegalArgumentException] {
buildConfig(interBrokerVersion, messageFormatVersion)
}
}
}
}
}
@Test
def testFromPropsInvalid() {
def getBaseProperties(): Properties = {

View File

@ -36,10 +36,10 @@
<li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact
following the upgrade</a> for the details on what this configuration does.)</li>
</ul>
If you are upgrading from 0.11.0.x and you have not overridden the message format, then you only need to override
If you are upgrading from 0.11.0.x or 1.0.x and you have not overridden the message format, then you only need to override
the inter-broker protocol format.
<ul>
<li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0).</li>
<li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0 or 1.0).</li>
</ul>
</li>
<li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
@ -106,10 +106,11 @@
<li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact
following the upgrade</a> for the details on what this configuration does.)</li>
</ul>
If you are upgrading from 0.11.0.x and you have not overridden the message format, then you only need to override
the inter-broker protocol format.
If you are upgrading from 0.11.0.x and you have not overridden the message format, you must set
both the message format version and the inter-broker protocol version to 0.11.0.
<ul>
<li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0).</li>
<li>inter.broker.protocol.version=0.11.0</li>
<li>log.message.format.version=0.11.0</li>
</ul>
</li>
<li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
@ -117,9 +118,11 @@
<li> Restart the brokers one by one for the new protocol version to take effect. </li>
<li> If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 1.0 on each broker and restart them one by one. Note that the older Scala consumer
does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to
take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly once semantics</a>), the newer Java consumer must be used.</li>
change log.message.format.version to 1.0 on each broker and restart them one by one. If you are upgrading from
0.11.0 and log.message.format.version is set to 0.11.0, you can update the config and skip the rolling restart.
Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the
performance cost of down-conversion (or to take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly once semantics</a>),
the newer Java consumer must be used.</li>
</ol>
<p><b>Additional Upgrade Notes:</b></p>