KAFKA-16717 [2/N]: Add AdminClient.alterShareGroupOffsets (#18929)

[KAFKA-16720](https://issues.apache.org/jira/browse/KAFKA-16720) aims to
finish the AlterShareGroupOffsets RPC.

Reviewers: Andrew Schofield <aschofield@confluent.io>

---------

Co-authored-by: jimmy <wangzhiwang@qq.com>
This commit is contained in:
jimmy 2025-05-23 16:05:48 +08:00 committed by GitHub
parent 999afbbbf1
commit b44bfca408
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 715 additions and 17 deletions

View File

@ -23,15 +23,11 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class AlterShareGroupOffsetsRequest extends AbstractRequest {
private final AlterShareGroupOffsetsRequestData data;
public AlterShareGroupOffsetsRequest(AlterShareGroupOffsetsRequestData data, short version) {
private AlterShareGroupOffsetsRequest(AlterShareGroupOffsetsRequestData data, short version) {
super(ApiKeys.ALTER_SHARE_GROUP_OFFSETS, version);
this.data = data;
}
@ -58,17 +54,25 @@ public class AlterShareGroupOffsetsRequest extends AbstractRequest {
/**
 * Builds the error response for this request by mapping the throwable to an
 * {@link Errors} code and delegating to the static helper that fills in the
 * top-level error code/message and throttle time.
 *
 * <p>NOTE(review): the scraped diff showed the removed per-partition error
 * construction merged with the added code, leaving unreachable statements
 * after the first return; this is the added (post-commit) version.
 *
 * @param throttleTimeMs throttle time to report back to the client
 * @param e              the failure to translate into an error code
 * @return a response carrying the top-level error
 */
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
    Errors error = Errors.forException(e);
    return new AlterShareGroupOffsetsResponse(getErrorResponse(throttleTimeMs, error));
}
/**
 * Builds response data carrying the supplied throttle time and the given
 * top-level error's code and message.
 */
public static AlterShareGroupOffsetsResponseData getErrorResponse(int throttleTimeMs, Errors error) {
    AlterShareGroupOffsetsResponseData responseData = new AlterShareGroupOffsetsResponseData();
    responseData.setThrottleTimeMs(throttleTimeMs);
    responseData.setErrorCode(error.code());
    responseData.setErrorMessage(error.message());
    return responseData;
}
/**
 * Builds response data carrying only the given top-level error's code and
 * message (no throttle time is set).
 */
public static AlterShareGroupOffsetsResponseData getErrorResponse(Errors error) {
return getErrorResponse(error.code(), error.message());
}
/**
 * Builds response data carrying an explicit top-level error code and message.
 */
public static AlterShareGroupOffsetsResponseData getErrorResponse(short errorCode, String errorMessage) {
    AlterShareGroupOffsetsResponseData responseData = new AlterShareGroupOffsetsResponseData();
    responseData.setErrorCode(errorCode);
    responseData.setErrorMessage(errorMessage);
    return responseData;
}
public static AlterShareGroupOffsetsRequest parse(Readable readable, short version) {

View File

@ -17,12 +17,16 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
public class AlterShareGroupOffsetsResponse extends AbstractResponse {
@ -37,6 +41,7 @@ public class AlterShareGroupOffsetsResponse extends AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
updateErrorCounts(counts, Errors.forCode(data.errorCode()));
data.responses().forEach(topic -> topic.partitions().forEach(partitionResponse ->
updateErrorCounts(counts, Errors.forCode(partitionResponse.errorCode()))
));
@ -63,4 +68,47 @@ public class AlterShareGroupOffsetsResponse extends AbstractResponse {
new AlterShareGroupOffsetsResponseData(readable, version)
);
}
public static class Builder {
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData();
HashMap<String, AlterShareGroupOffsetsResponseTopic> topics = new HashMap<>();
private AlterShareGroupOffsetsResponseTopic getOrCreateTopic(String topic, Uuid topicId) {
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = topics.get(topic);
if (topicData == null) {
topicData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
.setTopicName(topic)
.setTopicId(topicId == null ? Uuid.ZERO_UUID : topicId);
topics.put(topic, topicData);
}
return topicData;
}
public Builder addPartition(String topic, int partition, Map<String, Uuid> topicIdsToNames, Errors error) {
AlterShareGroupOffsetsResponseTopic topicData = getOrCreateTopic(topic, topicIdsToNames.get(topic));
topicData.partitions().add(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(error.code())
.setErrorMessage(error.message()));
return this;
}
public AlterShareGroupOffsetsResponse build() {
data.setResponses(new ArrayList<>(topics.values()));
return new AlterShareGroupOffsetsResponse(data);
}
public Builder merge(AlterShareGroupOffsetsResponseData data, Map<String, Uuid> topicIdsToNames) {
data.responses().forEach(topic -> {
AlterShareGroupOffsetsResponseTopic newTopic = getOrCreateTopic(topic.topicName(), topicIdsToNames.get(topic.topicName()));
topic.partitions().forEach(partition -> newTopic.partitions().add(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(partition.errorCode())
.setErrorMessage(partition.errorMessage())));
});
return this;
}
}
}

View File

@ -19,6 +19,10 @@
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
"about": "The results for each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",

View File

@ -3738,11 +3738,50 @@ class KafkaApis(val requestChannel: RequestChannel,
// Handles an ALTER_SHARE_GROUP_OFFSETS request: checks the share-group protocol
// is enabled and the caller has READ on the group, filters per-topic results by
// TOPIC authorization and metadata presence, then forwards the authorized
// subset to the group coordinator and merges its reply into the response.
//
// NOTE(review): this span appears to be a diff rendered without +/- markers.
// The two statements just before the closing brace duplicate the
// unsupported-version response already sent in the first branch and would be
// unreachable/incorrect if compiled as-is — they look like the REMOVED lines
// of the hunk. Confirm against the repository before treating this as source.
def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val alterShareGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest]
val groupId = alterShareGroupOffsetsRequest.data.groupId
if (!isShareGroupProtocolEnabled) {
// Share groups disabled: reply UNSUPPORTED_VERSION with the default throttle.
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
return CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
} else {
val responseBuilder = new AlterShareGroupOffsetsResponse.Builder()
val authorizedTopicPartitions = new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection()
alterShareGroupOffsetsRequest.data.topics.forEach(topic => {
// Per-topic screening: TOPIC authorization first, then metadata existence.
val topicError = {
if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName())) {
Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED))
} else if (!metadataCache.contains(topic.topicName())) {
Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))
} else {
None
}
}
topicError match {
case Some(error) =>
// Record the error for every partition of the rejected topic.
topic.partitions().forEach(partition => responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), metadataCache.topicNamesToIds(), error.error))
case None =>
authorizedTopicPartitions.add(topic)
}
})
// Only the authorized topics are forwarded to the coordinator.
val data = new AlterShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(authorizedTopicPartitions)
groupCoordinator.alterShareGroupOffsets(
request.context,
groupId,
data
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(exception))
} else {
// Merge coordinator results with the locally-recorded per-topic errors.
requestHelper.sendMaybeThrottle(request, responseBuilder.merge(response, metadataCache.topicNamesToIds()).build())
}
}
}
// NOTE(review): the two lines below duplicate the unsupported-version reply and
// look like leftover removed diff lines — verify against the actual file.
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}

View File

@ -373,6 +373,8 @@ class KRaftMetadataCache(
override def topicIdsToNames(): util.Map[Uuid, String] = _currentImage.topics.topicIdToNameView()
override def topicNamesToIds(): util.Map[String, Uuid] = _currentImage.topics().topicNameToIdView()
// If the leader is not known, return None;
// if the leader is known and the corresponding node is available, return Some(node);
// if the leader is known but the corresponding node with the listener name is not available, return Some(NO_NODE).

View File

@ -13167,6 +13167,181 @@ class KafkaApisTest extends Logging {
})
}
@Test
// Happy path: a two-partition alter request for one known topic is forwarded to
// the group coordinator, and the coordinator's response data is returned to the
// client unchanged.
def testAlterShareGroupOffsetsSuccess(): Unit = {
val groupId = "group"
val topicName1 = "foo"
val topicId1 = Uuid.randomUuid
metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
addTopicToMetadataCache(topicName1, 2, topicId = topicId1)
val topicCollection = new AlterShareGroupOffsetsRequestTopicCollection();
topicCollection.addAll(util.List.of(
new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List(
new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
.setPartitionIndex(0)
.setStartOffset(0L),
new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
.setPartitionIndex(1)
.setStartOffset(0L)
).asJava)))
val alterRequestData = new AlterShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(topicCollection)
val requestChannelRequest = buildRequest(new AlterShareGroupOffsetsRequest.Builder(alterRequestData).build)
// Stub the coordinator to hand back a future we complete after dispatch.
val resultFuture = new CompletableFuture[AlterShareGroupOffsetsResponseData]
when(groupCoordinator.alterShareGroupOffsets(
any(),
ArgumentMatchers.eq[String](groupId),
ArgumentMatchers.any(classOf[AlterShareGroupOffsetsRequestData])
)).thenReturn(resultFuture)
kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val alterShareGroupOffsetsResponse = new AlterShareGroupOffsetsResponseData()
resultFuture.complete(alterShareGroupOffsetsResponse)
val response = verifyNoThrottling[AlterShareGroupOffsetsResponse](requestChannelRequest)
// The handler should pass the coordinator's (empty) response data through.
assertEquals(alterShareGroupOffsetsResponse, response.data)
}
@Test
// Mixed-outcome request: "foo" is denied by the authorizer, "zoo" is missing
// from the metadata cache, and only "bar" reaches the coordinator. Verifies the
// broker-side errors are merged with the coordinator response.
// Fix: the third topic's builder chain was missing the leading '.' on
// setPartitions, which would not compile.
def testAlterShareGroupOffsetsAuthorizationFailed(): Unit = {
  val groupId = "group"
  val topicName1 = "foo"
  val topicId1 = Uuid.randomUuid
  val topicName2 = "bar"
  val topicId2 = Uuid.randomUuid
  val topicName3 = "zoo"
  metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
  addTopicToMetadataCache(topicName1, 2, topicId = topicId1)
  addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
  val topicCollection = new AlterShareGroupOffsetsRequestTopicCollection();
  topicCollection.addAll(util.List.of(
    new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic()
      .setTopicName(topicName1)
      .setPartitions(List(
        new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
          .setPartitionIndex(0)
          .setStartOffset(0L),
        new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
          .setPartitionIndex(1)
          .setStartOffset(0L)
      ).asJava),
    new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic()
      .setTopicName(topicName2)
      .setPartitions(List(
        new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
          .setPartitionIndex(0)
          .setStartOffset(0L)
      ).asJava),
    new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic()
      .setTopicName(topicName3)
      .setPartitions(List(
        new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
          .setPartitionIndex(0)
          .setStartOffset(0L)
      ).asJava))
  )
  val authorizer: Authorizer = mock(classOf[Authorizer])
  // Consecutive stubbed results: GROUP allowed, then per-topic foo denied,
  // bar allowed, zoo allowed (zoo later fails the metadata-cache check).
  when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
    .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava, Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava, Seq(AuthorizationResult.ALLOWED).asJava)
  val alterRequestData = new AlterShareGroupOffsetsRequestData()
    .setGroupId(groupId)
    .setTopics(topicCollection)
  val requestChannelRequest = buildRequest(new AlterShareGroupOffsetsRequest.Builder(alterRequestData).build)
  val resultFuture = new CompletableFuture[AlterShareGroupOffsetsResponseData]
  when(groupCoordinator.alterShareGroupOffsets(
    any(),
    ArgumentMatchers.eq[String](groupId),
    ArgumentMatchers.any(classOf[AlterShareGroupOffsetsRequestData])
  )).thenReturn(resultFuture)
  kafkaApis = createKafkaApis(authorizer = Some(authorizer))
  kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
  // Coordinator answers only for the authorized topic "bar".
  val alterShareGroupOffsetsResponse = new AlterShareGroupOffsetsResponseData()
    .setResponses(List(
      new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
        .setTopicName(topicName2)
        .setTopicId(topicId2)
        .setPartitions(List(
          new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
            .setPartitionIndex(0)
            .setErrorCode(Errors.NONE.code())
            .setErrorMessage(Errors.NONE.message())
        ).asJava)
    ).asJava)
  resultFuture.complete(alterShareGroupOffsetsResponse)
  val response = verifyNoThrottling[AlterShareGroupOffsetsResponse](requestChannelRequest)
  assertNotNull(response.data)
  assertEquals(1, response.errorCounts().get(Errors.UNKNOWN_TOPIC_OR_PARTITION))
  assertEquals(2, response.errorCounts().get(Errors.TOPIC_AUTHORIZATION_FAILED))
  assertEquals(3, response.data().responses().size())
  val bar = response.data().responses().get(0)
  val foo = response.data().responses().get(1)
  val zoo = response.data().responses().get(2)
  assertEquals(topicName1, foo.topicName())
  assertEquals(topicId1, foo.topicId())
  assertEquals(topicName2, bar.topicName())
  assertEquals(topicId2, bar.topicId())
  assertEquals(topicName3, zoo.topicName())
  // "zoo" is unknown to the metadata cache, so its id stays ZERO_UUID.
  assertEquals(Uuid.ZERO_UUID, zoo.topicId())
  foo.partitions().forEach(partition => {
    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), partition.errorCode())
  })
}
@Test
// When the coordinator future fails, the handler must map the exception to a
// top-level error code/message in the response.
def testAlterShareGroupOffsetsRequestGroupCoordinatorThrowsError(): Unit = {
val groupId = "group"
val topicName1 = "foo"
val topicId1 = Uuid.randomUuid
metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
addTopicToMetadataCache(topicName1, 2, topicId = topicId1)
val topicCollection = new AlterShareGroupOffsetsRequestTopicCollection();
topicCollection.addAll(util.List.of(
new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List(
new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
.setPartitionIndex(0)
.setStartOffset(0L),
new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
.setPartitionIndex(1)
.setStartOffset(0L)
).asJava)))
val alterRequestData = new AlterShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(topicCollection)
val requestChannelRequest = buildRequest(new AlterShareGroupOffsetsRequest.Builder(alterRequestData).build)
// Coordinator fails outright with UNKNOWN_SERVER_ERROR.
when(groupCoordinator.alterShareGroupOffsets(
any(),
ArgumentMatchers.eq[String](groupId),
ArgumentMatchers.any(classOf[AlterShareGroupOffsetsRequestData])
)).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception))
kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
// Expect the exception surfaced as the response's top-level error.
val alterShareGroupOffsetsResponseData = new AlterShareGroupOffsetsResponseData()
.setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
val response = verifyNoThrottling[AlterShareGroupOffsetsResponse](requestChannelRequest)
assertEquals(alterShareGroupOffsetsResponseData, response.data)
}
def getShareGroupDescribeResponse(groupIds: util.List[String], enableShareGroups: Boolean = true,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = {

View File

@ -17,6 +17,8 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@ -239,6 +241,20 @@ public interface GroupCoordinator {
List<String> groupIds
);
/**
 * Alters the share group offsets for a given group.
 *
 * @param context The request context.
 * @param groupId The group id.
 * @param requestData The AlterShareGroupOffsetsRequest data.
 * @return A future yielding the response data, completing exceptionally
 *         if the operation fails.
 */
CompletableFuture<AlterShareGroupOffsetsResponseData> alterShareGroupOffsets(
AuthorizableRequestContext context,
String groupId,
AlterShareGroupOffsetsRequestData requestData
);
/**
* Delete Groups.
*

View File

@ -26,6 +26,8 @@ import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@ -62,6 +64,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
@ -667,6 +670,56 @@ public class GroupCoordinatorService implements GroupCoordinator {
));
}
// Visibility for testing
// Initializes share-group state in the persister after the coordinator write
// succeeded; on success the tentative response is returned unchanged, on
// failure every partition is rewritten with the error (see buildErrorResponse).
CompletableFuture<AlterShareGroupOffsetsResponseData> persisterInitialize(
InitializeShareGroupStateParameters request,
AlterShareGroupOffsetsResponseData response
) {
return persister.initializeState(request)
.handle((result, exp) -> {
if (exp == null) {
if (result.errorCounts().isEmpty()) {
handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), result, new ShareGroupHeartbeatResponseData());
return response;
} else {
//TODO build new AlterShareGroupOffsetsResponseData for error response
// NOTE(review): partition-level persister errors currently fall through to
// the success response unchanged — clients would see success despite the
// errors. Confirm whether the TODO above is tracked before release.
return response;
}
} else {
return buildErrorResponse(request, response, exp);
}
});
}
/**
 * Builds a full-error AlterShareGroupOffsetsResponseData after the persister
 * failed to initialize share-group state: sets the top-level error and marks
 * every partition of the tentative response with the same error.
 *
 * <p>Fix: the original assigned {@code response.responses()} to the result and
 * then immediately overwrote it with the rebuilt list — the first assignment
 * was a dead store and has been removed.
 */
private AlterShareGroupOffsetsResponseData buildErrorResponse(InitializeShareGroupStateParameters request, AlterShareGroupOffsetsResponseData response, Throwable exp) {
    AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData();
    GroupTopicPartitionData<PartitionStateData> gtp = request.groupTopicPartitionData();
    log.error("Unable to initialize share group state for {}, {} while altering share group offsets", gtp.groupId(), gtp.topicsData(), exp);
    Errors error = Errors.forException(exp);
    data.setErrorCode(error.code())
        .setErrorMessage(error.message());
    // Rebuild per-topic results so each partition carries the error.
    // NOTE(review): the rebuilt topics copy only the name, not the topicId —
    // confirm dropping the id in the error response is intended.
    data.setResponses(
        response.responses().stream()
            .map(topic -> {
                AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
                    .setTopicName(topic.topicName());
                topic.partitions().forEach(partition -> topicData.partitions().add(
                    new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
                        .setPartitionIndex(partition.partitionIndex())
                        .setErrorCode(error.code())
                        .setErrorMessage(error.message())));
                return topicData;
            })
            .collect(Collectors.toList()));
    // Don't uninitialize share-group state here: this alter-offsets request is
    // simply regarded as failed.
    return data;
}
// Visibility for testing
CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize(
InitializeShareGroupStateParameters request,
@ -1153,6 +1206,39 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
}
/**
 * See {@link GroupCoordinator#alterShareGroupOffsets(AuthorizableRequestContext, String, AlterShareGroupOffsetsRequestData)}.
 */
@Override
public CompletableFuture<AlterShareGroupOffsetsResponseData> alterShareGroupOffsets(AuthorizableRequestContext context, String groupId, AlterShareGroupOffsetsRequestData request) {
    // Guard: the coordinator must be active and have a metadata image.
    if (!isActive.get() || metadataImage == null) {
        return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.COORDINATOR_NOT_AVAILABLE));
    }
    // Guard: a non-empty group id is required.
    if (groupId == null || groupId.isEmpty()) {
        return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.INVALID_GROUP_ID));
    }
    // Guard: nothing to alter — reply with an empty (success) response.
    if (request.topics() == null || request.topics().isEmpty()) {
        return CompletableFuture.completedFuture(new AlterShareGroupOffsetsResponseData());
    }
    // Persist the new offsets through the coordinator shard, then initialize
    // share-group state in the persister using the parameters the shard built.
    return runtime.scheduleWriteOperation(
        "share-group-offsets-alter",
        topicPartitionFor(groupId),
        Duration.ofMillis(config.offsetCommitTimeoutMs()),
        shard -> shard.alterShareGroupOffsets(groupId, request)
    ).thenCompose(writeResult ->
        // Entry key = tentative response data, entry value = persister params.
        persisterInitialize(writeResult.getValue(), writeResult.getKey())
    ).exceptionally(t -> handleOperationException(
        "share-group-offsets-alter",
        request,
        t,
        (error, message) -> AlterShareGroupOffsetsRequest.getErrorResponse(error),
        log
    ));
}
/**
* See {@link GroupCoordinator#describeGroups(AuthorizableRequestContext, List)}.
*/

View File

@ -23,6 +23,8 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@ -776,6 +778,36 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
);
}
/**
 * Validates and applies an AlterShareGroupOffsets request:
 * 1. checks that the provided group exists and is empty;
 * 2. checks that the requested topics are present in the metadata image;
 * 3. checks that the requested share partitions exist.
 *
 * @param groupId - The group ID.
 * @param alterShareGroupOffsetsRequestData - The AlterShareGroupOffsetsRequest data.
 * @return A CoordinatorResult pairing the response data with the
 *         InitializeShareGroupStateParameters for the persister, plus the
 *         records to update the state machine.
 */
public CoordinatorResult<Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters>, CoordinatorRecord> alterShareGroupOffsets(
String groupId,
AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequestData
) {
List<CoordinatorRecord> records = new ArrayList<>();
// Throws if the group does not exist or is not a share group.
ShareGroup group = groupMetadataManager.shareGroup(groupId);
// Offsets may only be altered while the group is empty.
group.validateOffsetsAlterable();
Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters> response = groupMetadataManager.completeAlterShareGroupOffsets(
groupId,
alterShareGroupOffsetsRequestData,
records
);
return new CoordinatorResult<>(
records,
response
);
}
/**
* Fetch offsets for a given set of partitions and a given group.
*

View File

@ -35,6 +35,8 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@ -2741,6 +2743,18 @@ public class GroupMetadataManager {
)).build();
}
/**
 * Translates the accepted (topicId -> partition -> startOffset) map into the
 * persister's InitializeShareGroupStateParameters for the given group/epoch.
 */
private InitializeShareGroupStateParameters buildInitializeShareGroupState(String groupId, int groupEpoch, Map<Uuid, Map<Integer, Long>> offsetByTopicPartitions) {
    GroupTopicPartitionData<PartitionStateData> groupTopicPartitionData = new GroupTopicPartitionData<>(
        groupId,
        offsetByTopicPartitions.entrySet().stream()
            .map(topicEntry -> new TopicData<>(
                topicEntry.getKey(),
                topicEntry.getValue().entrySet().stream()
                    .map(offsetEntry -> PartitionFactory.newPartitionStateData(offsetEntry.getKey(), groupEpoch, offsetEntry.getValue()))
                    .toList()))
            .toList());
    return new InitializeShareGroupStateParameters.Builder()
        .setGroupTopicPartitionData(groupTopicPartitionData)
        .build();
}
// Visibility for tests
void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> records, Map<Uuid, InitMapValue> topicPartitionMap) {
if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
@ -8089,6 +8103,74 @@ public class GroupMetadataManager {
return deleteShareGroupStateRequestTopicsData;
}
/**
 * Builds the per-topic/per-partition results for an AlterShareGroupOffsets
 * request, collects the accepted start offsets, and appends the records that
 * mark the accepted partitions as initializing.
 *
 * <p>Unknown topics and unknown partitions get UNKNOWN_TOPIC_OR_PARTITION in
 * the response and are excluded from the persister parameters.
 *
 * @param groupId The group id (assumed validated as a share group by the
 *                caller — the cast below otherwise throws).
 * @param alterShareGroupOffsetsRequest The request data.
 * @param records Out-parameter: initializing-topics records are appended here.
 * @return the response data paired with the persister initialize parameters.
 */
public Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters> completeAlterShareGroupOffsets(
String groupId,
AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequest,
List<CoordinatorRecord> records
) {
Group group = groups.get(groupId);
List<AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic> alterShareGroupOffsetsResponseTopics = new ArrayList<>();
// topicId -> set of accepted partition indexes (to record as initializing).
Map<Uuid, Set<Integer>> initializingTopics = new HashMap<>();
// topicId -> partition -> requested start offset (for the persister).
Map<Uuid, Map<Integer, Long>> offsetByTopicPartitions = new HashMap<>();
alterShareGroupOffsetsRequest.topics().forEach(topic -> {
TopicImage topicImage = metadataImage.topics().getTopic(topic.topicName());
if (topicImage != null) {
Uuid topicId = topicImage.id();
Set<Integer> existingPartitions = new HashSet<>(topicImage.partitions().keySet());
List<AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition> partitions = new ArrayList<>();
topic.partitions().forEach(partition -> {
if (existingPartitions.contains(partition.partitionIndex())) {
// Accepted: NONE error code (message left at its default) and the
// offset is queued for the persister.
partitions.add(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.NONE.code()));
offsetByTopicPartitions.computeIfAbsent(topicId, k -> new HashMap<>()).put(partition.partitionIndex(), partition.startOffset());
} else {
// Partition index not present in the topic image.
partitions.add(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()));
}
});
// Only existing partitions are recorded as initializing.
initializingTopics.put(topicId, topic.partitions().stream()
.map(AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition::partitionIndex)
.filter(existingPartitions::contains)
.collect(Collectors.toSet()));
alterShareGroupOffsetsResponseTopics.add(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName())
.setTopicId(topicId)
.setPartitions(partitions)
);
} else {
// Topic unknown to the metadata image: every requested partition fails
// and the response topic carries no topic id (defaults to ZERO_UUID).
List<AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition> partitions = new ArrayList<>();
topic.partitions().forEach(partition -> partitions.add(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())));
alterShareGroupOffsetsResponseTopics.add(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName())
.setPartitions(partitions)
);
}
});
addInitializingTopicsRecords(groupId, records, initializingTopics);
return Map.entry(
new AlterShareGroupOffsetsResponseData()
.setResponses(alterShareGroupOffsetsResponseTopics),
buildInitializeShareGroupState(groupId, ((ShareGroup) group).groupEpoch(), offsetByTopicPartitions)
);
}
/**
* Iterates over the share state metadata map and removes any
* deleted topic ids from the initialized and initializing maps.

View File

@ -241,6 +241,14 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> {
*/
@Override
public void validateDeleteGroup() throws ApiException {
validateEmptyGroup();
}

/**
 * Share group offsets can only be altered while the group is empty;
 * otherwise a NON_EMPTY_GROUP error is thrown.
 */
public void validateOffsetsAlterable() throws ApiException {
validateEmptyGroup();
}
public void validateEmptyGroup() {
if (state() != ShareGroupState.EMPTY) {
throw Errors.NON_EMPTY_GROUP.exception();
}

View File

@ -38,6 +38,8 @@ import org.apache.kafka.common.errors.StreamsTopologyFencedException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@ -5230,6 +5232,192 @@ public class GroupCoordinatorServiceTest {
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
}
@Test
// With no metadata image loaded, the service must answer
// COORDINATOR_NOT_AVAILABLE without scheduling any write.
public void testAlterShareGroupOffsetsMetadataImageNull() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build(true);
// Forcing a null Metadata Image
service.onNewMetadataImage(null, null);
String groupId = "share-group";
AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(null);
AlterShareGroupOffsetsResponseData response = new AlterShareGroupOffsetsResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
CompletableFuture<AlterShareGroupOffsetsResponseData> future = service.alterShareGroupOffsets(
requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS),
groupId,
request
);
assertEquals(response, future.get());
}
@Test
// An empty group id must be rejected up front with INVALID_GROUP_ID.
public void testAlterShareGroupOffsetsInvalidGroupId() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setMetrics(mock(GroupCoordinatorMetrics.class))
.build(true);
String groupId = "";
AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection requestTopicCollection =
new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection(List.of(
new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(
new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
.setPartitionIndex(0)
.setStartOffset(0L)
))
).iterator());
AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(requestTopicCollection);
AlterShareGroupOffsetsResponseData response = new AlterShareGroupOffsetsResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code())
.setErrorMessage(Errors.INVALID_GROUP_ID.message());
CompletableFuture<AlterShareGroupOffsetsResponseData> future = service.alterShareGroupOffsets(
requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS),
groupId,
request
);
assertEquals(response, future.get());
}
@Test
public void testAlterShareGroupOffsetsEmptyRequest() throws ExecutionException, InterruptedException {
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setMetrics(mock(GroupCoordinatorMetrics.class))
        .build(true);

    String groupId = "share-group";
    // Request carries no topics at all.
    AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData()
        .setGroupId(groupId);

    CompletableFuture<AlterShareGroupOffsetsResponseData> result = service.alterShareGroupOffsets(
        requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS),
        groupId,
        request
    );

    // A topic-less request yields an empty (error-free) response.
    assertEquals(new AlterShareGroupOffsetsResponseData(), result.get());
}
@Test
public void testAlterShareGroupOffsetsRequestReturnsGroupNotEmpty() throws ExecutionException, InterruptedException {
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    Persister persister = mock(DefaultStatePersister.class);
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setPersister(persister)
        .build(true);
    service.startup(() -> 1);

    // The write operation fails because the group still has members.
    when(runtime.scheduleWriteOperation(
        ArgumentMatchers.eq("share-group-offsets-alter"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.eq(Duration.ofMillis(5000)),
        ArgumentMatchers.any()
    )).thenReturn(FutureUtils.failedFuture(
        new GroupNotEmptyException("bad stuff")
    ));

    String groupId = "share-group";
    AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic topic =
        new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(
                new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
                    .setPartitionIndex(0)
                    .setStartOffset(0L)
            ));
    AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData()
        .setGroupId(groupId)
        .setTopics(new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection(
            List.of(topic).iterator()));

    CompletableFuture<AlterShareGroupOffsetsResponseData> result =
        service.alterShareGroupOffsets(requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS), groupId, request);

    // The exception is surfaced as a top-level NON_EMPTY_GROUP error.
    AlterShareGroupOffsetsResponseData expected = new AlterShareGroupOffsetsResponseData()
        .setErrorCode(Errors.NON_EMPTY_GROUP.code())
        .setErrorMessage(Errors.NON_EMPTY_GROUP.message());
    assertEquals(expected, result.get());
}
@Test
public void testPersisterInitializeForAlterShareGroupOffsetsResponseSuccess() {
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    Persister mockPersister = mock(Persister.class);
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setPersister(mockPersister)
        .build(true);

    String groupId = "share-group";
    Uuid topicId = Uuid.randomUuid();
    service.onNewMetadataImage(
        new MetadataImageBuilder()
            .addTopic(topicId, "topic-name", 1)
            .build(),
        null
    );

    // Persister reports a clean initialization for partition 0.
    when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
        new InitializeShareGroupStateResult.Builder()
            .setTopicsData(List.of(
                new TopicData<>(topicId, List.of(
                    PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())
                ))
            )).build()
    ));

    // The follow-up record write also succeeds.
    when(runtime.scheduleWriteOperation(
        ArgumentMatchers.eq("initialize-share-group-state"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.eq(Duration.ofMillis(5000)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(null));

    InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder()
        .setGroupTopicPartitionData(
            new GroupTopicPartitionData<>(groupId,
                List.of(
                    new TopicData<>(topicId, List.of(PartitionFactory.newPartitionStateData(0, 0, 0)))
                ))
        ).build();

    AlterShareGroupOffsetsResponseData defaultResponse = new AlterShareGroupOffsetsResponseData();
    // On success the original response is passed through unchanged.
    assertEquals(defaultResponse, service.persisterInitialize(params, defaultResponse).getNow(null));

    verify(runtime, times(1)).scheduleWriteOperation(
        ArgumentMatchers.eq("initialize-share-group-state"),
        ArgumentMatchers.any(),
        ArgumentMatchers.any(),
        ArgumentMatchers.any()
    );
    verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
}
@FunctionalInterface
private interface TriFunction<A, B, C, R> {
R apply(A a, B b, C c);

View File

@ -97,6 +97,8 @@ public interface MetadataCache extends ConfigRepository {
Map<Uuid, String> topicIdsToNames();
Map<String, Uuid> topicNamesToIds();
/**
* Get a partition leader's endpoint
*

View File

@ -18,8 +18,10 @@
package org.apache.kafka.server.share.persister;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
@ -47,6 +49,16 @@ public class InitializeShareGroupStateResult implements PersisterResult {
.build();
}
/**
 * Tallies, per error, how many partitions failed initialization.
 * Partitions that returned NONE are not counted.
 *
 * @return a map from error to the number of partitions reporting it.
 */
public Map<Errors, Integer> errorCounts() {
    return topicsData.stream()
        .flatMap(topic -> topic.partitions().stream())
        .map(partition -> Errors.forCode(partition.errorCode()))
        .filter(error -> error != Errors.NONE)
        .collect(Collectors.toMap(error -> error, error -> 1, Integer::sum));
}
public static class Builder {
private List<TopicData<PartitionErrorData>> topicsData;