diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java index 6c663e3a40e..2eb9e37bc50 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java @@ -23,15 +23,11 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - public class AlterShareGroupOffsetsRequest extends AbstractRequest { private final AlterShareGroupOffsetsRequestData data; - public AlterShareGroupOffsetsRequest(AlterShareGroupOffsetsRequestData data, short version) { + private AlterShareGroupOffsetsRequest(AlterShareGroupOffsetsRequestData data, short version) { super(ApiKeys.ALTER_SHARE_GROUP_OFFSETS, version); this.data = data; } @@ -58,17 +54,25 @@ public class AlterShareGroupOffsetsRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - List results = new ArrayList<>(); - data.topics().forEach( - topicResult -> results.add(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic() - .setTopicName(topicResult.topicName()) - .setPartitions(topicResult.partitions().stream() - .map(partitionData -> new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition() - .setPartitionIndex(partitionData.partitionIndex()) - .setErrorCode(Errors.forException(e).code())) - .collect(Collectors.toList())))); - return new AlterShareGroupOffsetsResponse(new AlterShareGroupOffsetsResponseData() - .setResponses(results)); + Errors error = Errors.forException(e); + return new AlterShareGroupOffsetsResponse(getErrorResponse(throttleTimeMs, error)); + } + + public static AlterShareGroupOffsetsResponseData getErrorResponse(int throttleTimeMs, Errors error) { + return new AlterShareGroupOffsetsResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(error.code()) + .setErrorMessage(error.message()); + } + + public static AlterShareGroupOffsetsResponseData getErrorResponse(Errors error) { + return getErrorResponse(error.code(), error.message()); + } + + public static AlterShareGroupOffsetsResponseData getErrorResponse(short errorCode, String errorMessage) { + return new AlterShareGroupOffsetsResponseData() + .setErrorCode(errorCode) + .setErrorMessage(errorMessage); } public static AlterShareGroupOffsetsRequest parse(Readable readable, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java index 98814415cf6..7cd05e22807 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java @@ -17,12 +17,16 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData; +import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; +import java.util.ArrayList; import java.util.EnumMap; +import java.util.HashMap; import java.util.Map; public class AlterShareGroupOffsetsResponse extends AbstractResponse { @@ -37,6 +41,7 @@ public class AlterShareGroupOffsetsResponse extends AbstractResponse { @Override public Map errorCounts() { Map counts = new EnumMap<>(Errors.class); + updateErrorCounts(counts, Errors.forCode(data.errorCode())); data.responses().forEach(topic -> topic.partitions().forEach(partitionResponse -> updateErrorCounts(counts, Errors.forCode(partitionResponse.errorCode())) )); @@ -63,4 +68,47 @@ public class AlterShareGroupOffsetsResponse extends AbstractResponse { new AlterShareGroupOffsetsResponseData(readable, version) ); } + + public static class Builder { + AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData(); + HashMap topics = new HashMap<>(); + + private AlterShareGroupOffsetsResponseTopic getOrCreateTopic(String topic, Uuid topicId) { + AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = topics.get(topic); + if (topicData == null) { + topicData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic() + .setTopicName(topic) + .setTopicId(topicId == null ? Uuid.ZERO_UUID : topicId); + topics.put(topic, topicData); + } + return topicData; + } + + public Builder addPartition(String topic, int partition, Map topicIdsToNames, Errors error) { + AlterShareGroupOffsetsResponseTopic topicData = getOrCreateTopic(topic, topicIdsToNames.get(topic)); + topicData.partitions().add(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(error.code()) + .setErrorMessage(error.message())); + return this; + } + + public AlterShareGroupOffsetsResponse build() { + data.setResponses(new ArrayList<>(topics.values())); + return new AlterShareGroupOffsetsResponse(data); + } + + public Builder merge(AlterShareGroupOffsetsResponseData data, Map topicIdsToNames) { + data.responses().forEach(topic -> { + AlterShareGroupOffsetsResponseTopic newTopic = getOrCreateTopic(topic.topicName(), topicIdsToNames.get(topic.topicName())); + topic.partitions().forEach(partition -> newTopic.partitions().add( + new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(partition.errorCode()) + .setErrorMessage(partition.errorMessage()))); + }); + return this; + + } + } } diff --git a/clients/src/main/resources/common/message/AlterShareGroupOffsetsResponse.json b/clients/src/main/resources/common/message/AlterShareGroupOffsetsResponse.json index 34bc35c72ac..6691eedaa1e 100644 --- a/clients/src/main/resources/common/message/AlterShareGroupOffsetsResponse.json +++ b/clients/src/main/resources/common/message/AlterShareGroupOffsetsResponse.json @@ -19,6 +19,10 @@ "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+", "about": "The results for each topic.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bc155353097..595ef5dc2c5 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3738,11 +3738,50 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val alterShareGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest] + val groupId = alterShareGroupOffsetsRequest.data.groupId + if (!isShareGroupProtocolEnabled) { requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) return CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { + requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + } else { + val responseBuilder = new AlterShareGroupOffsetsResponse.Builder() + val authorizedTopicPartitions = new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection() + + alterShareGroupOffsetsRequest.data.topics.forEach(topic => { + val topicError = { + if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName())) { + Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED)) + } else if (!metadataCache.contains(topic.topicName())) { + Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + } else { + None + } + } + topicError match { + case Some(error) => + topic.partitions().forEach(partition => responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), metadataCache.topicNamesToIds(), error.error)) + case None => + authorizedTopicPartitions.add(topic) + } + }) + + val data = new AlterShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(authorizedTopicPartitions) + groupCoordinator.alterShareGroupOffsets( + request.context, + groupId, + data + ).handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle(request, responseBuilder.merge(response, metadataCache.topicNamesToIds()).build()) + } + } } - requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index fb18da6872a..88b2cf07012 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -373,6 +373,8 @@ class KRaftMetadataCache( override def topicIdsToNames(): util.Map[Uuid, String] = _currentImage.topics.topicIdToNameView() + override def topicNamesToIds(): util.Map[String, Uuid] = _currentImage.topics().topicNameToIdView() + // if the leader is not known, return None; // if the leader is known and corresponding node is available, return Some(node) // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 93d138e8946..3ca2b111c08 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -13167,6 +13167,181 @@ class KafkaApisTest extends Logging { }) } + @Test + def testAlterShareGroupOffsetsSuccess(): Unit = { + val groupId = "group" + val topicName1 = "foo" + val topicId1 = Uuid.randomUuid + metadataCache = initializeMetadataCacheWithShareGroupsEnabled() + addTopicToMetadataCache(topicName1, 2, topicId = topicId1) + val topicCollection = new AlterShareGroupOffsetsRequestTopicCollection(); + topicCollection.addAll(util.List.of( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic() + .setTopicName(topicName1) + .setPartitions(List( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition() + .setPartitionIndex(0) + .setStartOffset(0L), + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition() + .setPartitionIndex(1) + .setStartOffset(0L) + ).asJava))) + + val alterRequestData = new AlterShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(topicCollection) + + val requestChannelRequest = buildRequest(new AlterShareGroupOffsetsRequest.Builder(alterRequestData).build) + val resultFuture = new CompletableFuture[AlterShareGroupOffsetsResponseData] + when(groupCoordinator.alterShareGroupOffsets( + any(), + ArgumentMatchers.eq[String](groupId), + ArgumentMatchers.any(classOf[AlterShareGroupOffsetsRequestData]) + )).thenReturn(resultFuture) + + kafkaApis = createKafkaApis() + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val alterShareGroupOffsetsResponse = new AlterShareGroupOffsetsResponseData() + resultFuture.complete(alterShareGroupOffsetsResponse) + val response = verifyNoThrottling[AlterShareGroupOffsetsResponse](requestChannelRequest) + assertEquals(alterShareGroupOffsetsResponse, response.data) + } + + @Test + def testAlterShareGroupOffsetsAuthorizationFailed(): Unit = { + val groupId = "group" + val topicName1 = "foo" + val topicId1 = Uuid.randomUuid + val topicName2 = "bar" + val topicId2 = Uuid.randomUuid + val topicName3 = "zoo" + metadataCache = initializeMetadataCacheWithShareGroupsEnabled() + addTopicToMetadataCache(topicName1, 2, topicId = topicId1) + addTopicToMetadataCache(topicName2, 1, topicId = topicId2) + val topicCollection = new AlterShareGroupOffsetsRequestTopicCollection(); + topicCollection.addAll(util.List.of( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic() + .setTopicName(topicName1) + .setPartitions(List( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition() + .setPartitionIndex(0) + .setStartOffset(0L), + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition() + .setPartitionIndex(1) + .setStartOffset(0L) + ).asJava), + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic() + .setTopicName(topicName2) + .setPartitions(List( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition() + .setPartitionIndex(0) + .setStartOffset(0L) + ).asJava), + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic() + .setTopicName(topicName3) + setPartitions(List( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition() + .setPartitionIndex(0) + .setStartOffset(0L) + ).asJava)) + ) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava, Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava, Seq(AuthorizationResult.ALLOWED).asJava) + + val alterRequestData = new AlterShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(topicCollection) + + val requestChannelRequest = buildRequest(new AlterShareGroupOffsetsRequest.Builder(alterRequestData).build) + val resultFuture = new CompletableFuture[AlterShareGroupOffsetsResponseData] + when(groupCoordinator.alterShareGroupOffsets( + any(), + ArgumentMatchers.eq[String](groupId), + ArgumentMatchers.any(classOf[AlterShareGroupOffsetsRequestData]) + )).thenReturn(resultFuture) + + kafkaApis = createKafkaApis(authorizer = Some(authorizer)) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val alterShareGroupOffsetsResponse = new AlterShareGroupOffsetsResponseData() + .setResponses(List( + new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic() + .setTopicName(topicName2) + .setTopicId(topicId2) + .setPartitions(List( + new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message()) + ).asJava) + ).asJava) + resultFuture.complete(alterShareGroupOffsetsResponse) + val response = verifyNoThrottling[AlterShareGroupOffsetsResponse](requestChannelRequest) + + assertNotNull(response.data) + assertEquals(1, response.errorCounts().get(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + assertEquals(2, response.errorCounts().get(Errors.TOPIC_AUTHORIZATION_FAILED)) + assertEquals(3, response.data().responses().size()) + + val bar = response.data().responses().get(0) + val foo = response.data().responses().get(1) + val zoo = response.data().responses().get(2) + assertEquals(topicName1, foo.topicName()) + assertEquals(topicId1, foo.topicId()) + assertEquals(topicName2, bar.topicName()) + assertEquals(topicId2, bar.topicId()) + assertEquals(topicName3, zoo.topicName()) + assertEquals(Uuid.ZERO_UUID, zoo.topicId()) + foo.partitions().forEach(partition => { + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), partition.errorCode()) + }) + } + + @Test + def testAlterShareGroupOffsetsRequestGroupCoordinatorThrowsError(): Unit = { + val groupId = "group" + val topicName1 = "foo" + val topicId1 = Uuid.randomUuid + metadataCache = initializeMetadataCacheWithShareGroupsEnabled() + addTopicToMetadataCache(topicName1, 2, topicId = topicId1) + val topicCollection = new AlterShareGroupOffsetsRequestTopicCollection(); + topicCollection.addAll(util.List.of( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic() + .setTopicName(topicName1) + .setPartitions(List( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition() + .setPartitionIndex(0) + .setStartOffset(0L), + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition() + .setPartitionIndex(1) + .setStartOffset(0L) + ).asJava))) + + val alterRequestData = new AlterShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(topicCollection) + + val requestChannelRequest = buildRequest(new AlterShareGroupOffsetsRequest.Builder(alterRequestData).build) + when(groupCoordinator.alterShareGroupOffsets( + any(), + ArgumentMatchers.eq[String](groupId), + ArgumentMatchers.any(classOf[AlterShareGroupOffsetsRequestData]) + )).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception)) + + kafkaApis = createKafkaApis() + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val alterShareGroupOffsetsResponseData = new AlterShareGroupOffsetsResponseData() + .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message()) + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + + val response = verifyNoThrottling[AlterShareGroupOffsetsResponse](requestChannelRequest) + assertEquals(alterShareGroupOffsetsResponseData, response.data) + } + def getShareGroupDescribeResponse(groupIds: util.List[String], enableShareGroups: Boolean = true, verifyNoErr: Boolean = true, authorizer: Authorizer = null, describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 0bc37c82b14..9559c4bb52f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -17,6 +17,8 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; @@ -239,6 +241,20 @@ public interface GroupCoordinator { List groupIds ); + /** + * Alter Share Group Offsets for a given group. + * + * @param context The request context. + * @param groupId The group id. + * @param requestData The AlterShareGroupOffsetsRequest data. + * @return A future yielding the results or an exception. + */ + CompletableFuture alterShareGroupOffsets( + AuthorizableRequestContext context, + String groupId, + AlterShareGroupOffsetsRequestData requestData + ); + /** * Delete Groups. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 7072a8a8fb3..2f10fd73934 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -26,6 +26,8 @@ import org.apache.kafka.common.errors.StreamsInvalidTopologyException; import org.apache.kafka.common.errors.UnsupportedAssignorException; import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; @@ -62,6 +64,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AlterShareGroupOffsetsRequest; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest; import org.apache.kafka.common.requests.DeleteGroupsRequest; @@ -667,6 +670,56 @@ public class GroupCoordinatorService implements GroupCoordinator { )); } + // Visibility for testing + CompletableFuture persisterInitialize( + InitializeShareGroupStateParameters request, + AlterShareGroupOffsetsResponseData response + ) { + return persister.initializeState(request) + .handle((result, exp) -> { + if (exp == null) { + if (result.errorCounts().isEmpty()) { + handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), result, new ShareGroupHeartbeatResponseData()); + return response; + } else { + //TODO build new AlterShareGroupOffsetsResponseData for error response + return response; + } + } else { + return buildErrorResponse(request, response, exp); + } + + }); + } + + private AlterShareGroupOffsetsResponseData buildErrorResponse(InitializeShareGroupStateParameters request, AlterShareGroupOffsetsResponseData response, Throwable exp) { + // build new AlterShareGroupOffsetsResponseData for error response + AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData(); + GroupTopicPartitionData gtp = request.groupTopicPartitionData(); + log.error("Unable to initialize share group state for {}, {} while altering share group offsets", gtp.groupId(), gtp.topicsData(), exp); + Errors error = Errors.forException(exp); + data.setErrorCode(error.code()) + .setErrorMessage(error.message()) + .setResponses(response.responses()); + data.setResponses( + response.responses().stream() + .map(topic -> { + AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic() + .setTopicName(topic.topicName()); + topic.partitions().forEach(partition -> { + AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition partitionData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(error.code()) + .setErrorMessage(error.message()); + topicData.partitions().add(partitionData); + }); + return topicData; + }) + .collect(Collectors.toList())); + // don't uninitialized share group state here, as we regard this alter share group offsets request failed. + return data; + } + // Visibility for testing CompletableFuture persisterInitialize( InitializeShareGroupStateParameters request, @@ -1153,6 +1206,39 @@ public class GroupCoordinatorService implements GroupCoordinator { return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll); } + /** + * See {@link GroupCoordinator#alterShareGroupOffsets(AuthorizableRequestContext, String, AlterShareGroupOffsetsRequestData)}. + */ + @Override + public CompletableFuture alterShareGroupOffsets(AuthorizableRequestContext context, String groupId, AlterShareGroupOffsetsRequestData request) { + if (!isActive.get() || metadataImage == null) { + return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.COORDINATOR_NOT_AVAILABLE)); + } + + if (groupId == null || groupId.isEmpty()) { + return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.INVALID_GROUP_ID)); + } + + if (request.topics() == null || request.topics().isEmpty()) { + return CompletableFuture.completedFuture(new AlterShareGroupOffsetsResponseData()); + } + + return runtime.scheduleWriteOperation( + "share-group-offsets-alter", + topicPartitionFor(groupId), + Duration.ofMillis(config.offsetCommitTimeoutMs()), + coordinator -> coordinator.alterShareGroupOffsets(groupId, request) + ).thenCompose(result -> + persisterInitialize(result.getValue(), result.getKey()) + ).exceptionally(exception -> handleOperationException( + "share-group-offsets-alter", + request, + exception, + (error, message) -> AlterShareGroupOffsetsRequest.getErrorResponse(error), + log + )); + } + /** * See {@link GroupCoordinator#describeGroups(AuthorizableRequestContext, List)}. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 2d32b3136b2..564612ebd20 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -23,6 +23,8 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; @@ -776,6 +778,36 @@ public class GroupCoordinatorShard implements CoordinatorShard, CoordinatorRecord> alterShareGroupOffsets( + String groupId, + AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequestData + ) { + List records = new ArrayList<>(); + ShareGroup group = groupMetadataManager.shareGroup(groupId); + group.validateOffsetsAlterable(); + + Map.Entry response = groupMetadataManager.completeAlterShareGroupOffsets( + groupId, + alterShareGroupOffsetsRequestData, + records + ); + return new CoordinatorResult<>( + records, + response + ); + } + /** * Fetch offsets for a given set of partitions and a given group. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 4312cb47b09..0d153dc5597 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -35,6 +35,8 @@ import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnreleasedInstanceIdException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; @@ -2741,6 +2743,18 @@ public class GroupMetadataManager { )).build(); } + private InitializeShareGroupStateParameters buildInitializeShareGroupState(String groupId, int groupEpoch, Map> offsetByTopicPartitions) { + return new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData( + new GroupTopicPartitionData<>(groupId, offsetByTopicPartitions.entrySet().stream() + .map(entry -> new TopicData<>( + entry.getKey(), + entry.getValue().entrySet().stream() + .map(partitionEntry -> PartitionFactory.newPartitionStateData(partitionEntry.getKey(), groupEpoch, partitionEntry.getValue())) + .toList()) + ).toList() + )).build(); + } + // Visibility for tests void addInitializingTopicsRecords(String groupId, List records, Map topicPartitionMap) { if (topicPartitionMap == null || topicPartitionMap.isEmpty()) { @@ -8089,6 +8103,74 @@ public class GroupMetadataManager { return deleteShareGroupStateRequestTopicsData; } + public Map.Entry completeAlterShareGroupOffsets( + String groupId, + AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequest, + List records + ) { + Group group = groups.get(groupId); + List alterShareGroupOffsetsResponseTopics = new ArrayList<>(); + + Map> initializingTopics = new HashMap<>(); + Map> offsetByTopicPartitions = new HashMap<>(); + + alterShareGroupOffsetsRequest.topics().forEach(topic -> { + TopicImage topicImage = metadataImage.topics().getTopic(topic.topicName()); + if (topicImage != null) { + Uuid topicId = topicImage.id(); + Set existingPartitions = new HashSet<>(topicImage.partitions().keySet()); + List partitions = new ArrayList<>(); + topic.partitions().forEach(partition -> { + if (existingPartitions.contains(partition.partitionIndex())) { + partitions.add( + new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.NONE.code())); + offsetByTopicPartitions.computeIfAbsent(topicId, k -> new HashMap<>()).put(partition.partitionIndex(), partition.startOffset()); + } else { + partitions.add( + new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())); + } + }); + + initializingTopics.put(topicId, topic.partitions().stream() + .map(AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition::partitionIndex) + .filter(existingPartitions::contains) + .collect(Collectors.toSet())); + + alterShareGroupOffsetsResponseTopics.add( + new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic() + .setTopicName(topic.topicName()) + .setTopicId(topicId) + .setPartitions(partitions) + ); + + } else { + List partitions = new ArrayList<>(); + topic.partitions().forEach(partition -> partitions.add( + new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))); + alterShareGroupOffsetsResponseTopics.add( + new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic() + .setTopicName(topic.topicName()) + .setPartitions(partitions) + ); + } + }); + + addInitializingTopicsRecords(groupId, records, initializingTopics); + return Map.entry( + new AlterShareGroupOffsetsResponseData() + .setResponses(alterShareGroupOffsetsResponseTopics), + buildInitializeShareGroupState(groupId, ((ShareGroup) group).groupEpoch(), offsetByTopicPartitions) + ); + } + /** * Iterates over the share state metadata map and removes any * deleted topic ids from the initialized and initializing maps. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java index be4711604e5..2d5f5fe3956 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java @@ -241,6 +241,14 @@ public class ShareGroup extends ModernGroup { */ @Override public void validateDeleteGroup() throws ApiException { + validateEmptyGroup(); + } + + public void validateOffsetsAlterable() throws ApiException { + validateEmptyGroup(); + } + + public void validateEmptyGroup() { if (state() != ShareGroupState.EMPTY) { throw Errors.NON_EMPTY_GROUP.exception(); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index a6f32d1fedf..026af24f141 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -38,6 +38,8 @@ import org.apache.kafka.common.errors.StreamsTopologyFencedException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; @@ -5230,6 +5232,192 @@ public class GroupCoordinatorServiceTest { verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any()); } + @Test + public void testAlterShareGroupOffsetsMetadataImageNull() throws ExecutionException, InterruptedException { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .build(true); + + // Forcing a null Metadata Image + service.onNewMetadataImage(null, null); + + String groupId = "share-group"; + AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(null); + + AlterShareGroupOffsetsResponseData response = new AlterShareGroupOffsetsResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message()); + + CompletableFuture future = service.alterShareGroupOffsets( + requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS), + groupId, + request + ); + assertEquals(response, future.get()); + } + + @Test + public void testAlterShareGroupOffsetsInvalidGroupId() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setMetrics(mock(GroupCoordinatorMetrics.class)) + .build(true); + + String groupId = ""; + AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection requestTopicCollection = + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection(List.of( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition() + .setPartitionIndex(0) + .setStartOffset(0L) + )) + ).iterator()); + AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(requestTopicCollection); + + AlterShareGroupOffsetsResponseData response = new AlterShareGroupOffsetsResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setErrorMessage(Errors.INVALID_GROUP_ID.message()); + + CompletableFuture future = service.alterShareGroupOffsets( + requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS), + groupId, + request + ); + assertEquals(response, future.get()); + } + + @Test + public void testAlterShareGroupOffsetsEmptyRequest() throws ExecutionException, InterruptedException { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setMetrics(mock(GroupCoordinatorMetrics.class)) + .build(true); + + String groupId = "share-group"; + AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData() + .setGroupId(groupId); + + CompletableFuture future = service.alterShareGroupOffsets( + requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS), + groupId, + request + ); + + AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData(); + assertEquals(data, future.get()); + } + + @Test + public void testAlterShareGroupOffsetsRequestReturnsGroupNotEmpty() throws ExecutionException, InterruptedException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + String groupId = "share-group"; + + AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection requestTopicCollection = + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection(List.of( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of( + new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition() + .setPartitionIndex(0) + .setStartOffset(0L) + )) + ).iterator()); + + AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(requestTopicCollection); + + AlterShareGroupOffsetsResponseData response = new AlterShareGroupOffsetsResponseData() + .setErrorCode(Errors.NON_EMPTY_GROUP.code()) + .setErrorMessage(Errors.NON_EMPTY_GROUP.message()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("share-group-offsets-alter"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture( + new GroupNotEmptyException("bad stuff") + )); + + CompletableFuture future = + service.alterShareGroupOffsets(requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS), groupId, request); + assertEquals(response, future.get()); + } + + @Test + public void testPersisterInitializeForAlterShareGroupOffsetsResponseSuccess() { + CoordinatorRuntime runtime = mockRuntime(); + Persister mockPersister = mock(Persister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(mockPersister) + .build(true); + + String groupId = "share-group"; + Uuid topicId = Uuid.randomUuid(); + MetadataImage image = new MetadataImageBuilder() + .addTopic(topicId, "topic-name", 1) + .build(); + + service.onNewMetadataImage(image, null); + + when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture( + new InitializeShareGroupStateResult.Builder() + .setTopicsData(List.of( + new TopicData<>(topicId, List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()) + )) + )).build() + )); + + AlterShareGroupOffsetsResponseData defaultResponse = new AlterShareGroupOffsetsResponseData(); + InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder() + .setGroupTopicPartitionData( + new GroupTopicPartitionData<>(groupId, + List.of( + new TopicData<>(topicId, List.of(PartitionFactory.newPartitionStateData(0, 0, 0))) + )) + ).build(); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("initialize-share-group-state"), + ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(null)); + + assertEquals(defaultResponse, service.persisterInitialize(params, defaultResponse).getNow(null)); + verify(runtime, times(1)).scheduleWriteOperation( + ArgumentMatchers.eq("initialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any()); + } + @FunctionalInterface private interface TriFunction { R apply(A a, B b, C c); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java index a0ca97f44de..9fb3ec24640 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java @@ -97,6 +97,8 @@ public interface MetadataCache extends ConfigRepository { Map topicIdsToNames(); + Map topicNamesToIds(); + /** * Get a partition leader's endpoint * diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java index d616820c30a..722cf8da1da 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java @@ -18,8 +18,10 @@ package org.apache.kafka.server.share.persister; import org.apache.kafka.common.message.InitializeShareGroupStateResponseData; +import org.apache.kafka.common.protocol.Errors; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -47,6 +49,16 @@ public class InitializeShareGroupStateResult implements PersisterResult { .build(); } + public Map errorCounts() { + return topicsData.stream() + .flatMap(topicData -> topicData.partitions().stream()) + .filter(e -> e.errorCode() != Errors.NONE.code()) + .collect(Collectors.groupingBy( + partitionError -> Errors.forCode(partitionError.errorCode()), + Collectors.summingInt(partitionError -> 1) + )); + } + public static class Builder { private List> topicsData;