KAFKA-14869: Bump coordinator value records to flexible versions (KIP-915, Part-2) (#13526)

This patch implements the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to flexible versions. The new versions are not yet used when writing to the partitions, only when reading from them. This allows downgrades from future versions that will include tagged fields.
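For context (this sketch is not part of the patch): every coordinator value record is stored as a two-byte version prefix followed by the payload serialized at that version, which is why the tests below inspect the leading short. A minimal round trip with the generated OffsetCommitValue, using only the MessageUtil and ByteBufferAccessor helpers that the patch itself exercises:

import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue

object VersionPrefixSketch extends App {
  val value = new OffsetCommitValue()
    .setOffset(1000L)
    .setMetadata("metadata")
    .setCommitTimestamp(1500L)

  // The first two bytes carry the version; the payload follows,
  // serialized at that version (4 is the new flexible version).
  val buffer = MessageUtil.toVersionPrefixedByteBuffer(4.toShort, value)
  val version = buffer.getShort()
  val parsed = new OffsetCommitValue(new ByteBufferAccessor(buffer), version)

  // Tagged fields written by a future version would be collected here
  // instead of failing the parse; that is what makes downgrades safe.
  assert(parsed.unknownTaggedFields().isEmpty)
  assert(parsed.offset == 1000L)
}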

Reviewers: David Jacot <djacot@confluent.io>
Author: Jeff Kim, 2023-04-18 09:37:04 -04:00 (committed by GitHub)
parent b36a170aa3
commit 61530d68ce
7 changed files with 368 additions and 20 deletions

TransactionLogValue.json

@@ -16,24 +16,27 @@
 {
   "type": "data",
   "name": "TransactionLogValue",
-  "validVersions": "0",
-  "flexibleVersions": "none",
+  // Version 1 is the first flexible version.
+  // KIP-915: bumping the version will no longer make this record backward compatible.
+  // We suggest to add/remove only tagged fields to maintain backward compatibility.
+  "validVersions": "0-1",
+  "flexibleVersions": "1+",
   "fields": [
-    { "name": "ProducerId", "type": "int64", "versions": "0",
+    { "name": "ProducerId", "type": "int64", "versions": "0+",
       "about": "Producer id in use by the transactional id"},
-    { "name": "ProducerEpoch", "type": "int16", "versions": "0",
+    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
       "about": "Epoch associated with the producer id"},
-    { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0",
+    { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
       "about": "Transaction timeout in milliseconds"},
-    { "name": "TransactionStatus", "type": "int8", "versions": "0",
+    { "name": "TransactionStatus", "type": "int8", "versions": "0+",
       "about": "TransactionState the transaction is in"},
-    { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0", "nullableVersions": "0",
+    { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0+", "nullableVersions": "0+",
       "about": "Set of partitions involved in the transaction", "fields": [
-      { "name": "Topic", "type": "string", "versions": "0"},
-      { "name": "PartitionIds", "type": "[]int32", "versions": "0"}]},
-    { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0",
+      { "name": "Topic", "type": "string", "versions": "0+"},
+      { "name": "PartitionIds", "type": "[]int32", "versions": "0+"}]},
+    { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0+",
       "about": "Time the transaction was last updated"},
-    { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0",
+    { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+",
       "about": "Time the transaction was started"}
   ]
 }
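The comment block added above is the crux of KIP-915: now that version 1 exists, growing the schema with regular fields would again break downgrades, so future changes should be tagged fields only. A hedged sketch of what the bump changes on the wire, reusing the generated TransactionLogValue (the byte counts are illustrative, not asserted anywhere in the patch):

import kafka.internals.generated.TransactionLogValue
import org.apache.kafka.common.protocol.MessageUtil

object FlexibleEncodingSketch extends App {
  val value = new TransactionLogValue()
    .setProducerId(100L)
    .setProducerEpoch(5.toShort)
    .setTransactionTimeoutMs(30000)

  // Version 0: the original, non-flexible wire format that brokers
  // keep writing after this patch.
  val v0 = MessageUtil.toVersionPrefixedBytes(0.toShort, value)

  // Version 1: compact strings/arrays plus a trailing tagged-field
  // section; that trailing section is where future tagged fields live,
  // and readers skip any tags they do not recognize.
  val v1 = MessageUtil.toVersionPrefixedBytes(1.toShort, value)

  println(s"v0 = ${v0.length} bytes, v1 = ${v1.length} bytes")
}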

GroupMetadataManager.scala

@@ -1088,6 +1088,8 @@ object GroupMetadataManager {
     val version =
       if (metadataVersion.isLessThan(IBP_2_1_IV0) || offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort
       else if (metadataVersion.isLessThan(IBP_2_1_IV1)) 2.toShort
+      // Serialize with the highest supported non-flexible version
+      // until a tagged field is introduced or the version is bumped.
       else 3.toShort
     MessageUtil.toVersionPrefixedBytes(version, new OffsetCommitValue()
       .setOffset(offsetAndMetadata.offset)
@@ -1116,6 +1118,8 @@ object GroupMetadataManager {
       if (metadataVersion.isLessThan(IBP_0_10_1_IV0)) 0.toShort
       else if (metadataVersion.isLessThan(IBP_2_1_IV0)) 1.toShort
       else if (metadataVersion.isLessThan(IBP_2_3_IV0)) 2.toShort
+      // Serialize with the highest supported non-flexible version
+      // until a tagged field is introduced or the version is bumped.
       else 3.toShort
     MessageUtil.toVersionPrefixedBytes(version, new GroupMetadataValue()
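The two new comment lines encode the KIP-915 write/read split for the group coordinator: writes stay pinned at version 3, the highest non-flexible version, so older brokers can still read them, while the read path already understands version 4. A minimal sketch of that asymmetry, assuming the generated GroupMetadataValue class:

import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue

object WriteReadSplitSketch extends App {
  val value = new GroupMetadataValue()
    .setProtocolType("consumer")
    .setGeneration(1)
    .setProtocol("range")
    .setLeader("leader")

  // Write path: still v3, so a record written today parses on a
  // broker that predates this patch.
  val onDisk = MessageUtil.toVersionPrefixedBytes(3.toShort, value)

  // Read path: v4 (flexible) also parses, so records written by a
  // future version carrying tagged fields survive a downgrade.
  val buffer = MessageUtil.toVersionPrefixedByteBuffer(4.toShort, value)
  buffer.getShort() // consume the version prefix
  val readBack = new GroupMetadataValue(new ByteBufferAccessor(buffer), 4.toShort)
  assert(readBack.generation == 1)
}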

TransactionLog.scala

@@ -81,7 +81,9 @@ object TransactionLog {
           .setPartitionIds(partitions.map(tp => Integer.valueOf(tp.partition)).toList.asJava)
       }.toList.asJava

-    MessageUtil.toVersionPrefixedBytes(TransactionLogValue.HIGHEST_SUPPORTED_VERSION,
+    // Serialize with the highest supported non-flexible version
+    // until a tagged field is introduced or the version is bumped.
+    MessageUtil.toVersionPrefixedBytes(0,
       new TransactionLogValue()
         .setProducerId(txnMetadata.producerId)
         .setProducerEpoch(txnMetadata.producerEpoch)

GroupMetadataManagerTest.scala

@@ -35,11 +35,14 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics => kMetrics}
-import org.apache.kafka.common.protocol.{Errors, MessageUtil}
+import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection
+import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type}
+import org.apache.kafka.common.protocol.{ByteBufferAccessor, Errors, MessageUtil}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -2483,6 +2486,217 @@ class GroupMetadataManagerTest {
     verifySerde(version)
   }

+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value.getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value.getShort)
+    }
+  }
+
+  @Test
+  def testDeserializeHighestSupportedGroupMetadataValueVersion(): Unit = {
+    val member = new GroupMetadataValue.MemberMetadata()
+      .setMemberId("member")
+      .setClientId("client")
+      .setClientHost("host")
+
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val leader = "leader"
+    val groupMetadataValue = new GroupMetadataValue()
+      .setProtocolType(protocolType)
+      .setGeneration(generation)
+      .setProtocol(protocol)
+      .setLeader(leader)
+      .setMembers(java.util.Collections.singletonList(member))
+
+    val deserialized = GroupMetadataManager.readGroupMessageValue("groupId",
+      MessageUtil.toVersionPrefixedByteBuffer(4, groupMetadataValue), time)
+    assertEquals(generation, deserialized.generationId)
+    assertEquals(protocolType, deserialized.protocolType.get)
+    assertEquals(protocol, deserialized.protocolName.get)
+    assertEquals(leader, deserialized.leaderOrNull)
+
+    val actualMember = deserialized.allMemberMetadata.head
+    assertEquals(member.memberId, actualMember.memberId)
+    assertEquals(member.clientId, actualMember.clientId)
+    assertEquals(member.clientHost, actualMember.clientHost)
+  }
+
+  @Test
+  def testDeserializeHighestSupportedOffsetCommitValueVersion(): Unit = {
+    val offsetCommitValue = new OffsetCommitValue()
+      .setOffset(1000L)
+      .setMetadata("metadata")
+      .setCommitTimestamp(1500L)
+      .setLeaderEpoch(1)
+
+    val serialized = MessageUtil.toVersionPrefixedByteBuffer(4, offsetCommitValue)
+    val deserialized = GroupMetadataManager.readOffsetMessageValue(serialized)
+    assertEquals(1000L, deserialized.offset)
+    assertEquals("metadata", deserialized.metadata)
+    assertEquals(1500L, deserialized.commitTimestamp)
+    assertEquals(1, deserialized.leaderEpoch.get)
+  }
+
+  @Test
+  def testDeserializeFutureOffsetCommitValue(): Unit = {
+    // Copy of OffsetCommitValue.SCHEMA_4 with a few
+    // additional tagged fields.
+    val futureOffsetCommitSchema = new Schema(
+      new Field("offset", Type.INT64, ""),
+      new Field("leader_epoch", Type.INT32, ""),
+      new Field("metadata", Type.COMPACT_STRING, ""),
+      new Field("commit_timestamp", Type.INT64, ""),
+      TaggedFieldsSection.of(
+        Int.box(0), new Field("offset_foo", Type.STRING, ""),
+        Int.box(1), new Field("offset_bar", Type.INT32, "")
+      )
+    )
+
+    // Create OffsetCommitValue with tagged fields
+    val offsetCommit = new Struct(futureOffsetCommitSchema)
+    offsetCommit.set("offset", 1000L)
+    offsetCommit.set("leader_epoch", 100)
+    offsetCommit.set("metadata", "metadata")
+    offsetCommit.set("commit_timestamp", 2000L)
+    val offsetCommitTaggedFields = new java.util.TreeMap[Integer, Any]()
+    offsetCommitTaggedFields.put(0, "foo")
+    offsetCommitTaggedFields.put(1, 4000)
+    offsetCommit.set("_tagged_fields", offsetCommitTaggedFields)
+
+    // Prepare the buffer.
+    val buffer = ByteBuffer.allocate(offsetCommit.sizeOf() + 2)
+    buffer.put(0.toByte)
+    buffer.put(4.toByte) // Add 4 as version.
+    offsetCommit.writeTo(buffer)
+    buffer.flip()
+
+    // Read the buffer with the real schema and verify that tagged
+    // fields were read but ignored.
+    buffer.getShort() // Skip version.
+    val value = new OffsetCommitValue(new ByteBufferAccessor(buffer), 4.toShort)
+    assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))
+
+    // Read the buffer with readOffsetMessageValue.
+    buffer.rewind()
+    val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
+    assertEquals(1000L, offsetAndMetadata.offset)
+    assertEquals(100, offsetAndMetadata.leaderEpoch.get)
+    assertEquals("metadata", offsetAndMetadata.metadata)
+    assertEquals(2000L, offsetAndMetadata.commitTimestamp)
+  }
+
+  @Test
+  def testDeserializeFutureGroupMetadataValue(): Unit = {
+    // Copy of GroupMetadataValue.MemberMetadata.SCHEMA_4 with a few
+    // additional tagged fields.
+    val futureMemberSchema = new Schema(
+      new Field("member_id", Type.COMPACT_STRING, ""),
+      new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
+      new Field("client_id", Type.COMPACT_STRING, ""),
+      new Field("client_host", Type.COMPACT_STRING, ""),
+      new Field("rebalance_timeout", Type.INT32, ""),
+      new Field("session_timeout", Type.INT32, ""),
+      new Field("subscription", Type.COMPACT_BYTES, ""),
+      new Field("assignment", Type.COMPACT_BYTES, ""),
+      TaggedFieldsSection.of(
+        Int.box(0), new Field("member_foo", Type.STRING, ""),
+        Int.box(1), new Field("member_foo", Type.INT32, "")
+      )
+    )
+
+    // Copy of GroupMetadataValue.SCHEMA_4 with a few
+    // additional tagged fields.
+    val futureGroupSchema = new Schema(
+      new Field("protocol_type", Type.COMPACT_STRING, ""),
+      new Field("generation", Type.INT32, ""),
+      new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
+      new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
+      new Field("current_state_timestamp", Type.INT64, ""),
+      new Field("members", new CompactArrayOf(futureMemberSchema), ""),
+      TaggedFieldsSection.of(
+        Int.box(0), new Field("group_foo", Type.STRING, ""),
+        Int.box(1), new Field("group_bar", Type.INT32, "")
+      )
+    )
+
+    // Create a member with tagged fields.
+    val member = new Struct(futureMemberSchema)
+    member.set("member_id", "member_id")
+    member.set("group_instance_id", "group_instance_id")
+    member.set("client_id", "client_id")
+    member.set("client_host", "client_host")
+    member.set("rebalance_timeout", 1)
+    member.set("session_timeout", 2)
+    member.set("subscription", ByteBuffer.allocate(0))
+    member.set("assignment", ByteBuffer.allocate(0))
+    val memberTaggedFields = new java.util.TreeMap[Integer, Any]()
+    memberTaggedFields.put(0, "foo")
+    memberTaggedFields.put(1, 4000)
+    member.set("_tagged_fields", memberTaggedFields)
+
+    // Create a group with tagged fields.
+    val group = new Struct(futureGroupSchema)
+    group.set("protocol_type", "consumer")
+    group.set("generation", 10)
+    group.set("protocol", "range")
+    group.set("leader", "leader")
+    group.set("current_state_timestamp", 1000L)
+    group.set("members", Array(member))
+    val groupTaggedFields = new java.util.TreeMap[Integer, Any]()
+    groupTaggedFields.put(0, "foo")
+    groupTaggedFields.put(1, 4000)
+    group.set("_tagged_fields", groupTaggedFields)
+
+    // Prepare the buffer.
+    val buffer = ByteBuffer.allocate(group.sizeOf() + 2)
+    buffer.put(0.toByte)
+    buffer.put(4.toByte) // Add 4 as version.
+    group.writeTo(buffer)
+    buffer.flip()
+
+    // Read the buffer with the real schema and verify that tagged
+    // fields were read but ignored.
+    buffer.getShort() // Skip version.
+    val value = new GroupMetadataValue(new ByteBufferAccessor(buffer), 4.toShort)
+    assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))
+    assertEquals(Seq(0, 1), value.members().get(0).unknownTaggedFields().asScala.map(_.tag))
+
+    // Read the buffer with readGroupMessageValue.
+    buffer.rewind()
+    val groupMetadata = GroupMetadataManager.readGroupMessageValue("group", buffer, time)
+    assertEquals("consumer", groupMetadata.protocolType.get)
+    assertEquals("leader", groupMetadata.leaderOrNull)
+    assertTrue(groupMetadata.allMembers.contains("member_id"))
+  }
+
   @Test
   def testLoadOffsetsWithEmptyControlBatch(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition

TransactionLogTest.scala

@@ -18,14 +18,18 @@ package kafka.coordinator.transaction
 import kafka.internals.generated.TransactionLogKey
 import kafka.internals.generated.TransactionLogValue
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.MessageUtil
+import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
+import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection
+import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.Test

 import java.nio.ByteBuffer
+
 import scala.collection.Seq
+import scala.jdk.CollectionConverters._

 class TransactionLogTest {
class TransactionLogTest {
@@ -138,6 +142,121 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }

+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeHighestSupportedTransactionLogValue(): Unit = {
+    val txnPartitions = new TransactionLogValue.PartitionsSchema()
+      .setTopic("topic")
+      .setPartitionIds(java.util.Collections.singletonList(0))
+
+    val txnLogValue = new TransactionLogValue()
+      .setProducerId(100)
+      .setProducerEpoch(50.toShort)
+      .setTransactionStatus(CompleteCommit.id)
+      .setTransactionStartTimestampMs(750L)
+      .setTransactionLastUpdateTimestampMs(1000L)
+      .setTransactionTimeoutMs(500)
+      .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions))
+
+    val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue)
+    val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get
+    assertEquals(100, deserialized.producerId)
+    assertEquals(50, deserialized.producerEpoch)
+    assertEquals(CompleteCommit, deserialized.state)
+    assertEquals(750L, deserialized.txnStartTimestamp)
+    assertEquals(1000L, deserialized.txnLastUpdateTimestamp)
+    assertEquals(500, deserialized.txnTimeoutMs)
+
+    val actualTxnPartitions = deserialized.topicPartitions
+    assertEquals(1, actualTxnPartitions.size)
+    assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0)))
+  }
+
+  @Test
+  def testDeserializeFutureTransactionLogValue(): Unit = {
+    // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futurePartitionsSchema = new Schema(
+      new Field("topic", Type.COMPACT_STRING, ""),
+      new Field("partition_ids", new CompactArrayOf(Type.INT32), ""),
+      TaggedFieldsSection.of(
+        Int.box(0), new Field("partition_foo", Type.STRING, ""),
+        Int.box(1), new Field("partition_foo", Type.INT32, "")
+      )
+    )
+
+    // Create TransactionLogValue.PartitionsSchema with tagged fields
+    val txnPartitions = new Struct(futurePartitionsSchema)
+    txnPartitions.set("topic", "topic")
+    txnPartitions.set("partition_ids", Array(Integer.valueOf(1)))
+    val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]()
+    txnPartitionsTaggedFields.put(0, "foo")
+    txnPartitionsTaggedFields.put(1, 4000)
+    txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields)
+
+    // Copy of TransactionLogValue.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futureTransactionLogValueSchema = new Schema(
+      new Field("producer_id", Type.INT64, ""),
+      new Field("producer_epoch", Type.INT16, ""),
+      new Field("transaction_timeout_ms", Type.INT32, ""),
+      new Field("transaction_status", Type.INT8, ""),
+      new Field("transaction_partitions", CompactArrayOf.nullable(futurePartitionsSchema), ""),
+      new Field("transaction_last_update_timestamp_ms", Type.INT64, ""),
+      new Field("transaction_start_timestamp_ms", Type.INT64, ""),
+      TaggedFieldsSection.of(
+        Int.box(0), new Field("txn_foo", Type.STRING, ""),
+        Int.box(1), new Field("txn_bar", Type.INT32, "")
+      )
+    )
+
+    // Create TransactionLogValue with tagged fields
+    val transactionLogValue = new Struct(futureTransactionLogValueSchema)
+    transactionLogValue.set("producer_id", 1000L)
+    transactionLogValue.set("producer_epoch", 100.toShort)
+    transactionLogValue.set("transaction_timeout_ms", 1000)
+    transactionLogValue.set("transaction_status", CompleteCommit.id)
+    transactionLogValue.set("transaction_partitions", Array(txnPartitions))
+    transactionLogValue.set("transaction_last_update_timestamp_ms", 2000L)
+    transactionLogValue.set("transaction_start_timestamp_ms", 3000L)
+    val txnLogValueTaggedFields = new java.util.TreeMap[Integer, Any]()
+    txnLogValueTaggedFields.put(0, "foo")
+    txnLogValueTaggedFields.put(1, 4000)
+    transactionLogValue.set("_tagged_fields", txnLogValueTaggedFields)
+
+    // Prepare the buffer.
+    val buffer = ByteBuffer.allocate(transactionLogValue.sizeOf() + 2)
+    buffer.put(0.toByte)
+    buffer.put(1.toByte) // Add 1 as version.
+    transactionLogValue.writeTo(buffer)
+    buffer.flip()
+
+    // Read the buffer with the real schema and verify that tagged
+    // fields were read but ignored.
+    buffer.getShort() // Skip version.
+    val value = new TransactionLogValue(new ByteBufferAccessor(buffer), 1.toShort)
+    assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))
+    assertEquals(Seq(0, 1), value.transactionPartitions().get(0).unknownTaggedFields().asScala.map(_.tag))
+
+    // Read the buffer with readTxnRecordValue.
+    buffer.rewind()
+    val txnMetadata = TransactionLog.readTxnRecordValue("transaction-id", buffer).get
+    assertEquals(1000L, txnMetadata.producerId)
+    assertEquals(100, txnMetadata.producerEpoch)
+    assertEquals(1000L, txnMetadata.txnTimeoutMs)
+    assertEquals(CompleteCommit, txnMetadata.state)
+    assertEquals(Set(new TopicPartition("topic", 1)), txnMetadata.topicPartitions)
+    assertEquals(2000L, txnMetadata.txnLastUpdateTimestamp)
+    assertEquals(3000L, txnMetadata.txnStartTimestamp)
+  }
+
   @Test
   def testReadTxnRecordKeyCanReadUnknownMessage(): Unit = {
     val record = new TransactionLogKey()

GroupMetadataValue.json

@@ -16,8 +16,11 @@
 {
   "type": "data",
   "name": "GroupMetadataValue",
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  // Version 4 is the first flexible version.
+  // KIP-915: bumping the version will no longer make this record backward compatible.
+  // We suggest to add/remove only tagged fields to maintain backward compatibility.
+  "validVersions": "0-4",
+  "flexibleVersions": "4+",
   "fields": [
     { "name": "protocolType", "versions": "0+", "type": "string"},
     { "name": "generation", "versions": "0+", "type": "int32" },
@@ -29,7 +32,7 @@
   "commonStructs": [
     {
       "name": "MemberMetadata",
-      "versions": "0-3",
+      "versions": "0+",
       "fields": [
         { "name": "memberId", "versions": "0+", "type": "string" },
         { "name": "groupInstanceId", "versions": "3+", "type": "string", "default": "null", "nullableVersions": "3+", "ignorable": true},
OffsetCommitValue.json

@@ -16,8 +16,11 @@
 {
   "type": "data",
   "name": "OffsetCommitValue",
-  "validVersions": "0-3",
-  "flexibleVersions": "none",
+  // Version 4 is the first flexible version.
+  // KIP-915: bumping the version will no longer make this record backward compatible.
+  // We suggest to add/remove only tagged fields to maintain backward compatibility.
+  "validVersions": "0-4",
+  "flexibleVersions": "4+",
   "fields": [
     { "name": "offset", "type": "int64", "versions": "0+" },
     { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true},