KAFKA-14690; Add TopicId to OffsetCommit API (#19461)

This patch extends the OffsetCommit API to support topic ids. From
version 10 of the API, topic ids must be used. Originally, we wanted to
support both topic ids and topic names from version 10, but it turned
out that this made everything more complicated. Hence, we propose to
support only topic ids from version 10. Clients that only support
topic names can either look up the topic ids using the Metadata API or
keep using an earlier version.

The patch only contains the server-side changes and keeps version 10
unstable for now. We will mark the version as stable once the
client-side changes are merged.
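
For illustration, a minimal sketch of what building a version 10 request looks like
with the factory methods introduced by this patch (group, member, and offset values
are placeholders; a name-only client would first resolve the topic id, e.g. with the
Admin client's describeTopics, as the tests below do):

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
import org.apache.kafka.common.requests.OffsetCommitRequest;

import java.util.List;

public class OffsetCommitV10Sketch {
    public static OffsetCommitRequest buildV10Request(Uuid topicId) {
        OffsetCommitRequestData data = new OffsetCommitRequestData()
            .setGroupId("group")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(List.of(
                new OffsetCommitRequestTopic()
                    // From version 10 on, the topic id alone identifies the topic.
                    .setTopicId(topicId)
                    .setPartitions(List.of(
                        new OffsetCommitRequestPartition()
                            .setPartitionIndex(0)
                            .setCommittedOffset(100L)))));
        // forTopicIdsOrNames allows up to the latest (unstable) version; build
        // throws UnsupportedVersionException if a topic id is missing for v10+.
        return OffsetCommitRequest.Builder.forTopicIdsOrNames(data, true).build((short) 10);
    }
}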

Reviewers: Lianet Magrans <lmagrans@confluent.io>, PoAn Yang <payang@apache.org>
David Jacot, 2025-04-23 08:22:09 +02:00, committed by GitHub
parent b97a130c08
commit 71d08780d1
23 changed files with 589 additions and 237 deletions

AlterConsumerGroupOffsetsHandler.java

@@ -108,7 +108,7 @@ public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Co
.setGroupId(groupId.idValue)
.setTopics(new ArrayList<>(offsetData.values()));
return new OffsetCommitRequest.Builder(data);
return OffsetCommitRequest.Builder.forTopicNames(data);
}
@Override

CommitRequestManager.java

@@ -727,7 +727,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
lastEpochSentOnCommit = Optional.empty();
}
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data);
OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data);
return buildRequestWithResponseHandling(builder);
}

ConsumerCoordinator.java

@@ -1327,7 +1327,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
groupInstanceId = null;
}
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(
new OffsetCommitRequestData()
.setGroupId(this.rebalanceConfig.groupId)
.setGenerationIdOrMemberEpoch(generation.generationId)

OffsetCommitRequest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
@@ -45,20 +46,39 @@ public class OffsetCommitRequest extends AbstractRequest {
private final OffsetCommitRequestData data;
public Builder(OffsetCommitRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.OFFSET_COMMIT, enableUnstableLastVersion);
private Builder(OffsetCommitRequestData data, short oldestAllowedVersion, short latestAllowedVersion) {
super(ApiKeys.OFFSET_COMMIT, oldestAllowedVersion, latestAllowedVersion);
this.data = data;
}
public Builder(OffsetCommitRequestData data) {
this(data, false);
public static Builder forTopicIdsOrNames(OffsetCommitRequestData data, boolean enableUnstableLastVersion) {
return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), ApiKeys.OFFSET_COMMIT.latestVersion(enableUnstableLastVersion));
}
public static Builder forTopicNames(OffsetCommitRequestData data) {
return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), (short) 9);
}
@Override
public OffsetCommitRequest build(short version) {
if (data.groupInstanceId() != null && version < 7) {
throw new UnsupportedVersionException("The broker offset commit protocol version " +
version + " does not support usage of config group.instance.id.");
throw new UnsupportedVersionException("The broker offset commit api version " +
version + " does not support usage of config group.instance.id.");
}
if (version >= 10) {
data.topics().forEach(topic -> {
if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) {
throw new UnsupportedVersionException("The broker offset commit api version " +
version + " does require usage of topic ids.");
}
});
} else {
data.topics().forEach(topic -> {
if (topic.name() == null || topic.name().isEmpty()) {
throw new UnsupportedVersionException("The broker offset commit api version " +
version + " does require usage of topic names.");
}
});
}
return new OffsetCommitRequest(data, version);
}
@@ -97,6 +117,7 @@ public class OffsetCommitRequest extends AbstractRequest {
OffsetCommitResponseData response = new OffsetCommitResponseData();
request.topics().forEach(topic -> {
OffsetCommitResponseTopic responseTopic = new OffsetCommitResponseTopic()
.setTopicId(topic.topicId())
.setName(topic.name());
response.topics().add(responseTopic);

OffsetCommitResponse.java

@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
@@ -123,43 +124,56 @@ public class OffsetCommitResponse extends AbstractResponse {
return version >= 4;
}
public static class Builder {
OffsetCommitResponseData data = new OffsetCommitResponseData();
HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
public static boolean useTopicIds(short version) {
return version >= 10;
}
private OffsetCommitResponseTopic getOrCreateTopic(
String topicName
) {
OffsetCommitResponseTopic topic = byTopicName.get(topicName);
if (topic == null) {
topic = new OffsetCommitResponseTopic().setName(topicName);
data.topics().add(topic);
byTopicName.put(topicName, topic);
}
return topic;
public static Builder newBuilder(boolean useTopicIds) {
if (useTopicIds) {
return new TopicIdBuilder();
} else {
return new TopicNameBuilder();
}
}
public abstract static class Builder {
protected OffsetCommitResponseData data = new OffsetCommitResponseData();
protected abstract void add(
OffsetCommitResponseTopic topic
);
protected abstract OffsetCommitResponseTopic get(
Uuid topicId,
String topicName
);
protected abstract OffsetCommitResponseTopic getOrCreate(
Uuid topicId,
String topicName
);
public Builder addPartition(
Uuid topicId,
String topicName,
int partitionIndex,
Errors error
) {
final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
final OffsetCommitResponseTopic topicResponse = getOrCreate(topicId, topicName);
topicResponse.partitions().add(new OffsetCommitResponsePartition()
.setPartitionIndex(partitionIndex)
.setErrorCode(error.code()));
return this;
}
public <P> Builder addPartitions(
Uuid topicId,
String topicName,
List<P> partitions,
Function<P, Integer> partitionIndex,
Errors error
) {
final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
final OffsetCommitResponseTopic topicResponse = getOrCreate(topicId, topicName);
partitions.forEach(partition ->
topicResponse.partitions().add(new OffsetCommitResponsePartition()
.setPartitionIndex(partitionIndex.apply(partition))
@@ -177,11 +191,10 @@ public class OffsetCommitResponse extends AbstractResponse {
} else {
// Otherwise, we have to merge them together.
newData.topics().forEach(newTopic -> {
OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
OffsetCommitResponseTopic existingTopic = get(newTopic.topicId(), newTopic.name());
if (existingTopic == null) {
// If no topic exists, we can directly copy the new topic data.
data.topics().add(newTopic);
byTopicName.put(newTopic.name(), newTopic);
add(newTopic);
} else {
// Otherwise, we add the partitions to the existing one. Note we
// expect non-overlapping partitions here as we don't verify
@@ -190,7 +203,6 @@ public class OffsetCommitResponse extends AbstractResponse {
}
});
}
return this;
}
@@ -198,4 +210,78 @@
return new OffsetCommitResponse(data);
}
}
public static class TopicIdBuilder extends Builder {
private final HashMap<Uuid, OffsetCommitResponseTopic> byTopicId = new HashMap<>();
@Override
protected void add(OffsetCommitResponseTopic topic) {
throwIfTopicIdIsNull(topic.topicId());
data.topics().add(topic);
byTopicId.put(topic.topicId(), topic);
}
@Override
protected OffsetCommitResponseTopic get(Uuid topicId, String topicName) {
throwIfTopicIdIsNull(topicId);
return byTopicId.get(topicId);
}
@Override
protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String topicName) {
throwIfTopicIdIsNull(topicId);
OffsetCommitResponseTopic topic = byTopicId.get(topicId);
if (topic == null) {
topic = new OffsetCommitResponseTopic()
.setName(topicName)
.setTopicId(topicId);
data.topics().add(topic);
byTopicId.put(topicId, topic);
}
return topic;
}
private static void throwIfTopicIdIsNull(Uuid topicId) {
if (topicId == null) {
throw new IllegalArgumentException("TopicId cannot be null.");
}
}
}
public static class TopicNameBuilder extends Builder {
private final HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
@Override
protected void add(OffsetCommitResponseTopic topic) {
throwIfTopicNameIsNull(topic.name());
data.topics().add(topic);
byTopicName.put(topic.name(), topic);
}
@Override
protected OffsetCommitResponseTopic get(Uuid topicId, String topicName) {
throwIfTopicNameIsNull(topicName);
return byTopicName.get(topicName);
}
@Override
protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String topicName) {
throwIfTopicNameIsNull(topicName);
OffsetCommitResponseTopic topic = byTopicName.get(topicName);
if (topic == null) {
topic = new OffsetCommitResponseTopic()
.setName(topicName)
.setTopicId(topicId);
data.topics().add(topic);
byTopicName.put(topicName, topic);
}
return topic;
}
private void throwIfTopicNameIsNull(String topicName) {
if (topicName == null) {
throw new IllegalArgumentException("TopicName cannot be null.");
}
}
}
}
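
As a usage sketch of the split builder above (the version, topic id, and name values
are assumptions), a caller picks the id- or name-keyed variant from the request
version and accumulates per-partition errors:

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitResponse;

public class ResponseBuilderSketch {
    public static OffsetCommitResponse errorResponse(short version, Uuid topicId, String topicName) {
        // Version 10+ responses are keyed by topic id, older ones by topic name.
        OffsetCommitResponse.Builder builder =
            OffsetCommitResponse.newBuilder(OffsetCommitResponse.useTopicIds(version));
        // Both id and name are passed; each builder variant keys on the one it needs.
        return builder
            .addPartition(topicId, topicName, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION)
            .build();
    }
}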

OffsetCommitRequest.json

@@ -36,8 +36,11 @@
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The
// request is the same as version 8.
"validVersions": "2-9",
//
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
"validVersions": "2-10",
"flexibleVersions": "8+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The unique group identifier." },
@@ -52,8 +55,10 @@
"about": "The time period in ms to retain the offset." },
{ "name": "Topics", "type": "[]OffsetCommitRequestTopic", "versions": "0+",
"about": "The topics to commit offsets for.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0-9", "entityType": "topicName", "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]OffsetCommitRequestPartition", "versions": "0+",
"about": "Each partition to commit offsets for.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",

OffsetCommitResponse.json

@@ -34,7 +34,9 @@
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used and
// GROUP_ID_NOT_FOUND when the group does not exist for both protocols.
"validVersions": "2-9",
//
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
"validVersions": "2-10",
"flexibleVersions": "8+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -47,13 +49,16 @@
// - FENCED_MEMBER_EPOCH (version 7+)
// - GROUP_ID_NOT_FOUND (version 9+)
// - STALE_MEMBER_EPOCH (version 9+)
// - UNKNOWN_TOPIC_ID (version 10+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Topics", "type": "[]OffsetCommitResponseTopic", "versions": "0+",
"about": "The responses for each topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0-9", "entityType": "topicName", "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]OffsetCommitResponsePartition", "versions": "0+",
"about": "The responses for each partition in the topic.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",

ConsumerCoordinatorTest.java

@@ -682,7 +682,7 @@ public abstract class ConsumerCoordinatorTest {
)
);
consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(offsetCommitRequestData))
consumerClient.send(coordinator.checkAndGetCoordinator(), OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequestData))
.compose(new RequestFutureAdapter<ClientResponse, Object>() {
@Override
public void onSuccess(ClientResponse value, RequestFuture<Object> future) {}

MessageTest.java

@@ -56,11 +56,13 @@ import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.types.RawTaggedField;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import com.fasterxml.jackson.databind.JsonNode;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
@@ -409,90 +411,49 @@ public final class MessageTest {
new OffsetForLeaderEpochRequestData().setReplicaId(-2));
}
@Test
public void testOffsetCommitRequestVersions() throws Exception {
String groupId = "groupId";
String topicName = "topic";
String metadata = "metadata";
int partition = 2;
int offset = 100;
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
public void testOffsetCommitRequestVersions(short version) throws Exception {
OffsetCommitRequestData request = new OffsetCommitRequestData()
.setGroupId("groupId")
.setMemberId("memberId")
.setGenerationIdOrMemberEpoch(version >= 1 ? 10 : -1)
.setGroupInstanceId(version >= 7 ? "instanceId" : null)
.setRetentionTimeMs((version >= 2 && version <= 4) ? 20 : -1)
.setTopics(singletonList(
new OffsetCommitRequestTopic()
.setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
.setName(version < 10 ? "topic" : "")
.setPartitions(singletonList(
new OffsetCommitRequestPartition()
.setPartitionIndex(1)
.setCommittedMetadata("metadata")
.setCommittedOffset(100)
.setCommittedLeaderEpoch(version >= 6 ? 10 : -1)
testAllMessageRoundTrips(new OffsetCommitRequestData()
.setGroupId(groupId)
.setTopics(Collections.singletonList(
new OffsetCommitRequestTopic()
.setName(topicName)
.setPartitions(Collections.singletonList(
new OffsetCommitRequestPartition()
.setPartitionIndex(partition)
.setCommittedMetadata(metadata)
.setCommittedOffset(offset)
)))));
))
));
Supplier<OffsetCommitRequestData> request =
() -> new OffsetCommitRequestData()
.setGroupId(groupId)
.setMemberId("memberId")
.setGroupInstanceId("instanceId")
.setTopics(Collections.singletonList(
new OffsetCommitRequestTopic()
.setName(topicName)
.setPartitions(Collections.singletonList(
new OffsetCommitRequestPartition()
.setPartitionIndex(partition)
.setCommittedLeaderEpoch(10)
.setCommittedMetadata(metadata)
.setCommittedOffset(offset)
))))
.setRetentionTimeMs(20);
for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
OffsetCommitRequestData requestData = request.get();
if (version > 4) {
requestData.setRetentionTimeMs(-1);
}
if (version < 6) {
requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
}
if (version < 7) {
requestData.setGroupInstanceId(null);
}
if (version >= 2 && version <= 4) {
testAllMessageRoundTripsBetweenVersions(version, (short) 5, requestData, requestData);
} else {
testAllMessageRoundTripsFromVersion(version, requestData);
}
}
testMessageRoundTrip(version, request, request);
}
@Test
public void testOffsetCommitResponseVersions() throws Exception {
Supplier<OffsetCommitResponseData> response =
() -> new OffsetCommitResponseData()
.setTopics(
singletonList(
new OffsetCommitResponseTopic()
.setName("topic")
.setPartitions(singletonList(
new OffsetCommitResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
))
)
)
.setThrottleTimeMs(20);
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
public void testOffsetCommitResponseVersions(short version) throws Exception {
OffsetCommitResponseData response = new OffsetCommitResponseData()
.setThrottleTimeMs(version >= 3 ? 20 : 0)
.setTopics(singletonList(
new OffsetCommitResponseTopic()
.setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
.setName(version < 10 ? "topic" : "")
.setPartitions(singletonList(
new OffsetCommitResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
))
));
for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
OffsetCommitResponseData responseData = response.get();
if (version < 3) {
responseData.setThrottleTimeMs(0);
}
testAllMessageRoundTripsFromVersion(version, responseData);
}
testMessageRoundTrip(version, response, response);
}
@Test

OffsetCommitRequestTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
@@ -45,6 +46,8 @@ public class OffsetCommitRequestTest {
protected static String groupId = "groupId";
protected static String memberId = "consumerId";
protected static String groupInstanceId = "groupInstanceId";
protected static Uuid topicIdOne = Uuid.randomUuid();
protected static Uuid topicIdTwo = Uuid.randomUuid();
protected static String topicOne = "topicOne";
protected static String topicTwo = "topicTwo";
protected static int partitionOne = 1;
@@ -61,6 +64,7 @@ public class OffsetCommitRequestTest {
public void setUp() {
List<OffsetCommitRequestTopic> topics = Arrays.asList(
new OffsetCommitRequestTopic()
.setTopicId(topicIdOne)
.setName(topicOne)
.setPartitions(Collections.singletonList(
new OffsetCommitRequestPartition()
@@ -70,6 +74,7 @@
.setCommittedMetadata(metadata)
)),
new OffsetCommitRequestTopic()
.setTopicId(topicIdTwo)
.setName(topicTwo)
.setPartitions(Collections.singletonList(
new OffsetCommitRequestPartition()
@@ -90,7 +95,7 @@
expectedOffsets.put(new TopicPartition(topicOne, partitionOne), offset);
expectedOffsets.put(new TopicPartition(topicTwo, partitionTwo), offset);
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data);
OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data);
for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
OffsetCommitRequest request = builder.build(version);
@@ -105,7 +110,7 @@
@Test
public void testVersionSupportForGroupInstanceId() {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(
new OffsetCommitRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
@@ -127,12 +132,14 @@
OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
.setTopics(Arrays.asList(
new OffsetCommitResponseTopic()
.setTopicId(topicIdOne)
.setName(topicOne)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionOne))),
new OffsetCommitResponseTopic()
.setTopicId(topicIdTwo)
.setName(topicTwo)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()

RequestResponseTest.java

@@ -329,6 +329,8 @@ import static org.junit.jupiter.api.Assertions.fail;
// This class performs tests requests and responses for all API keys
public class RequestResponseTest {
private static final Uuid TOPIC_ID = Uuid.randomUuid();
// Exception includes a message that we verify is not included in error responses
private final UnknownServerException unknownServerException = new UnknownServerException("secret");
@@ -2401,7 +2403,7 @@ public class RequestResponseTest {
}
private OffsetCommitRequest createOffsetCommitRequest(short version) {
return new OffsetCommitRequest.Builder(new OffsetCommitRequestData()
return OffsetCommitRequest.Builder.forTopicNames(new OffsetCommitRequestData()
.setGroupId("group1")
.setMemberId("consumer1")
.setGroupInstanceId(null)
@@ -2409,6 +2411,7 @@
.setTopics(singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("test")
.setTopicId(TOPIC_ID)
.setPartitions(asList(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
@@ -2430,6 +2433,7 @@
.setTopics(singletonList(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setName("test")
.setTopicId(TOPIC_ID)
.setPartitions(singletonList(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)

KafkaApis.scala

@@ -275,11 +275,21 @@ class KafkaApis(val requestChannel: RequestChannel,
): CompletableFuture[Unit] = {
val offsetCommitRequest = request.body[OffsetCommitRequest]
// Reject the request if not authorized to the group
// Reject the request if not authorized to the group.
if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
val useTopicIds = OffsetCommitResponse.useTopicIds(request.header.apiVersion)
if (useTopicIds) {
offsetCommitRequest.data.topics.forEach { topic =>
if (topic.topicId != Uuid.ZERO_UUID) {
metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
}
}
}
val authorizedTopics = authHelper.filterByAuthorized(
request.context,
READ,
@@ -287,28 +297,40 @@ class KafkaApis(val requestChannel: RequestChannel,
offsetCommitRequest.data.topics.asScala
)(_.name)
val responseBuilder = new OffsetCommitResponse.Builder()
val responseBuilder = OffsetCommitResponse.newBuilder(useTopicIds)
val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
offsetCommitRequest.data.topics.forEach { topic =>
if (!authorizedTopics.contains(topic.name)) {
if (useTopicIds && topic.name.isEmpty) {
// If the topic name is undefined, it means that the topic id is unknown so we add
// the topic and all its partitions to the response with UNKNOWN_TOPIC_ID.
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_ID)
} else if (!authorizedTopics.contains(topic.name)) {
// If the topic is not authorized, we add the topic and all its partitions
// to the response with TOPIC_AUTHORIZATION_FAILED.
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
} else if (!metadataCache.contains(topic.name)) {
// If the topic is unknown, we add the topic and all its partitions
// to the response with UNKNOWN_TOPIC_OR_PARTITION.
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
} else {
// Otherwise, we check all partitions to ensure that they all exist.
val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(topic.topicId)
.setName(topic.name)
topic.partitions.forEach { partition =>
if (metadataCache.getLeaderAndIsr(topic.name, partition.partitionIndex).isPresent()) {
if (metadataCache.getLeaderAndIsr(topic.name, partition.partitionIndex).isPresent) {
topicWithValidPartitions.partitions.add(partition)
} else {
responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
responseBuilder.addPartition(
topic.topicId,
topic.name,
partition.partitionIndex,
Errors.UNKNOWN_TOPIC_OR_PARTITION
)
}
}
@@ -322,42 +344,23 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, responseBuilder.build())
CompletableFuture.completedFuture(())
} else {
// For version > 0, store offsets in Coordinator.
commitOffsetsToCoordinator(
request,
offsetCommitRequest,
authorizedTopicsRequest,
responseBuilder,
requestLocal
)
}
}
}
private def commitOffsetsToCoordinator(
request: RequestChannel.Request,
offsetCommitRequest: OffsetCommitRequest,
authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
responseBuilder: OffsetCommitResponse.Builder,
requestLocal: RequestLocal
): CompletableFuture[Unit] = {
val offsetCommitRequestData = new OffsetCommitRequestData()
.setGroupId(offsetCommitRequest.data.groupId)
.setMemberId(offsetCommitRequest.data.memberId)
.setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch)
.setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
.setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
.setTopics(authorizedTopicsRequest.asJava)
groupCoordinator.commitOffsets(
request.context,
offsetCommitRequestData,
requestLocal.bufferSupplier
).handle[Unit] { (results, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception))
} else {
requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build())
groupCoordinator.commitOffsets(
request.context,
new OffsetCommitRequestData()
.setGroupId(offsetCommitRequest.data.groupId)
.setMemberId(offsetCommitRequest.data.memberId)
.setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch)
.setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
.setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
.setTopics(authorizedTopicsRequest.asJava),
requestLocal.bufferSupplier
).handle[Unit] { (results, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception))
} else {
requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build())
}
}
}
}
}
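
In outline, the version 10 path above first maps each topic id to a name through the
metadata cache and then reuses the existing name-based authorization and validation.
A simplified sketch of that resolution step (the lookup function stands in for
metadataCache.getTopicName):

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetCommitRequestData;

import java.util.Optional;
import java.util.function.Function;

public class TopicIdResolutionSketch {
    // Fill in names for resolvable topic ids; topics whose id cannot be resolved
    // keep an empty name and are answered with UNKNOWN_TOPIC_ID by the handler.
    public static void resolveNames(
        OffsetCommitRequestData data,
        Function<Uuid, Optional<String>> topicNameForId
    ) {
        data.topics().forEach(topic -> {
            if (!topic.topicId().equals(Uuid.ZERO_UUID)) {
                topicNameForId.apply(topic.topicId()).ifPresent(topic::setName);
            }
        });
    }
}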

AuthorizerIntegrationTest.scala

@@ -372,7 +372,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}
private def createOffsetCommitRequest = {
new requests.OffsetCommitRequest.Builder(
requests.OffsetCommitRequest.Builder.forTopicNames(
new OffsetCommitRequestData()
.setGroupId(group)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)

ConsumerProtocolMigrationTest.scala

@@ -690,7 +690,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
val topicName = "foo"
// Create the topic.
createTopic(
val topicId = createTopic(
topic = topicName,
numPartitions = 3
)
@@ -702,6 +702,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
memberId = "member-id",
memberEpoch = -1,
topic = topicName,
topicId = topicId,
partition = 0,
offset = 1000L,
expectedError = Errors.NONE,
@@ -765,7 +766,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
createOffsetsTopic()
// Create the topic.
createTopic(
val topicId = createTopic(
topic = "foo",
numPartitions = 3
)
@@ -865,6 +866,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
memberId = memberId1,
memberEpoch = 1,
topic = "foo",
topicId = topicId,
partition = partitionId,
offset = 100L + 10 * version + partitionId,
expectedError = Errors.NONE,
@@ -1096,7 +1098,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
createOffsetsTopic()
// Create the topic.
createTopic(
val topicId = createTopic(
topic = "foo",
numPartitions = 3
)
@@ -1164,6 +1166,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
memberId = memberId1,
memberEpoch = 1,
topic = "foo",
topicId = topicId,
partition = partitionId,
offset = 100L + 10 * version + partitionId,
expectedError = Errors.NONE,

DeleteGroupsRequestTest.scala

@@ -48,7 +48,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
createOffsetsTopic()
// Create the topic.
createTopic(
val topicId = createTopic(
topic = "foo",
numPartitions = 3
)
@@ -89,6 +89,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
topicId = topicId,
partition = 0,
offset = 100L,
expectedError = Errors.GROUP_ID_NOT_FOUND,

GroupCoordinatorBaseRequestTest.scala

@@ -19,7 +19,7 @@ package kafka.server
import kafka.network.SocketServer
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.{TopicCollection, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
@@ -75,7 +75,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
protected def createTopic(
topic: String,
numPartitions: Int
): Unit = {
): Uuid = {
val admin = cluster.admin()
try {
TestUtils.createTopicWithAdmin(
@@ -85,6 +85,12 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
topic = topic,
numPartitions = numPartitions
)
admin
.describeTopics(TopicCollection.ofTopicNames(List(topic).asJava))
.allTopicNames()
.get()
.get(topic)
.topicId()
} finally {
admin.close()
}
@@ -166,18 +172,24 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
memberId: String,
memberEpoch: Int,
topic: String,
topicId: Uuid,
partition: Int,
offset: Long,
expectedError: Errors,
version: Short = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
): Unit = {
val request = new OffsetCommitRequest.Builder(
if (version >= 10 && topicId == Uuid.ZERO_UUID) {
throw new IllegalArgumentException(s"Cannot call OffsetCommit API version $version without a topic id")
}
val request = OffsetCommitRequest.Builder.forTopicIdsOrNames(
new OffsetCommitRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setGenerationIdOrMemberEpoch(memberEpoch)
.setTopics(List(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(topicId)
.setName(topic)
.setPartitions(List(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
@@ -191,7 +203,8 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
val expectedResponse = new OffsetCommitResponseData()
.setTopics(List(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setName(topic)
.setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
.setName(if (version < 10) topic else "")
.setPartitions(List(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(partition)

KafkaApisTest.scala

@@ -992,27 +992,43 @@ class KafkaApisTest extends Logging {
)
}
@Test
def testHandleOffsetCommitRequest(): Unit = {
addTopicToMetadataCache("foo", numPartitions = 1)
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
def testHandleOffsetCommitRequest(version: Short): Unit = {
val topicName = "foo"
val topicId = Uuid.randomUuid()
addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 1)
val offsetCommitRequest = new OffsetCommitRequestData()
.setGroupId("group")
.setMemberId("member")
.setTopics(List(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("foo")
.setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
.setName(if (version < 10) topicName else "")
.setPartitions(List(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(10)).asJava)).asJava)
val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
val expectedOffsetCommitRequest = new OffsetCommitRequestData()
.setGroupId("group")
.setMemberId("member")
.setTopics(List(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
.setName(topicName)
.setPartitions(List(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(10)).asJava)).asJava)
val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build(version))
val future = new CompletableFuture[OffsetCommitResponseData]()
when(groupCoordinator.commitOffsets(
requestChannelRequest.context,
offsetCommitRequest,
expectedOffsetCommitRequest,
RequestLocal.noCaching.bufferSupplier
)).thenReturn(future)
kafkaApis = createKafkaApis()
@@ -1025,7 +1041,8 @@ class KafkaApisTest extends Logging {
val offsetCommitResponse = new OffsetCommitResponseData()
.setTopics(List(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setName("foo")
.setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
.setName(if (version < 10) topicName else "")
.setPartitions(List(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
@@ -1036,27 +1053,43 @@ class KafkaApisTest extends Logging {
assertEquals(offsetCommitResponse, response.data)
}
@Test
def testHandleOffsetCommitRequestFutureFailed(): Unit = {
addTopicToMetadataCache("foo", numPartitions = 1)
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
def testHandleOffsetCommitRequestFutureFailed(version: Short): Unit = {
val topicName = "foo"
val topicId = Uuid.randomUuid()
addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 1)
val offsetCommitRequest = new OffsetCommitRequestData()
.setGroupId("group")
.setMemberId("member")
.setTopics(List(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("foo")
.setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
.setName(if (version < 10) topicName else "")
.setPartitions(List(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(10)).asJava)).asJava)
val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
val expectedOffsetCommitRequest = new OffsetCommitRequestData()
.setGroupId("group")
.setMemberId("member")
.setTopics(List(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
.setName(topicName)
.setPartitions(List(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(10)).asJava)).asJava)
val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build(version))
val future = new CompletableFuture[OffsetCommitResponseData]()
when(groupCoordinator.commitOffsets(
requestChannelRequest.context,
offsetCommitRequest,
expectedOffsetCommitRequest,
RequestLocal.noCaching.bufferSupplier
)).thenReturn(future)
@@ -1069,7 +1102,8 @@ class KafkaApisTest extends Logging {
val expectedOffsetCommitResponse = new OffsetCommitResponseData()
.setTopics(List(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setName("foo")
.setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
.setName(if (version < 10) topicName else "")
.setPartitions(List(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
@@ -1080,6 +1114,161 @@ class KafkaApisTest extends Logging {
assertEquals(expectedOffsetCommitResponse, response.data)
}
@Test
def testHandleOffsetCommitRequestTopicsAndPartitionsValidationWithTopicIds(): Unit = {
val fooId = Uuid.randomUuid()
val barId = Uuid.randomUuid()
val zarId = Uuid.randomUuid()
val fooName = "foo"
val barName = "bar"
addTopicToMetadataCache(fooName, topicId = fooId, numPartitions = 2)
addTopicToMetadataCache(barName, topicId = barId, numPartitions = 2)
val offsetCommitRequest = new OffsetCommitRequestData()
.setGroupId("group")
.setMemberId("member")
.setTopics(List(
// foo exists but only has 2 partitions.
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(fooId)
.setPartitions(List(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(10),
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(1)
.setCommittedOffset(20),
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(2)
.setCommittedOffset(30)).asJava),
// bar exists.
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(barId)
.setPartitions(List(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(40),
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(1)
.setCommittedOffset(50)).asJava),
// zar does not exist.
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(zarId)
.setPartitions(List(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(60),
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(1)
.setCommittedOffset(70)).asJava)).asJava)
val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build())
// This is the request expected by the group coordinator.
val expectedOffsetCommitRequest = new OffsetCommitRequestData()
.setGroupId("group")
.setMemberId("member")
.setTopics(List(
// foo exists but only has 2 partitions.
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(fooId)
.setName(fooName)
.setPartitions(List(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(10),
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(1)
.setCommittedOffset(20)).asJava),
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(barId)
.setName(barName)
.setPartitions(List(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(40),
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(1)
.setCommittedOffset(50)).asJava)).asJava)
val future = new CompletableFuture[OffsetCommitResponseData]()
when(groupCoordinator.commitOffsets(
requestChannelRequest.context,
expectedOffsetCommitRequest,
RequestLocal.noCaching.bufferSupplier
)).thenReturn(future)
kafkaApis = createKafkaApis()
kafkaApis.handle(
requestChannelRequest,
RequestLocal.noCaching
)
// This is the response returned by the group coordinator.
val offsetCommitResponse = new OffsetCommitResponseData()
.setTopics(List(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setTopicId(fooId)
.setName(fooName)
.setPartitions(List(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)).asJava),
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setTopicId(barId)
.setName(barName)
.setPartitions(List(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)).asJava)).asJava)
val expectedOffsetCommitResponse = new OffsetCommitResponseData()
.setTopics(List(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setTopicId(fooId)
.setPartitions(List(
// foo-2 is first because partitions failing the validation
// are put in the response first.
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(2)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)).asJava),
// zar is before bar because topics failing the validation are
// put in the response first.
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setTopicId(zarId)
.setPartitions(List(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code),
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)).asJava),
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setTopicId(barId)
.setPartitions(List(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)).asJava)).asJava)
future.complete(offsetCommitResponse)
val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
assertEquals(expectedOffsetCommitResponse, response.data)
}
@Test
def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
addTopicToMetadataCache("foo", numPartitions = 2)
@@ -1123,7 +1312,7 @@ class KafkaApisTest extends Logging {
.setPartitionIndex(1)
.setCommittedOffset(70)).asJava)).asJava)
val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequest).build())
// This is the request expected by the group coordinator.
val expectedOffsetCommitRequest = new OffsetCommitRequestData()
@@ -1226,48 +1415,6 @@ class KafkaApisTest extends Logging {
assertEquals(expectedOffsetCommitResponse, response.data)
}
@Test
def testOffsetCommitWithInvalidPartition(): Unit = {
val topic = "topic"
addTopicToMetadataCache(topic, numPartitions = 1)
def checkInvalidPartition(invalidPartitionId: Int): Unit = {
reset(replicaManager, clientRequestQuotaManager, requestChannel)
val offsetCommitRequest = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId("groupId")
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName(topic)
.setPartitions(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(invalidPartitionId)
.setCommittedOffset(15)
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
.setCommittedMetadata(""))
)
))).build()
val request = buildRequest(offsetCommitRequest)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
val kafkaApis = createKafkaApis()
try {
kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
val response = verifyNoThrottling[OffsetCommitResponse](request)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
} finally {
kafkaApis.close()
}
}
checkInvalidPartition(-1)
checkInvalidPartition(1) // topic has only one partition
}
@Test
def testTxnOffsetCommitWithInvalidPartition(): Unit = {
val topic = "topic"

OffsetCommitRequestTest.scala

@@ -16,6 +16,7 @@
*/
package kafka.server
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
@@ -46,7 +47,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
createOffsetsTopic()
// Create the topic.
createTopic(
val topicId = createTopic(
topic = "foo",
numPartitions = 3
)
@@ -55,7 +56,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
// a session long enough for the duration of the test.
val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol)
// Start from version 1 because version 0 goes to ZK.
for (version <- ApiKeys.OFFSET_COMMIT.oldestVersion to ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
// Commit offset.
commitOffset(
@@ -63,6 +63,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
topicId = topicId,
partition = 0,
offset = 100L,
expectedError = if (useNewProtocol && version < 9) Errors.UNSUPPORTED_VERSION else Errors.NONE,
@@ -75,6 +76,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
topicId = topicId,
partition = 0,
offset = 100L,
expectedError =
@@ -89,6 +91,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
topicId = topicId,
partition = 0,
offset = 100L,
expectedError =
@@ -103,6 +106,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
memberId = "",
memberEpoch = memberEpoch,
topic = "foo",
topicId = topicId,
partition = 0,
offset = 100L,
expectedError = Errors.UNKNOWN_MEMBER_ID,
@@ -115,6 +119,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
memberId = memberId,
memberEpoch = memberEpoch + 1,
topic = "foo",
topicId = topicId,
partition = 0,
offset = 100L,
expectedError =
@@ -131,11 +136,27 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
memberId = "",
memberEpoch = -1,
topic = "foo",
topicId = topicId,
partition = 0,
offset = 100L,
expectedError = Errors.NONE,
version = version.toShort
)
// Commit offset to a group with an unknown topic id.
if (version >= 10) {
commitOffset(
groupId = "grp",
memberId = memberId,
memberEpoch = memberEpoch,
topic = "bar",
topicId = Uuid.randomUuid(),
partition = 0,
offset = 100L,
expectedError = Errors.UNKNOWN_TOPIC_ID,
version = version.toShort
)
}
}
}
}

OffsetDeleteRequestTest.scala

@@ -45,7 +45,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator
createOffsetsTopic()
// Create the topic.
createTopic(
val topicId = createTopic(
topic = "foo",
numPartitions = 3
)
@@ -65,6 +65,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
topicId = topicId,
partition = partitionId,
offset = 100L + partitionId,
expectedError = Errors.NONE,

OffsetFetchRequestTest.scala

@@ -71,7 +71,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
createOffsetsTopic()
// Create the topic.
createTopic(
val topicId = createTopic(
topic = "foo",
numPartitions = 3
)
@@ -87,6 +87,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
topicId = topicId,
partition = partitionId,
offset = 100L + partitionId,
expectedError = Errors.NONE,
@@ -239,7 +240,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
createOffsetsTopic()
// Create the topic.
createTopic(
val topicId = createTopic(
topic = "foo",
numPartitions = 3
)
@@ -255,6 +256,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
topicId = topicId,
partition = partitionId,
offset = 100L + partitionId,
expectedError = Errors.NONE,
@@ -348,7 +350,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
createOffsetsTopic()
// Create the topic.
createTopic(
val topicId = createTopic(
topic = "foo",
numPartitions = 3
)
@@ -365,6 +367,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
topicId = topicId,
partition = partitionId,
offset = 100L + partitionId,
expectedError = Errors.NONE,

RequestQuotaTest.scala

@@ -270,7 +270,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setTargetTimes(List(topic).asJava)
case ApiKeys.OFFSET_COMMIT =>
new OffsetCommitRequest.Builder(
OffsetCommitRequest.Builder.forTopicNames(
new OffsetCommitRequestData()
.setGroupId("test-group")
.setGenerationIdOrMemberEpoch(1)

OffsetMetadataManager.java

@@ -461,7 +461,9 @@ public class OffsetMetadataManager {
final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs);
request.topics().forEach(topic -> {
final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic().setName(topic.name());
final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic()
.setTopicId(topic.topicId())
.setName(topic.name());
response.topics().add(topicResponse);
topic.partitions().forEach(partition -> {
@@ -470,8 +472,8 @@ public class OffsetMetadataManager {
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
} else {
log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.",
request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(),
log.debug("[GroupId {}] Committing offsets {} for partition {}-{}-{} from member {} with leader epoch {}.",
request.groupId(), partition.committedOffset(), topic.topicId(), topic.name(), partition.partitionIndex(),
request.memberId(), partition.committedLeaderEpoch());
topicResponse.partitions().add(new OffsetCommitResponsePartition()

OffsetMetadataManagerTest.java

@@ -1308,6 +1308,75 @@ public class OffsetMetadataManagerTest {
);
}
@Test
public void testConsumerGroupOffsetCommitWithTopicIds() {
Uuid topicId = Uuid.randomUuid();
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
// Add member.
group.updateMember(new ConsumerGroupMember.Builder("member")
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.build()
);
CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(
new OffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
.setGenerationIdOrMemberEpoch(10)
.setTopics(List.of(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(topicId)
.setName("bar")
.setPartitions(List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100L)
.setCommittedLeaderEpoch(10)
.setCommittedMetadata("metadata")
))
))
);
assertEquals(
new OffsetCommitResponseData()
.setTopics(List.of(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setTopicId(topicId)
.setName("bar")
.setPartitions(List.of(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())
))
)),
result.response()
);
assertEquals(
List.of(GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
"foo",
"bar",
0,
new OffsetAndMetadata(
100L,
OptionalInt.of(10),
"metadata",
context.time.milliseconds(),
OptionalLong.empty()
)
)),
result.records()
);
}
@Test
public void testConsumerGroupOffsetCommitWithOffsetMetadataTooLarge() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()