MINOR: Add tests on TxnOffsetCommit and EndTxnMarker protection against invalid producer epoch when TV2 is used (#20024)

This patch adds API-level integration tests for producer epoch verification
when processing transactional offset commits and end txn markers.
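
For orientation (not part of the patch): the tests key the expected behavior off the TxnOffsetCommit request version. Below is a minimal sketch, assuming the kafka-clients classes used by the patch are on the classpath and using a hypothetical object name, of how versions map to transactions V2 (TV2) and to the error expected for a commit carrying the pre-bump producer epoch.

import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.EndTxnRequest

// Hypothetical standalone sketch, not part of the commit: versions newer than the last
// stable pre-TV2 EndTxn version are treated as transactions V2, where the coordinator
// bumps the producer epoch on EndTxn, so a delayed TxnOffsetCommit (or txn marker) that
// still carries the old epoch is expected to fail with INVALID_PRODUCER_EPOCH.
object Tv2ExpectationSketch extends App {
  val versions = ApiKeys.TXN_OFFSET_COMMIT.oldestVersion.toInt to ApiKeys.TXN_OFFSET_COMMIT.latestVersion.toInt
  versions.foreach { version =>
    val useTV2 = version > EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
    val expectedOldEpochError = if (useTV2) "INVALID_PRODUCER_EPOCH" else "NONE"
    println(s"TxnOffsetCommit v$version: TV2=$useTV2, old-epoch commit -> $expectedOldEpochError")
  }
}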

Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi
 <kitingiao@gmail.com>, Sean Quah <squah@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>
Dongnuo Lyu 2025-07-20 18:34:29 -04:00 committed by GitHub
parent 634e99e9ab
commit 50598191dc
3 changed files with 338 additions and 31 deletions

View File

@@ -19,14 +19,16 @@ package kafka.server
import kafka.network.SocketServer
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.{TopicCollection, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
@@ -352,6 +354,35 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
assertEquals(expectedError, connectAndReceive[EndTxnResponse](request).error)
}
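// Sends a WriteTxnMarkersRequest with a single marker targeting partition 0 of the
// __consumer_offsets topic and asserts the partition-level error code.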
protected def writeTxnMarkers(
producerId: Long,
producerEpoch: Short,
committed: Boolean,
expectedError: Errors = Errors.NONE,
version: Short = ApiKeys.WRITE_TXN_MARKERS.latestVersion(isUnstableApiEnabled)
): Unit = {
val request = new WriteTxnMarkersRequest.Builder(
new WriteTxnMarkersRequestData()
.setMarkers(List(
new WritableTxnMarker()
.setProducerId(producerId)
.setProducerEpoch(producerEpoch)
.setTransactionResult(committed)
.setTopics(List(
new WritableTxnMarkerTopic()
.setName(Topic.GROUP_METADATA_TOPIC_NAME)
.setPartitionIndexes(List[Integer](0).asJava)
).asJava)
.setCoordinatorEpoch(0)
).asJava)
).build(version)
assertEquals(
expectedError.code,
connectAndReceive[WriteTxnMarkersResponse](request).data.markers.get(0).topics.get(0).partitions.get(0).errorCode
)
}
protected def fetchOffsets(
groups: List[OffsetFetchRequestData.OffsetFetchRequestGroup],
requireStable: Boolean,
@@ -422,6 +453,27 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
groupResponse
}
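// Fetches the committed offset of a single partition for the given group, using a
// stable (requireStable = true) OffsetFetch request at version 9.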
protected def fetchOffset(
groupId: String,
topic: String,
partition: Int
): Long = {
val groupIdRecord = fetchOffsets(
group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId(groupId)
.setTopics(List(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName(topic)
.setPartitionIndexes(List[Integer](partition).asJava)
).asJava),
requireStable = true,
version = 9
)
val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head
val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex == partition).head
partitionRecord.committedOffset
}
protected def deleteOffset(
groupId: String,
topic: String,

View File

@@ -16,19 +16,16 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.message.OffsetFetchRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.JoinGroupRequest
import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
import scala.jdk.CollectionConverters._
import org.junit.jupiter.api.Assertions.{assertNotEquals, assertThrows}
@ClusterTestDefaults(
types = Array(Type.KRAFT),
@@ -51,6 +48,16 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
testTxnOffsetCommit(false)
}
@ClusterTest
def testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithNewConsumerGroupProtocol(): Unit = {
testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(true)
}
@ClusterTest
def testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithOldConsumerGroupProtocol(): Unit = {
testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(false)
}
private def testTxnOffsetCommit(useNewProtocol: Boolean): Unit = {
val topic = "topic"
val partition = 0
@@ -65,8 +72,8 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
// Join the consumer group. Note that we don't heartbeat here, so we must use
// a session timeout long enough for the duration of the test.
val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, useNewProtocol)
assertTrue(memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID)
assertTrue(memberEpoch != JoinGroupRequest.UNKNOWN_GENERATION_ID)
assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
createTopic(topic, 1)
@@ -178,7 +185,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
transactionalId = transactionalId
)
val originalOffset = fetchOffset(topic, partition, groupId)
val originalOffset = fetchOffset(groupId, topic, partition)
commitTxnOffset(
groupId = groupId,
@@ -207,31 +214,107 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
TestUtils.waitUntilTrue(() =>
try {
fetchOffset(topic, partition, groupId) == expectedOffset
fetchOffset(groupId, topic, partition) == expectedOffset
} catch {
case _: Throwable => false
}, "txn commit offset validation failed"
)
}
private def fetchOffset(
topic: String,
partition: Int,
groupId: String
): Long = {
val groupIdRecord = fetchOffsets(
group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId(groupId)
.setTopics(List(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName(topic)
.setPartitionIndexes(List[Integer](partition).asJava)
).asJava),
requireStable = true,
version = 9
)
val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head
val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex == partition).head
partitionRecord.committedOffset
private def testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(useNewProtocol: Boolean): Unit = {
val topic = "topic"
val partition = 0
val transactionalId = "txn"
val groupId = "group"
val offset = 100L
// Create the __consumer_offsets and __transaction_state topics, because they are not
// created automatically in this test since it does not use the FindCoordinator API.
createOffsetsTopic()
createTransactionStateTopic()
// Join the consumer group. Note that we don't heartbeat here, so we must use
// a session timeout long enough for the duration of the test.
val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, useNewProtocol)
assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
createTopic(topic, 1)
for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
val useTV2 = version > EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
// Initialize producer. Wait until the coordinator finishes loading.
var producerIdAndEpoch: ProducerIdAndEpoch = null
TestUtils.waitUntilTrue(() =>
try {
producerIdAndEpoch = initProducerId(
transactionalId = transactionalId,
producerIdAndEpoch = ProducerIdAndEpoch.NONE,
expectedError = Errors.NONE
)
true
} catch {
case _: Throwable => false
}, "initProducerId request failed"
)
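// Add the group's offsets partition to the ongoing transaction.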
addOffsetsToTxn(
groupId = groupId,
producerId = producerIdAndEpoch.producerId,
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId
)
// Complete the transaction.
endTxn(
producerId = producerIdAndEpoch.producerId,
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId,
isTransactionV2Enabled = useTV2,
committed = true,
expectedError = Errors.NONE
)
// Start a new transaction. Wait for the previous transaction to complete.
TestUtils.waitUntilTrue(() =>
try {
addOffsetsToTxn(
groupId = groupId,
producerId = producerIdAndEpoch.producerId,
producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch,
transactionalId = transactionalId
)
true
} catch {
case _: Throwable => false
}, "addOffsetsToTxn request failed"
)
// Committing the offset with the old epoch succeeds for TV1 and fails for TV2.
commitTxnOffset(
groupId = groupId,
memberId = if (version >= 3) memberId else JoinGroupRequest.UNKNOWN_MEMBER_ID,
generationId = if (version >= 3) 1 else JoinGroupRequest.UNKNOWN_GENERATION_ID,
producerId = producerIdAndEpoch.producerId,
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId,
topic = topic,
partition = partition,
offset = offset,
expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else Errors.NONE,
version = version.toShort
)
// Complete the transaction.
endTxn(
producerId = producerIdAndEpoch.producerId,
producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch,
transactionalId = transactionalId,
isTransactionV2Enabled = useTV2,
committed = true,
expectedError = Errors.NONE
)
}
}
}

View File

@@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.junit.jupiter.api.Assertions.assertNotEquals
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
)
)
class WriteTxnMarkersRequestTest(cluster:ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest
def testDelayedWriteTxnMarkersShouldNotCommitTxnOffsetWithNewConsumerGroupProtocol(): Unit = {
testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(true)
}
@ClusterTest
def testDelayedWriteTxnMarkersShouldNotCommitTxnOffsetWithOldConsumerGroupProtocol(): Unit = {
testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(false)
}
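// Verifies that a delayed WriteTxnMarkers request carrying the pre-bump producer epoch
// does not commit the pending transactional offsets when transactions V2 is used,
// while TV1 still accepts it.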
private def testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(useNewProtocol: Boolean): Unit = {
val topic = "topic"
val partition = 0
val transactionalId = "txn"
val groupId = "group"
val offset = 100L
// Create the __consumer_offsets and __transaction_state topics, because they are not
// created automatically in this test since it does not use the FindCoordinator API.
createOffsetsTopic()
createTransactionStateTopic()
// Join the consumer group. Note that we don't heartbeat here, so we must use
// a session timeout long enough for the duration of the test.
val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, useNewProtocol)
assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
createTopic(topic, 1)
for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
val useTV2 = version > EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
// Initialize producer. Wait until the coordinator finishes loading.
var producerIdAndEpoch: ProducerIdAndEpoch = null
TestUtils.waitUntilTrue(() =>
try {
producerIdAndEpoch = initProducerId(
transactionalId = transactionalId,
producerIdAndEpoch = ProducerIdAndEpoch.NONE,
expectedError = Errors.NONE
)
true
} catch {
case _: Throwable => false
}, "initProducerId request failed"
)
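// Add the group's offsets partition to the ongoing transaction.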
addOffsetsToTxn(
groupId = groupId,
producerId = producerIdAndEpoch.producerId,
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId
)
// Complete the transaction.
endTxn(
producerId = producerIdAndEpoch.producerId,
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId,
isTransactionV2Enabled = useTV2,
committed = true,
expectedError = Errors.NONE
)
// Start a new transaction. Wait for the previous transaction to complete.
TestUtils.waitUntilTrue(() =>
try {
addOffsetsToTxn(
groupId = groupId,
producerId = producerIdAndEpoch.producerId,
producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch,
transactionalId = transactionalId
)
true
} catch {
case _: Throwable => false
}, "addOffsetsToTxn request failed"
)
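// Commit the offset with the current (bumped for TV2) producer epoch; this succeeds
// for both TV1 and TV2.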
commitTxnOffset(
groupId = groupId,
memberId = if (version >= 3) memberId else JoinGroupRequest.UNKNOWN_MEMBER_ID,
generationId = if (version >= 3) 1 else JoinGroupRequest.UNKNOWN_GENERATION_ID,
producerId = producerIdAndEpoch.producerId,
producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch,
transactionalId = transactionalId,
topic = topic,
partition = partition,
offset = offset + version,
expectedError = Errors.NONE,
version = version.toShort
)
// Delayed txn marker should be accepted for TV1 and rejected for TV2.
// Note that in the ideal case a marker with producer epoch + 1 should also be rejected
// for TV2; that fix is still in progress.
writeTxnMarkers(
producerId = producerIdAndEpoch.producerId,
producerEpoch = producerIdAndEpoch.epoch,
committed = true,
expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else Errors.NONE
)
// The offset is committed for TV1 and not committed for TV2.
TestUtils.waitUntilTrue(() =>
try {
fetchOffset(groupId, topic, partition) == (if (useTV2) -1L else offset + version)
} catch {
case _: Throwable => false
}, "unexpected txn commit offset"
)
// Complete the transaction.
endTxn(
producerId = producerIdAndEpoch.producerId,
producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch,
transactionalId = transactionalId,
isTransactionV2Enabled = useTV2,
committed = true,
expectedError = Errors.NONE
)
// Once the transaction completes with the bumped epoch, the offset is committed for TV2 as well.
TestUtils.waitUntilTrue(() =>
try {
fetchOffset(groupId, topic, partition) == offset + version
} catch {
case _: Throwable => false
}, "txn commit offset validation failed"
)
}
}
}