diff --git a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java index abb27b2a1fd..ce314d86b96 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition; import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic; import org.apache.kafka.common.message.TxnOffsetCommitResponseData; @@ -35,7 +36,9 @@ import java.util.Map; import java.util.Optional; import static org.apache.kafka.common.requests.TxnOffsetCommitRequest.getErrorResponse; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest { @@ -151,4 +154,18 @@ public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest { assertEquals(expectedResponse, getErrorResponse(builderWithGroupMetadata.data, Errors.UNKNOWN_MEMBER_ID)); } + + @Test + public void testVersionSupportForGroupMetadata() { + for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) { + assertDoesNotThrow(() -> builder.build(version)); + if (version >= 3) { + assertDoesNotThrow(() -> builderWithGroupMetadata.build(version)); + } else { + assertEquals("Broker doesn't support group metadata commit API on version " + version + + ", minimum supported request version is 3 which requires brokers to be on version 2.5 or above.", + assertThrows(UnsupportedVersionException.class, () -> builderWithGroupMetadata.build(version)).getMessage()); + } + } + } } diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index b232ccd73b2..3b375c879c4 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -25,10 +25,11 @@ import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupR import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment -import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData} +import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse} +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse} import org.apache.kafka.common.serialization.StringSerializer +import org.apache.kafka.common.utils.ProducerIdAndEpoch import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT import org.junit.jupiter.api.Assertions.{assertEquals, fail} @@ -59,6 +60,19 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { } } + protected def createTransactionStateTopic(): Unit = { + val admin = cluster.admin() + try { + TestUtils.createTransactionStateTopicWithAdmin( + admin = admin, + brokers = brokers(), + controllers = controllerServers() + ) + } finally { + admin.close() + } + } + protected def createTopic( topic: String, numPartitions: Int @@ -194,6 +208,114 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponse, response.data) } + protected def commitTxnOffset( + groupId: String, + memberId: String, + generationId: Int, + producerId: Long, + producerEpoch: Short, + transactionalId: String, + topic: String, + partition: Int, + offset: Long, + expectedError: Errors, + version: Short = ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) + ): Unit = { + val request = new TxnOffsetCommitRequest.Builder( + new TxnOffsetCommitRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setGenerationId(generationId) + .setProducerId(producerId) + .setProducerEpoch(producerEpoch) + .setTransactionalId(transactionalId) + .setTopics(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName(topic) + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(partition) + .setCommittedOffset(offset) + ).asJava) + ).asJava) + ).build(version) + + val expectedResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName(topic) + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(expectedError.code) + ).asJava) + ).asJava) + + val response = connectAndReceive[TxnOffsetCommitResponse](request) + assertEquals(expectedResponse, response.data) + } + + protected def addOffsetsToTxn( + groupId: String, + producerId: Long, + producerEpoch: Short, + transactionalId: String, + version: Short = ApiKeys.ADD_OFFSETS_TO_TXN.latestVersion(isUnstableApiEnabled) + ): Unit = { + val request = new AddOffsetsToTxnRequest.Builder( + new AddOffsetsToTxnRequestData() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(producerEpoch) + .setGroupId(groupId) + ).build(version) + + val response = connectAndReceive[AddOffsetsToTxnResponse](request) + assertEquals(new AddOffsetsToTxnResponseData(), response.data) + } + + protected def initProducerId( + transactionalId: String, + transactionTimeoutMs: Int = 60000, + producerIdAndEpoch: ProducerIdAndEpoch, + expectedError: Errors, + version: Short = ApiKeys.INIT_PRODUCER_ID.latestVersion(isUnstableApiEnabled) + ): ProducerIdAndEpoch = { + val request = new InitProducerIdRequest.Builder( + new InitProducerIdRequestData() + .setTransactionalId(transactionalId) + .setTransactionTimeoutMs(transactionTimeoutMs) + .setProducerId(producerIdAndEpoch.producerId) + .setProducerEpoch(producerIdAndEpoch.epoch)) + .build(version) + + val response = connectAndReceive[InitProducerIdResponse](request).data + assertEquals(expectedError.code, response.errorCode) + new ProducerIdAndEpoch(response.producerId, response.producerEpoch) + } + + protected def endTxn( + producerId: Long, + producerEpoch: Short, + transactionalId: String, + isTransactionV2Enabled: Boolean, + committed: Boolean, + expectedError: Errors, + version: Short = ApiKeys.END_TXN.latestVersion(isUnstableApiEnabled) + ): Unit = { + val request = new EndTxnRequest.Builder( + new EndTxnRequestData() + .setProducerId(producerId) + .setProducerEpoch(producerEpoch) + .setTransactionalId(transactionalId) + .setCommitted(committed), + isUnstableApiEnabled, + isTransactionV2Enabled + ).build(version) + + assertEquals(expectedError, connectAndReceive[EndTxnResponse](request).error) + } + protected def fetchOffsets( groupId: String, memberId: String, diff --git a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala new file mode 100644 index 00000000000..57277f3dbed --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, Type} +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.UnsupportedVersionException +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.JoinGroupRequest +import org.apache.kafka.common.utils.ProducerIdAndEpoch +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.transaction.TransactionLogConfig +import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue, fail} +import org.junit.jupiter.api.extension.ExtendWith + +import scala.jdk.CollectionConverters.IterableHasAsScala + +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + ) +) +class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + @ClusterTest + def testTxnOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testTxnOffsetCommit(true) + } + + @ClusterTest + def testTxnOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testTxnOffsetCommit(false) + } + + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), + ) + ) + def testTxnOffsetCommitWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testTxnOffsetCommit(false) + } + + private def testTxnOffsetCommit(useNewProtocol: Boolean): Unit = { + if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + val topic = "topic" + val partition = 0 + val transactionalId = "txn" + val groupId = "group" + + // Creates the __consumer_offsets and __transaction_state topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + createTransactionStateTopic() + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, useNewProtocol) + assertTrue(memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) + assertTrue(memberEpoch != JoinGroupRequest.UNKNOWN_GENERATION_ID) + + createTopic(topic, 1) + + for (version <- 0 to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) { + // Verify that the TXN_OFFSET_COMMIT request is processed correctly when member id is UNKNOWN_MEMBER_ID + // and generation id is UNKNOWN_GENERATION_ID under all api versions. + verifyTxnCommitAndFetch( + topic = topic, + partition = partition, + transactionalId = transactionalId, + groupId = groupId, + memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, + generationId = JoinGroupRequest.UNKNOWN_GENERATION_ID, + offset = 100 + version, + version = version.toShort, + expectedTxnCommitError = Errors.NONE + ) + + if (version >= 3) { + // Verify that the TXN_OFFSET_COMMIT request is processed correctly when the member ID + // and generation ID are known. This validation starts from version 3, as the member ID + // must not be empty from version 3 onwards. + verifyTxnCommitAndFetch( + topic = topic, + partition = partition, + transactionalId = transactionalId, + groupId = groupId, + memberId = memberId, + generationId = memberEpoch, + offset = 200 + version, + version = version.toShort, + expectedTxnCommitError = Errors.NONE + ) + + // Verify TXN_OFFSET_COMMIT request failed with incorrect memberId. + verifyTxnCommitAndFetch( + topic = topic, + partition = partition, + transactionalId = transactionalId, + groupId = groupId, + memberId = "non-exist", + generationId = memberEpoch, + offset = 200 + version, + version = version.toShort, + expectedTxnCommitError = Errors.UNKNOWN_MEMBER_ID + ) + + // Verify TXN_OFFSET_COMMIT request failed with incorrect generationId. + verifyTxnCommitAndFetch( + topic = topic, + partition = partition, + transactionalId = transactionalId, + groupId = groupId, + memberId = memberId, + generationId = 100, + offset = 200 + version, + version = version.toShort, + expectedTxnCommitError = Errors.ILLEGAL_GENERATION + ) + } else { + // Verify that the TXN_OFFSET_COMMIT request failed when group metadata is set under version 3. + assertThrows(classOf[UnsupportedVersionException], () => + verifyTxnCommitAndFetch( + topic = topic, + partition = partition, + transactionalId = transactionalId, + groupId = groupId, + memberId = memberId, + generationId = memberEpoch, + offset = 200 + version, + version = version.toShort, + expectedTxnCommitError = Errors.NONE + ) + ) + } + } + } + + private def verifyTxnCommitAndFetch( + topic: String, + partition: Int, + transactionalId: String, + groupId: String, + memberId: String, + generationId: Int, + offset: Long, + version: Short, + expectedTxnCommitError: Errors + ): Unit = { + var producerIdAndEpoch: ProducerIdAndEpoch = null + // Wait until the coordinator finishes loading. + TestUtils.waitUntilTrue(() => + try { + producerIdAndEpoch = initProducerId( + transactionalId = transactionalId, + producerIdAndEpoch = ProducerIdAndEpoch.NONE, + expectedError = Errors.NONE + ) + true + } catch { + case _: Throwable => false + }, "initProducerId request failed" + ) + + addOffsetsToTxn( + groupId = groupId, + producerId = producerIdAndEpoch.producerId, + producerEpoch = producerIdAndEpoch.epoch, + transactionalId = transactionalId + ) + + val originalOffset = fetchOffset(topic, partition, groupId) + + commitTxnOffset( + groupId = groupId, + memberId = memberId, + generationId = generationId, + producerId = producerIdAndEpoch.producerId, + producerEpoch = producerIdAndEpoch.epoch, + transactionalId = transactionalId, + topic = topic, + partition = partition, + offset = offset, + expectedError = expectedTxnCommitError, + version = version + ) + + endTxn( + producerId = producerIdAndEpoch.producerId, + producerEpoch = producerIdAndEpoch.epoch, + transactionalId = transactionalId, + isTransactionV2Enabled = false, + committed = true, + expectedError = Errors.NONE + ) + + val expectedOffset = if (expectedTxnCommitError == Errors.NONE) offset else originalOffset + + TestUtils.waitUntilTrue(() => + try { + fetchOffset(topic, partition, groupId) == expectedOffset + } catch { + case _: Throwable => false + }, "txn commit offset validation failed" + ) + } + + private def fetchOffset( + topic: String, + partition: Int, + groupId: String + ): Long = { + val fetchOffsetsResp = fetchOffsets( + groups = Map(groupId -> List(new TopicPartition(topic, partition))), + requireStable = true, + version = ApiKeys.OFFSET_FETCH.latestVersion + ) + val groupIdRecord = fetchOffsetsResp.find(_.groupId == groupId).head + val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head + val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex == partition).head + partitionRecord.committedOffset + } +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 24740207a6f..298168a5310 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -500,6 +500,23 @@ object TestUtils extends Logging { ) } + def createTransactionStateTopicWithAdmin[B <: KafkaBroker]( + admin: Admin, + brokers: Seq[B], + controllers: Seq[ControllerServer] + ): Map[Int, Int] = { + val broker = brokers.head + createTopicWithAdmin( + admin = admin, + topic = Topic.TRANSACTION_STATE_TOPIC_NAME, + numPartitions = broker.config.getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG), + replicationFactor = broker.config.getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG).toInt, + brokers = brokers, + controllers = controllers, + topicConfig = new Properties(), + ) + } + def deleteTopicWithAdmin[B <: KafkaBroker]( admin: Admin, topic: String, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index ed93251e544..ed05f636189 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerProtocolSubscription; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers; import org.apache.kafka.coordinator.group.OffsetExpirationCondition; @@ -556,6 +557,12 @@ public class ConsumerGroup extends ModernGroup { // the request can commit offsets if the group is empty. if (memberEpoch < 0 && members().isEmpty()) return; + // The TxnOffsetCommit API does not require the member id, the generation id and the group instance id fields. + // Hence, they are only validated if any of them is provided + if (isTransactional && memberEpoch == JoinGroupRequest.UNKNOWN_GENERATION_ID && + memberId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID) && groupInstanceId == null) + return; + final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); // If the commit is not transactional and the member uses the new consumer protocol (KIP-848), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index 25ebcd613c8..fc589df97a7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -1037,7 +1037,7 @@ public class ConsumerGroupTest { } @ParameterizedTest - @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT) public void testValidateTransactionalOffsetCommit(short version) { boolean isTransactional = true; ConsumerGroup group = createConsumerGroup("group-foo"); @@ -1063,6 +1063,9 @@ public class ConsumerGroupTest { // This should succeed. group.validateOffsetCommit("member-id", "", 0, isTransactional, version); + + // This should succeed. + group.validateOffsetCommit("", null, -1, isTransactional, version); } @ParameterizedTest @@ -1093,6 +1096,8 @@ public class ConsumerGroupTest { // A call from the admin client should fail as the group is not empty. assertThrows(UnknownMemberIdException.class, () -> group.validateOffsetCommit("", "", -1, isTransactional, version)); + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("", null, -1, isTransactional, version)); // The member epoch is stale. if (version >= 9) {