From 71d08780d11b23ec4e931efaa8ca329c03f161e3 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 23 Apr 2025 08:22:09 +0200 Subject: [PATCH] KAFKA-14690; Add TopicId to OffsetCommit API (#19461) This patch extends the OffsetCommit API to support topic ids. From version 10 of the API, topic ids must be used. Originally, we wanted to support both using topic ids and topic names from version 10 but it turns out that it makes everything more complicated. Hence we propose to only support topic ids from version 10. Clients which only support using topic names can either lookup the topic ids using the Metadata API or stay on using an earlier version. The patch only contains the server side changes and it keeps the version 10 as unstable for now. We will mark the version as stable when the client side changes are merged in. Reviewers: Lianet Magrans , PoAn Yang --- .../AlterConsumerGroupOffsetsHandler.java | 2 +- .../internals/CommitRequestManager.java | 2 +- .../internals/ConsumerCoordinator.java | 2 +- .../common/requests/OffsetCommitRequest.java | 33 ++- .../common/requests/OffsetCommitResponse.java | 128 +++++++-- .../common/message/OffsetCommitRequest.json | 9 +- .../common/message/OffsetCommitResponse.json | 9 +- .../internals/ConsumerCoordinatorTest.java | 2 +- .../kafka/common/message/MessageTest.java | 119 +++----- .../requests/OffsetCommitRequestTest.java | 11 +- .../common/requests/RequestResponseTest.java | 6 +- .../main/scala/kafka/server/KafkaApis.scala | 91 +++--- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../ConsumerProtocolMigrationTest.scala | 9 +- .../server/DeleteGroupsRequestTest.scala | 3 +- .../GroupCoordinatorBaseRequestTest.scala | 21 +- .../unit/kafka/server/KafkaApisTest.scala | 261 ++++++++++++++---- .../server/OffsetCommitRequestTest.scala | 25 +- .../server/OffsetDeleteRequestTest.scala | 3 +- .../kafka/server/OffsetFetchRequestTest.scala | 9 +- .../unit/kafka/server/RequestQuotaTest.scala | 2 +- .../group/OffsetMetadataManager.java | 8 +- .../group/OffsetMetadataManagerTest.java | 69 +++++ 23 files changed, 589 insertions(+), 237 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java index 5ef72f327d6..99111a70d4b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java @@ -108,7 +108,7 @@ public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched(offsetData.values())); - return new OffsetCommitRequest.Builder(data); + return OffsetCommitRequest.Builder.forTopicNames(data); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 284707a812b..62d1fe3a866 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -727,7 +727,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener lastEpochSentOnCommit = Optional.empty(); } - OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data); + OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data); return buildRequestWithResponseHandling(builder); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 01fc605ea79..1cba10ef15d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1327,7 +1327,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { groupInstanceId = null; } - OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( + OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames( new OffsetCommitRequestData() .setGroupId(this.rebalanceConfig.groupId) .setGenerationIdOrMemberEpoch(generation.generationId) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 8f6ab39d1fc..1bd9c41f668 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; @@ -45,20 +46,39 @@ public class OffsetCommitRequest extends AbstractRequest { private final OffsetCommitRequestData data; - public Builder(OffsetCommitRequestData data, boolean enableUnstableLastVersion) { - super(ApiKeys.OFFSET_COMMIT, enableUnstableLastVersion); + private Builder(OffsetCommitRequestData data, short oldestAllowedVersion, short latestAllowedVersion) { + super(ApiKeys.OFFSET_COMMIT, oldestAllowedVersion, latestAllowedVersion); this.data = data; } - public Builder(OffsetCommitRequestData data) { - this(data, false); + public static Builder forTopicIdsOrNames(OffsetCommitRequestData data, boolean enableUnstableLastVersion) { + return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), ApiKeys.OFFSET_COMMIT.latestVersion(enableUnstableLastVersion)); + } + + public static Builder forTopicNames(OffsetCommitRequestData data) { + return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), (short) 9); } @Override public OffsetCommitRequest build(short version) { if (data.groupInstanceId() != null && version < 7) { - throw new UnsupportedVersionException("The broker offset commit protocol version " + - version + " does not support usage of config group.instance.id."); + throw new UnsupportedVersionException("The broker offset commit api version " + + version + " does not support usage of config group.instance.id."); + } + if (version >= 10) { + data.topics().forEach(topic -> { + if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) { + throw new UnsupportedVersionException("The broker offset commit api version " + + version + " does require usage of topic ids."); + } + }); + } else { + data.topics().forEach(topic -> { + if (topic.name() == null || topic.name().isEmpty()) { + throw new UnsupportedVersionException("The broker offset commit api version " + + version + " does require usage of topic names."); + } + }); } return new OffsetCommitRequest(data, version); } @@ -97,6 +117,7 @@ public class OffsetCommitRequest extends AbstractRequest { OffsetCommitResponseData response = new OffsetCommitResponseData(); request.topics().forEach(topic -> { OffsetCommitResponseTopic responseTopic = new OffsetCommitResponseTopic() + .setTopicId(topic.topicId()) .setName(topic.name()); response.topics().add(responseTopic); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index 2b6d00b1a47..a4d740c06f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; @@ -123,43 +124,56 @@ public class OffsetCommitResponse extends AbstractResponse { return version >= 4; } - public static class Builder { - OffsetCommitResponseData data = new OffsetCommitResponseData(); - HashMap byTopicName = new HashMap<>(); + public static boolean useTopicIds(short version) { + return version >= 10; + } - private OffsetCommitResponseTopic getOrCreateTopic( - String topicName - ) { - OffsetCommitResponseTopic topic = byTopicName.get(topicName); - if (topic == null) { - topic = new OffsetCommitResponseTopic().setName(topicName); - data.topics().add(topic); - byTopicName.put(topicName, topic); - } - return topic; + public static Builder newBuilder(boolean useTopicIds) { + if (useTopicIds) { + return new TopicIdBuilder(); + } else { + return new TopicNameBuilder(); } + } + + public abstract static class Builder { + protected OffsetCommitResponseData data = new OffsetCommitResponseData(); + + protected abstract void add( + OffsetCommitResponseTopic topic + ); + + protected abstract OffsetCommitResponseTopic get( + Uuid topicId, + String topicName + ); + + protected abstract OffsetCommitResponseTopic getOrCreate( + Uuid topicId, + String topicName + ); public Builder addPartition( + Uuid topicId, String topicName, int partitionIndex, Errors error ) { - final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); - + final OffsetCommitResponseTopic topicResponse = getOrCreate(topicId, topicName); topicResponse.partitions().add(new OffsetCommitResponsePartition() .setPartitionIndex(partitionIndex) .setErrorCode(error.code())); - return this; } public

Builder addPartitions( + Uuid topicId, String topicName, List

partitions, Function partitionIndex, Errors error ) { - final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + final OffsetCommitResponseTopic topicResponse = getOrCreate(topicId, topicName); partitions.forEach(partition -> topicResponse.partitions().add(new OffsetCommitResponsePartition() .setPartitionIndex(partitionIndex.apply(partition)) @@ -177,11 +191,10 @@ public class OffsetCommitResponse extends AbstractResponse { } else { // Otherwise, we have to merge them together. newData.topics().forEach(newTopic -> { - OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name()); + OffsetCommitResponseTopic existingTopic = get(newTopic.topicId(), newTopic.name()); if (existingTopic == null) { // If no topic exists, we can directly copy the new topic data. - data.topics().add(newTopic); - byTopicName.put(newTopic.name(), newTopic); + add(newTopic); } else { // Otherwise, we add the partitions to the existing one. Note we // expect non-overlapping partitions here as we don't verify @@ -190,7 +203,6 @@ public class OffsetCommitResponse extends AbstractResponse { } }); } - return this; } @@ -198,4 +210,78 @@ public class OffsetCommitResponse extends AbstractResponse { return new OffsetCommitResponse(data); } } + + public static class TopicIdBuilder extends Builder { + private final HashMap byTopicId = new HashMap<>(); + + @Override + protected void add(OffsetCommitResponseTopic topic) { + throwIfTopicIdIsNull(topic.topicId()); + data.topics().add(topic); + byTopicId.put(topic.topicId(), topic); + } + + @Override + protected OffsetCommitResponseTopic get(Uuid topicId, String topicName) { + throwIfTopicIdIsNull(topicId); + return byTopicId.get(topicId); + } + + @Override + protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String topicName) { + throwIfTopicIdIsNull(topicId); + OffsetCommitResponseTopic topic = byTopicId.get(topicId); + if (topic == null) { + topic = new OffsetCommitResponseTopic() + .setName(topicName) + .setTopicId(topicId); + data.topics().add(topic); + byTopicId.put(topicId, topic); + } + return topic; + } + + private static void throwIfTopicIdIsNull(Uuid topicId) { + if (topicId == null) { + throw new IllegalArgumentException("TopicId cannot be null."); + } + } + } + + public static class TopicNameBuilder extends Builder { + private final HashMap byTopicName = new HashMap<>(); + + @Override + protected void add(OffsetCommitResponseTopic topic) { + throwIfTopicNameIsNull(topic.name()); + data.topics().add(topic); + byTopicName.put(topic.name(), topic); + } + + @Override + protected OffsetCommitResponseTopic get(Uuid topicId, String topicName) { + throwIfTopicNameIsNull(topicName); + return byTopicName.get(topicName); + } + + @Override + protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String topicName) { + throwIfTopicNameIsNull(topicName); + OffsetCommitResponseTopic topic = byTopicName.get(topicName); + if (topic == null) { + topic = new OffsetCommitResponseTopic() + .setName(topicName) + .setTopicId(topicId); + data.topics().add(topic); + byTopicName.put(topicName, topic); + } + return topic; + } + + private void throwIfTopicNameIsNull(String topicName) { + if (topicName == null) { + throw new IllegalArgumentException("TopicName cannot be null."); + } + } + } } diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json index 348ed2b90c5..ba3c12f0e2b 100644 --- a/clients/src/main/resources/common/message/OffsetCommitRequest.json +++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json @@ -36,8 +36,11 @@ // // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The // request is the same as version 8. - "validVersions": "2-9", + // + // Version 10 adds support for topic ids and removes support for topic names (KIP-848). + "validVersions": "2-10", "flexibleVersions": "8+", + "latestVersionUnstable": true, "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The unique group identifier." }, @@ -52,8 +55,10 @@ "about": "The time period in ms to retain the offset." }, { "name": "Topics", "type": "[]OffsetCommitRequestTopic", "versions": "0+", "about": "The topics to commit offsets for.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-9", "entityType": "topicName", "ignorable": true, "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, + "about": "The topic ID." }, { "name": "Partitions", "type": "[]OffsetCommitRequestPartition", "versions": "0+", "about": "Each partition to commit offsets for.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", diff --git a/clients/src/main/resources/common/message/OffsetCommitResponse.json b/clients/src/main/resources/common/message/OffsetCommitResponse.json index 0cccd64816c..0228733ce6b 100644 --- a/clients/src/main/resources/common/message/OffsetCommitResponse.json +++ b/clients/src/main/resources/common/message/OffsetCommitResponse.json @@ -34,7 +34,9 @@ // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is // the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used and // GROUP_ID_NOT_FOUND when the group does not exist for both protocols. - "validVersions": "2-9", + // + // Version 10 adds support for topic ids and removes support for topic names (KIP-848). + "validVersions": "2-10", "flexibleVersions": "8+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) @@ -47,13 +49,16 @@ // - FENCED_MEMBER_EPOCH (version 7+) // - GROUP_ID_NOT_FOUND (version 9+) // - STALE_MEMBER_EPOCH (version 9+) + // - UNKNOWN_TOPIC_ID (version 10+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Topics", "type": "[]OffsetCommitResponseTopic", "versions": "0+", "about": "The responses for each topic.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-9", "entityType": "topicName", "ignorable": true, "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, + "about": "The topic ID." }, { "name": "Partitions", "type": "[]OffsetCommitResponsePartition", "versions": "0+", "about": "The responses for each partition in the topic.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 5c9e06ff90d..a34f2f16337 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -682,7 +682,7 @@ public abstract class ConsumerCoordinatorTest { ) ); - consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(offsetCommitRequestData)) + consumerClient.send(coordinator.checkAndGetCoordinator(), OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequestData)) .compose(new RequestFutureAdapter() { @Override public void onSuccess(ClientResponse value, RequestFuture future) {} diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index b28b8274f58..4674bf2013e 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -56,11 +56,13 @@ import org.apache.kafka.common.protocol.Message; import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.types.RawTaggedField; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import com.fasterxml.jackson.databind.JsonNode; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; import java.lang.reflect.Method; import java.nio.ByteBuffer; @@ -409,90 +411,49 @@ public final class MessageTest { new OffsetForLeaderEpochRequestData().setReplicaId(-2)); } - @Test - public void testOffsetCommitRequestVersions() throws Exception { - String groupId = "groupId"; - String topicName = "topic"; - String metadata = "metadata"; - int partition = 2; - int offset = 100; + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testOffsetCommitRequestVersions(short version) throws Exception { + OffsetCommitRequestData request = new OffsetCommitRequestData() + .setGroupId("groupId") + .setMemberId("memberId") + .setGenerationIdOrMemberEpoch(version >= 1 ? 10 : -1) + .setGroupInstanceId(version >= 7 ? "instanceId" : null) + .setRetentionTimeMs((version >= 2 && version <= 4) ? 20 : -1) + .setTopics(singletonList( + new OffsetCommitRequestTopic() + .setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID) + .setName(version < 10 ? "topic" : "") + .setPartitions(singletonList( + new OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedMetadata("metadata") + .setCommittedOffset(100) + .setCommittedLeaderEpoch(version >= 6 ? 10 : -1) - testAllMessageRoundTrips(new OffsetCommitRequestData() - .setGroupId(groupId) - .setTopics(Collections.singletonList( - new OffsetCommitRequestTopic() - .setName(topicName) - .setPartitions(Collections.singletonList( - new OffsetCommitRequestPartition() - .setPartitionIndex(partition) - .setCommittedMetadata(metadata) - .setCommittedOffset(offset) - ))))); + )) + )); - Supplier request = - () -> new OffsetCommitRequestData() - .setGroupId(groupId) - .setMemberId("memberId") - .setGroupInstanceId("instanceId") - .setTopics(Collections.singletonList( - new OffsetCommitRequestTopic() - .setName(topicName) - .setPartitions(Collections.singletonList( - new OffsetCommitRequestPartition() - .setPartitionIndex(partition) - .setCommittedLeaderEpoch(10) - .setCommittedMetadata(metadata) - .setCommittedOffset(offset) - )))) - .setRetentionTimeMs(20); - - for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { - OffsetCommitRequestData requestData = request.get(); - - if (version > 4) { - requestData.setRetentionTimeMs(-1); - } - - if (version < 6) { - requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1); - } - - if (version < 7) { - requestData.setGroupInstanceId(null); - } - - if (version >= 2 && version <= 4) { - testAllMessageRoundTripsBetweenVersions(version, (short) 5, requestData, requestData); - } else { - testAllMessageRoundTripsFromVersion(version, requestData); - } - } + testMessageRoundTrip(version, request, request); } - @Test - public void testOffsetCommitResponseVersions() throws Exception { - Supplier response = - () -> new OffsetCommitResponseData() - .setTopics( - singletonList( - new OffsetCommitResponseTopic() - .setName("topic") - .setPartitions(singletonList( - new OffsetCommitResponsePartition() - .setPartitionIndex(1) - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) - )) - ) - ) - .setThrottleTimeMs(20); + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testOffsetCommitResponseVersions(short version) throws Exception { + OffsetCommitResponseData response = new OffsetCommitResponseData() + .setThrottleTimeMs(version >= 3 ? 20 : 0) + .setTopics(singletonList( + new OffsetCommitResponseTopic() + .setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID) + .setName(version < 10 ? "topic" : "") + .setPartitions(singletonList( + new OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + )) + )); - for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { - OffsetCommitResponseData responseData = response.get(); - if (version < 3) { - responseData.setThrottleTimeMs(0); - } - testAllMessageRoundTripsFromVersion(version, responseData); - } + testMessageRoundTrip(version, response, response); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java index 161a4dd5f11..9cd95cfec76 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; @@ -45,6 +46,8 @@ public class OffsetCommitRequestTest { protected static String groupId = "groupId"; protected static String memberId = "consumerId"; protected static String groupInstanceId = "groupInstanceId"; + protected static Uuid topicIdOne = Uuid.randomUuid(); + protected static Uuid topicIdTwo = Uuid.randomUuid(); protected static String topicOne = "topicOne"; protected static String topicTwo = "topicTwo"; protected static int partitionOne = 1; @@ -61,6 +64,7 @@ public class OffsetCommitRequestTest { public void setUp() { List topics = Arrays.asList( new OffsetCommitRequestTopic() + .setTopicId(topicIdOne) .setName(topicOne) .setPartitions(Collections.singletonList( new OffsetCommitRequestPartition() @@ -70,6 +74,7 @@ public class OffsetCommitRequestTest { .setCommittedMetadata(metadata) )), new OffsetCommitRequestTopic() + .setTopicId(topicIdTwo) .setName(topicTwo) .setPartitions(Collections.singletonList( new OffsetCommitRequestPartition() @@ -90,7 +95,7 @@ public class OffsetCommitRequestTest { expectedOffsets.put(new TopicPartition(topicOne, partitionOne), offset); expectedOffsets.put(new TopicPartition(topicTwo, partitionTwo), offset); - OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data); + OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data); for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { OffsetCommitRequest request = builder.build(version); @@ -105,7 +110,7 @@ public class OffsetCommitRequestTest { @Test public void testVersionSupportForGroupInstanceId() { - OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( + OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames( new OffsetCommitRequestData() .setGroupId(groupId) .setMemberId(memberId) @@ -127,12 +132,14 @@ public class OffsetCommitRequestTest { OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData() .setTopics(Arrays.asList( new OffsetCommitResponseTopic() + .setTopicId(topicIdOne) .setName(topicOne) .setPartitions(Collections.singletonList( new OffsetCommitResponsePartition() .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) .setPartitionIndex(partitionOne))), new OffsetCommitResponseTopic() + .setTopicId(topicIdTwo) .setName(topicTwo) .setPartitions(Collections.singletonList( new OffsetCommitResponsePartition() diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index b33dec17d9a..5d1e853e691 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -329,6 +329,8 @@ import static org.junit.jupiter.api.Assertions.fail; // This class performs tests requests and responses for all API keys public class RequestResponseTest { + private static final Uuid TOPIC_ID = Uuid.randomUuid(); + // Exception includes a message that we verify is not included in error responses private final UnknownServerException unknownServerException = new UnknownServerException("secret"); @@ -2401,7 +2403,7 @@ public class RequestResponseTest { } private OffsetCommitRequest createOffsetCommitRequest(short version) { - return new OffsetCommitRequest.Builder(new OffsetCommitRequestData() + return OffsetCommitRequest.Builder.forTopicNames(new OffsetCommitRequestData() .setGroupId("group1") .setMemberId("consumer1") .setGroupInstanceId(null) @@ -2409,6 +2411,7 @@ public class RequestResponseTest { .setTopics(singletonList( new OffsetCommitRequestData.OffsetCommitRequestTopic() .setName("test") + .setTopicId(TOPIC_ID) .setPartitions(asList( new OffsetCommitRequestData.OffsetCommitRequestPartition() .setPartitionIndex(0) @@ -2430,6 +2433,7 @@ public class RequestResponseTest { .setTopics(singletonList( new OffsetCommitResponseData.OffsetCommitResponseTopic() .setName("test") + .setTopicId(TOPIC_ID) .setPartitions(singletonList( new OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(0) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6a22963ac7d..ca8cc311b96 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -275,11 +275,21 @@ class KafkaApis(val requestChannel: RequestChannel, ): CompletableFuture[Unit] = { val offsetCommitRequest = request.body[OffsetCommitRequest] - // Reject the request if not authorized to the group + // Reject the request if not authorized to the group. if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val useTopicIds = OffsetCommitResponse.useTopicIds(request.header.apiVersion) + + if (useTopicIds) { + offsetCommitRequest.data.topics.forEach { topic => + if (topic.topicId != Uuid.ZERO_UUID) { + metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) + } + } + } + val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, @@ -287,28 +297,40 @@ class KafkaApis(val requestChannel: RequestChannel, offsetCommitRequest.data.topics.asScala )(_.name) - val responseBuilder = new OffsetCommitResponse.Builder() + val responseBuilder = OffsetCommitResponse.newBuilder(useTopicIds) val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() offsetCommitRequest.data.topics.forEach { topic => - if (!authorizedTopics.contains(topic.name)) { + if (useTopicIds && topic.name.isEmpty) { + // If the topic name is undefined, it means that the topic id is unknown so we add + // the topic and all its partitions to the response with UNKNOWN_TOPIC_ID. + responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( + topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_ID) + } else if (!authorizedTopics.contains(topic.name)) { // If the topic is not authorized, we add the topic and all its partitions // to the response with TOPIC_AUTHORIZATION_FAILED. responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( - topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) + topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) } else if (!metadataCache.contains(topic.name)) { // If the topic is unknown, we add the topic and all its partitions // to the response with UNKNOWN_TOPIC_OR_PARTITION. responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( - topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) + topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) } else { // Otherwise, we check all partitions to ensure that they all exist. - val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name) + val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(topic.topicId) + .setName(topic.name) topic.partitions.forEach { partition => - if (metadataCache.getLeaderAndIsr(topic.name, partition.partitionIndex).isPresent()) { + if (metadataCache.getLeaderAndIsr(topic.name, partition.partitionIndex).isPresent) { topicWithValidPartitions.partitions.add(partition) } else { - responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) + responseBuilder.addPartition( + topic.topicId, + topic.name, + partition.partitionIndex, + Errors.UNKNOWN_TOPIC_OR_PARTITION + ) } } @@ -322,42 +344,23 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, responseBuilder.build()) CompletableFuture.completedFuture(()) } else { - // For version > 0, store offsets in Coordinator. - commitOffsetsToCoordinator( - request, - offsetCommitRequest, - authorizedTopicsRequest, - responseBuilder, - requestLocal - ) - } - } - } - - private def commitOffsetsToCoordinator( - request: RequestChannel.Request, - offsetCommitRequest: OffsetCommitRequest, - authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic], - responseBuilder: OffsetCommitResponse.Builder, - requestLocal: RequestLocal - ): CompletableFuture[Unit] = { - val offsetCommitRequestData = new OffsetCommitRequestData() - .setGroupId(offsetCommitRequest.data.groupId) - .setMemberId(offsetCommitRequest.data.memberId) - .setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch) - .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs) - .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId) - .setTopics(authorizedTopicsRequest.asJava) - - groupCoordinator.commitOffsets( - request.context, - offsetCommitRequestData, - requestLocal.bufferSupplier - ).handle[Unit] { (results, exception) => - if (exception != null) { - requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception)) - } else { - requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build()) + groupCoordinator.commitOffsets( + request.context, + new OffsetCommitRequestData() + .setGroupId(offsetCommitRequest.data.groupId) + .setMemberId(offsetCommitRequest.data.memberId) + .setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch) + .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs) + .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId) + .setTopics(authorizedTopicsRequest.asJava), + requestLocal.bufferSupplier + ).handle[Unit] { (results, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build()) + } + } } } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 7fccbdc9e28..e9e476fcd12 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -372,7 +372,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } private def createOffsetCommitRequest = { - new requests.OffsetCommitRequest.Builder( + requests.OffsetCommitRequest.Builder.forTopicNames( new OffsetCommitRequestData() .setGroupId(group) .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala index 37c81ce20e5..8f5f759250b 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala @@ -690,7 +690,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord val topicName = "foo" // Create the topic. - createTopic( + val topicId = createTopic( topic = topicName, numPartitions = 3 ) @@ -702,6 +702,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord memberId = "member-id", memberEpoch = -1, topic = topicName, + topicId = topicId, partition = 0, offset = 1000L, expectedError = Errors.NONE, @@ -765,7 +766,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -865,6 +866,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord memberId = memberId1, memberEpoch = 1, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + 10 * version + partitionId, expectedError = Errors.NONE, @@ -1096,7 +1098,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -1164,6 +1166,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord memberId = memberId1, memberEpoch = 1, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + 10 * version + partitionId, expectedError = Errors.NONE, diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala index 88733a86576..fe4501e640a 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala @@ -48,7 +48,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -89,6 +89,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = Errors.GROUP_ID_NOT_FOUND, diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index c9b1dda51a5..be96826858a 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.network.SocketServer import kafka.utils.TestUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} -import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.{TopicCollection, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection} import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse @@ -75,7 +75,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { protected def createTopic( topic: String, numPartitions: Int - ): Unit = { + ): Uuid = { val admin = cluster.admin() try { TestUtils.createTopicWithAdmin( @@ -85,6 +85,12 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { topic = topic, numPartitions = numPartitions ) + admin + .describeTopics(TopicCollection.ofTopicNames(List(topic).asJava)) + .allTopicNames() + .get() + .get(topic) + .topicId() } finally { admin.close() } @@ -166,18 +172,24 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { memberId: String, memberEpoch: Int, topic: String, + topicId: Uuid, partition: Int, offset: Long, expectedError: Errors, version: Short = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) ): Unit = { - val request = new OffsetCommitRequest.Builder( + if (version >= 10 && topicId == Uuid.ZERO_UUID) { + throw new IllegalArgumentException(s"Cannot call OffsetCommit API version $version without a topic id") + } + + val request = OffsetCommitRequest.Builder.forTopicIdsOrNames( new OffsetCommitRequestData() .setGroupId(groupId) .setMemberId(memberId) .setGenerationIdOrMemberEpoch(memberEpoch) .setTopics(List( new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(topicId) .setName(topic) .setPartitions(List( new OffsetCommitRequestData.OffsetCommitRequestPartition() @@ -191,7 +203,8 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { val expectedResponse = new OffsetCommitResponseData() .setTopics(List( new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setName(topic) + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(if (version < 10) topic else "") .setPartitions(List( new OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(partition) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 02541097d4c..42c1131af61 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -992,27 +992,43 @@ class KafkaApisTest extends Logging { ) } - @Test - def testHandleOffsetCommitRequest(): Unit = { - addTopicToMetadataCache("foo", numPartitions = 1) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + def testHandleOffsetCommitRequest(version: Short): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 1) val offsetCommitRequest = new OffsetCommitRequestData() .setGroupId("group") .setMemberId("member") .setTopics(List( new OffsetCommitRequestData.OffsetCommitRequestTopic() - .setName("foo") + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(if (version < 10) topicName else "") .setPartitions(List( new OffsetCommitRequestData.OffsetCommitRequestPartition() .setPartitionIndex(0) .setCommittedOffset(10)).asJava)).asJava) - val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) + val expectedOffsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(topicName) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build(version)) val future = new CompletableFuture[OffsetCommitResponseData]() when(groupCoordinator.commitOffsets( requestChannelRequest.context, - offsetCommitRequest, + expectedOffsetCommitRequest, RequestLocal.noCaching.bufferSupplier )).thenReturn(future) kafkaApis = createKafkaApis() @@ -1025,7 +1041,8 @@ class KafkaApisTest extends Logging { val offsetCommitResponse = new OffsetCommitResponseData() .setTopics(List( new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setName("foo") + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(if (version < 10) topicName else "") .setPartitions(List( new OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(0) @@ -1036,27 +1053,43 @@ class KafkaApisTest extends Logging { assertEquals(offsetCommitResponse, response.data) } - @Test - def testHandleOffsetCommitRequestFutureFailed(): Unit = { - addTopicToMetadataCache("foo", numPartitions = 1) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + def testHandleOffsetCommitRequestFutureFailed(version: Short): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 1) val offsetCommitRequest = new OffsetCommitRequestData() .setGroupId("group") .setMemberId("member") .setTopics(List( new OffsetCommitRequestData.OffsetCommitRequestTopic() - .setName("foo") + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(if (version < 10) topicName else "") .setPartitions(List( new OffsetCommitRequestData.OffsetCommitRequestPartition() .setPartitionIndex(0) .setCommittedOffset(10)).asJava)).asJava) - val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) + val expectedOffsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(topicName) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build(version)) val future = new CompletableFuture[OffsetCommitResponseData]() when(groupCoordinator.commitOffsets( requestChannelRequest.context, - offsetCommitRequest, + expectedOffsetCommitRequest, RequestLocal.noCaching.bufferSupplier )).thenReturn(future) @@ -1069,7 +1102,8 @@ class KafkaApisTest extends Logging { val expectedOffsetCommitResponse = new OffsetCommitResponseData() .setTopics(List( new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setName("foo") + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(if (version < 10) topicName else "") .setPartitions(List( new OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(0) @@ -1080,6 +1114,161 @@ class KafkaApisTest extends Logging { assertEquals(expectedOffsetCommitResponse, response.data) } + @Test + def testHandleOffsetCommitRequestTopicsAndPartitionsValidationWithTopicIds(): Unit = { + val fooId = Uuid.randomUuid() + val barId = Uuid.randomUuid() + val zarId = Uuid.randomUuid() + val fooName = "foo" + val barName = "bar" + addTopicToMetadataCache(fooName, topicId = fooId, numPartitions = 2) + addTopicToMetadataCache(barName, topicId = barId, numPartitions = 2) + + val offsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + // foo exists but only has 2 partitions. + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(fooId) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(20), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(2) + .setCommittedOffset(30)).asJava), + // bar exists. + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(barId) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(40), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(50)).asJava), + // zar does not exist. + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(zarId) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(60), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(70)).asJava)).asJava) + + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build()) + + // This is the request expected by the group coordinator. + val expectedOffsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + // foo exists but only has 2 partitions. + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(fooId) + .setName(fooName) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(20)).asJava), + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(barId) + .setName(barName) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(40), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(50)).asJava)).asJava) + + val future = new CompletableFuture[OffsetCommitResponseData]() + when(groupCoordinator.commitOffsets( + requestChannelRequest.context, + expectedOffsetCommitRequest, + RequestLocal.noCaching.bufferSupplier + )).thenReturn(future) + kafkaApis = createKafkaApis() + kafkaApis.handle( + requestChannelRequest, + RequestLocal.noCaching + ) + + // This is the response returned by the group coordinator. + val offsetCommitResponse = new OffsetCommitResponseData() + .setTopics(List( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(fooId) + .setName(fooName) + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava), + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(barId) + .setName(barName) + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + val expectedOffsetCommitResponse = new OffsetCommitResponseData() + .setTopics(List( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(fooId) + .setPartitions(List( + // foo-2 is first because partitions failing the validation + // are put in the response first. + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(2) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava), + // zar is before bar because topics failing the validation are + // put in the response first. + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(zarId) + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)).asJava), + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(barId) + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + future.complete(offsetCommitResponse) + val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest) + assertEquals(expectedOffsetCommitResponse, response.data) + } + @Test def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = { addTopicToMetadataCache("foo", numPartitions = 2) @@ -1123,7 +1312,7 @@ class KafkaApisTest extends Logging { .setPartitionIndex(1) .setCommittedOffset(70)).asJava)).asJava) - val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequest).build()) // This is the request expected by the group coordinator. val expectedOffsetCommitRequest = new OffsetCommitRequestData() @@ -1226,48 +1415,6 @@ class KafkaApisTest extends Logging { assertEquals(expectedOffsetCommitResponse, response.data) } - @Test - def testOffsetCommitWithInvalidPartition(): Unit = { - val topic = "topic" - addTopicToMetadataCache(topic, numPartitions = 1) - - def checkInvalidPartition(invalidPartitionId: Int): Unit = { - reset(replicaManager, clientRequestQuotaManager, requestChannel) - - val offsetCommitRequest = new OffsetCommitRequest.Builder( - new OffsetCommitRequestData() - .setGroupId("groupId") - .setTopics(Collections.singletonList( - new OffsetCommitRequestData.OffsetCommitRequestTopic() - .setName(topic) - .setPartitions(Collections.singletonList( - new OffsetCommitRequestData.OffsetCommitRequestPartition() - .setPartitionIndex(invalidPartitionId) - .setCommittedOffset(15) - .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH) - .setCommittedMetadata("")) - ) - ))).build() - - val request = buildRequest(offsetCommitRequest) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - val kafkaApis = createKafkaApis() - try { - kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching) - - val response = verifyNoThrottling[OffsetCommitResponse](request) - assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, - Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode)) - } finally { - kafkaApis.close() - } - } - - checkInvalidPartition(-1) - checkInvalidPartition(1) // topic has only one partition - } - @Test def testTxnOffsetCommitWithInvalidPartition(): Unit = { val topic = "topic" diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala index f289c241d1b..eceb21a4077 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala @@ -16,6 +16,7 @@ */ package kafka.server +import org.apache.kafka.common.Uuid import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.test.ClusterInstance @@ -46,7 +47,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -55,7 +56,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator // a session long enough for the duration of the test. val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol) - // Start from version 1 because version 0 goes to ZK. for (version <- ApiKeys.OFFSET_COMMIT.oldestVersion to ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) { // Commit offset. commitOffset( @@ -63,6 +63,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = if (useNewProtocol && version < 9) Errors.UNSUPPORTED_VERSION else Errors.NONE, @@ -75,6 +76,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = @@ -89,6 +91,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = @@ -103,6 +106,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = "", memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = Errors.UNKNOWN_MEMBER_ID, @@ -115,6 +119,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch + 1, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = @@ -131,11 +136,27 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = "", memberEpoch = -1, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = Errors.NONE, version = version.toShort ) + + // Commit offset to a group with an unknown topic id. + if (version >= 10) { + commitOffset( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + topic = "bar", + topicId = Uuid.randomUuid(), + partition = 0, + offset = 100L, + expectedError = Errors.UNKNOWN_TOPIC_ID, + version = version.toShort + ) + } } } } diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala index c9201b24e98..0fc414e24c9 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala @@ -45,7 +45,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -65,6 +65,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + partitionId, expectedError = Errors.NONE, diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala index b49de577931..be95cef7844 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala @@ -71,7 +71,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -87,6 +87,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + partitionId, expectedError = Errors.NONE, @@ -239,7 +240,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -255,6 +256,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + partitionId, expectedError = Errors.NONE, @@ -348,7 +350,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -365,6 +367,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + partitionId, expectedError = Errors.NONE, diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 154afd34a60..7610e466207 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -270,7 +270,7 @@ class RequestQuotaTest extends BaseRequestTest { .setTargetTimes(List(topic).asJava) case ApiKeys.OFFSET_COMMIT => - new OffsetCommitRequest.Builder( + OffsetCommitRequest.Builder.forTopicNames( new OffsetCommitRequestData() .setGroupId("test-group") .setGenerationIdOrMemberEpoch(1) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 2b50071a7f7..0fa997c557a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -461,7 +461,9 @@ public class OffsetMetadataManager { final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs); request.topics().forEach(topic -> { - final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic().setName(topic.name()); + final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic() + .setTopicId(topic.topicId()) + .setName(topic.name()); response.topics().add(topicResponse); topic.partitions().forEach(partition -> { @@ -470,8 +472,8 @@ public class OffsetMetadataManager { .setPartitionIndex(partition.partitionIndex()) .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code())); } else { - log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.", - request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(), + log.debug("[GroupId {}] Committing offsets {} for partition {}-{}-{} from member {} with leader epoch {}.", + request.groupId(), partition.committedOffset(), topic.topicId(), topic.name(), partition.partitionIndex(), request.memberId(), partition.committedLeaderEpoch()); topicResponse.partitions().add(new OffsetCommitResponsePartition() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 6f788d84fd0..382b2a9b0e5 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -1308,6 +1308,75 @@ public class OffsetMetadataManagerTest { ); } + @Test + public void testConsumerGroupOffsetCommitWithTopicIds() { + Uuid topicId = Uuid.randomUuid(); + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( + "foo", + true + ); + + // Add member. + group.updateMember(new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build() + ); + + CoordinatorResult result = context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(List.of( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(topicId) + .setName("bar") + .setPartitions(List.of( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + .setCommittedLeaderEpoch(10) + .setCommittedMetadata("metadata") + )) + )) + ); + + assertEquals( + new OffsetCommitResponseData() + .setTopics(List.of( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(topicId) + .setName("bar") + .setPartitions(List.of( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + )) + )), + result.response() + ); + + assertEquals( + List.of(GroupCoordinatorRecordHelpers.newOffsetCommitRecord( + "foo", + "bar", + 0, + new OffsetAndMetadata( + 100L, + OptionalInt.of(10), + "metadata", + context.time.milliseconds(), + OptionalLong.empty() + ) + )), + result.records() + ); + } + @Test public void testConsumerGroupOffsetCommitWithOffsetMetadataTooLarge() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()