KAFKA-14499: [3/N] Implement OffsetCommit API (#14067)

This patch introduces the `OffsetMetadataManager` and implements the `OffsetCommit` API for both the old and the new rebalance protocols. It introduces version 9 of the API but keeps it unstable for now. The patch adds unit tests for the API; integration tests will be done separately.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
David Jacot 2023-07-27 13:18:10 +02:00 committed by GitHub
parent 353141ed92
commit 29825ee24f
20 changed files with 2245 additions and 194 deletions
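
For illustration only (not part of the patch): a minimal, hypothetical sketch of the version gating described above, assuming nothing beyond the `Errors` enum from `org.apache.kafka.common.protocol`. Clients on OffsetCommit v9+ receive the new GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH errors directly, while older clients get the backward-compatible mapping that `OffsetMetadataManager#validateOffsetCommit` applies below.

import org.apache.kafka.common.protocol.Errors;

// Hypothetical helper mirroring the patch's version gate; not part of the commit.
final class OffsetCommitVersionGateSketch {
    static Errors errorForClient(short apiVersion, Errors error) {
        if (apiVersion >= 9) return error; // v9+ understands the new errors
        if (error == Errors.GROUP_ID_NOT_FOUND) return Errors.ILLEGAL_GENERATION;
        if (error == Errors.STALE_MEMBER_EPOCH) return Errors.UNSUPPORTED_VERSION;
        return error;
    }
}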

View File

@ -30,7 +30,6 @@ import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
public class OffsetCommitRequest extends AbstractRequest {
// default values for the current version
@ -121,8 +120,4 @@ public class OffsetCommitRequest extends AbstractRequest {
public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new OffsetCommitRequest(new OffsetCommitRequestData(new ByteBufferAccessor(buffer), version), version);
}
public static Optional<String> groupInstanceId(OffsetCommitRequestData request) {
return Optional.ofNullable(request.groupInstanceId());
}
}

View File

@ -30,7 +30,8 @@
// Version 8 is the first flexible version.
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used.
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used and
// GROUP_ID_NOT_FOUND when the group does not exist for both protocols.
"validVersions": "0-9",
"flexibleVersions": "8+",
// Supported errors:
@ -42,6 +43,7 @@
// - UNKNOWN_MEMBER_ID (version 1+)
// - INVALID_COMMIT_OFFSET_SIZE (version 0+)
// - FENCED_MEMBER_EPOCH (version 7+)
// - GROUP_ID_NOT_FOUND (version 9+)
// - STALE_MEMBER_EPOCH (version 9+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,

View File

@ -532,6 +532,7 @@ class BrokerServer(
config.consumerGroupMaxSize,
config.consumerGroupAssignors,
config.offsetsTopicSegmentBytes,
config.offsetMetadataMaxSize,
config.groupMaxSize,
config.groupInitialRebalanceDelay,
GroupCoordinatorConfig.GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS,

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.KafkaException;
/**
* Interface common for all groups.
*/
@ -50,4 +52,18 @@ public interface Group {
* @return The group id.
*/
String groupId();
/**
* Validates the OffsetCommit request.
*
* @param memberId The member id.
* @param groupInstanceId The group instance id.
* @param generationIdOrMemberEpoch The generation id for generic groups or the member epoch
* for consumer groups.
*/
void validateOffsetCommit(
String memberId,
String groupInstanceId,
int generationIdOrMemberEpoch
) throws KafkaException;
}

View File

@ -61,6 +61,11 @@ public class GroupCoordinatorConfig {
*/
public final int offsetsTopicSegmentBytes;
/**
* The maximum size for a metadata entry associated with an offset commit.
*/
public final int offsetMetadataMaxSize;
/**
* The generic group maximum size.
*/
@ -93,6 +98,7 @@ public class GroupCoordinatorConfig {
int consumerGroupMaxSize,
List<PartitionAssignor> consumerGroupAssignors,
int offsetsTopicSegmentBytes,
int offsetMetadataMaxSize,
int genericGroupMaxSize,
int genericGroupInitialRebalanceDelayMs,
int genericGroupNewMemberJoinTimeoutMs,
@ -105,6 +111,7 @@ public class GroupCoordinatorConfig {
this.consumerGroupMaxSize = consumerGroupMaxSize;
this.consumerGroupAssignors = consumerGroupAssignors;
this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes;
this.offsetMetadataMaxSize = offsetMetadataMaxSize;
this.genericGroupMaxSize = genericGroupMaxSize;
this.genericGroupInitialRebalanceDelayMs = genericGroupInitialRebalanceDelayMs;
this.genericGroupNewMemberJoinTimeoutMs = genericGroupNewMemberJoinTimeoutMs;

View File

@ -50,6 +50,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
@ -492,11 +493,51 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
// For backwards compatibility, we support offset commits for the empty groupId.
if (request.groupId() == null) {
return CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(
request,
Errors.INVALID_GROUP_ID
));
}
return runtime.scheduleWriteOperation(
"commit-offset",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.commitOffset(context, request)
).exceptionally(exception -> {
if (exception instanceof UnknownTopicOrPartitionException ||
exception instanceof NotEnoughReplicasException) {
return OffsetCommitRequest.getErrorResponse(
request,
Errors.COORDINATOR_NOT_AVAILABLE
);
}
if (exception instanceof NotLeaderOrFollowerException ||
exception instanceof KafkaStorageException) {
return OffsetCommitRequest.getErrorResponse(
request,
Errors.NOT_COORDINATOR
);
}
if (exception instanceof RecordTooLargeException ||
exception instanceof RecordBatchTooLargeException ||
exception instanceof InvalidFetchSizeException) {
return OffsetCommitRequest.getErrorResponse(
request,
Errors.INVALID_COMMIT_OFFSET_SIZE
);
}
return OffsetCommitRequest.getErrorResponse(
request,
Errors.forException(exception)
);
});
}
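// Illustrative condensation (not part of the patch) of the exceptionally block
// above: storage-level exceptions are rewritten into coordinator-level errors so
// that clients either re-discover the coordinator or shrink the commit, instead
// of failing on errors they cannot act upon.
//
//   UnknownTopicOrPartition / NotEnoughReplicas -> COORDINATOR_NOT_AVAILABLE
//   NotLeaderOrFollower / KafkaStorage          -> NOT_COORDINATOR
//   RecordTooLarge / RecordBatchTooLarge /
//     InvalidFetchSize                          -> INVALID_COMMIT_OFFSET_SIZE
//   anything else                               -> Errors.forException(exception)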
/**
* See {@link GroupCoordinator#commitTransactionalOffsets(RequestContext, TxnOffsetCommitRequestData, BufferSupplier)}.
*/

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
@ -82,7 +83,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@ -102,6 +102,7 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscript
import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.Utils.ofSentinel;
import static org.apache.kafka.coordinator.group.generic.GenericGroupMember.EMPTY_ASSIGNMENT;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
@ -410,6 +411,17 @@ public class GroupMetadataManager {
return metadataImage;
}
/**
* @return The group corresponding to the group id, or throws a GroupIdNotFoundException if the group does not exist.
*/
public Group group(String groupId) throws GroupIdNotFoundException {
Group group = groups.get(groupId);
if (group == null) {
throw new GroupIdNotFoundException(String.format("Group %s not found.", groupId));
}
return group;
}
/**
* Gets or maybe creates a consumer group.
*
@ -675,10 +687,6 @@ public class GroupMetadataManager {
.collect(Collectors.toList());
}
private OptionalInt ofSentinel(int value) {
return value != -1 ? OptionalInt.of(value) : OptionalInt.empty();
}
/**
* Handles a regular heartbeat from a consumer group member. It mainly consists of
* three parts:
@ -1804,21 +1812,22 @@ public class GroupMetadataManager {
responseFuture
);
} else {
Optional<Errors> memberError = validateExistingMember(
group,
try {
group.validateMember(
memberId,
groupInstanceId,
"join-group"
);
if (memberError.isPresent()) {
} catch (KafkaException ex) {
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(memberId)
.setErrorCode(memberError.get().code())
.setErrorCode(Errors.forException(ex).code())
.setProtocolType(null)
.setProtocolName(null)
);
} else {
return EMPTY_RESULT;
}
GenericGroupMember member = group.member(memberId);
if (group.isInState(PREPARING_REBALANCE)) {
return updateMemberThenRebalanceOrCompleteJoin(
@ -1901,7 +1910,6 @@ public class GroupMetadataManager {
);
}
}
}
return EMPTY_RESULT;
}
@ -2149,48 +2157,6 @@ public class GroupMetadataManager {
return maybePrepareRebalanceOrCompleteJoin(group, joinReason);
}
/**
* We are validating two things:
* 1. If `groupInstanceId` is present, then it exists and is mapped to `memberId`
* 2. The `memberId` exists in the group
*
* @param group The generic group.
* @param memberId The member id.
* @param groupInstanceId The group instance id.
* @param operation The API operation.
*
* @return the error.
*/
private Optional<Errors> validateExistingMember(
GenericGroup group,
String memberId,
String groupInstanceId,
String operation
) {
if (groupInstanceId == null) {
if (!group.hasMemberId(memberId)) {
return Optional.of(Errors.UNKNOWN_MEMBER_ID);
} else {
return Optional.empty();
}
}
String existingMemberId = group.staticMemberId(groupInstanceId);
if (existingMemberId == null) {
return Optional.of(Errors.UNKNOWN_MEMBER_ID);
}
if (!existingMemberId.equals(memberId)) {
log.info("Request memberId={} for static member with groupInstanceId={} " +
"is fenced by existing memberId={} during operation {}",
memberId, groupInstanceId, existingMemberId, operation);
return Optional.of(Errors.FENCED_INSTANCE_ID);
}
return Optional.empty();
}
/**
* Add a member then rebalance or complete join.
*
@ -2444,7 +2410,7 @@ public class GroupMetadataManager {
* @param group The group.
* @param member The member.
*/
private void rescheduleGenericGroupMemberHeartbeat(
public void rescheduleGenericGroupMemberHeartbeat(
GenericGroup group,
GenericGroupMember member
) {
@ -2825,27 +2791,26 @@ public class GroupMetadataManager {
// finding the correct coordinator and rejoin.
return Optional.of(COORDINATOR_NOT_AVAILABLE);
} else {
Optional<Errors> memberError = validateExistingMember(
group,
try {
group.validateMember(
request.memberId(),
request.groupInstanceId(),
"sync-group"
);
if (memberError.isPresent()) {
return memberError;
} else {
} catch (KafkaException ex) {
return Optional.of(Errors.forException(ex));
}
if (request.generationId() != group.generationId()) {
return Optional.of(Errors.ILLEGAL_GENERATION);
} else if (isProtocolInconsistent(request.protocolType(), group.protocolType().orElse(null)) ||
isProtocolInconsistent(request.protocolName(), group.protocolName().orElse(null))) {
return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
return Optional.empty();
}
}
}
}
private void removePendingSyncMember(
GenericGroup group,

View File

@ -16,7 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
@ -24,11 +24,13 @@ import java.util.Objects;
import java.util.OptionalInt;
import java.util.OptionalLong;
import static org.apache.kafka.coordinator.group.Utils.ofSentinel;
/**
* Represents a committed offset with its metadata.
*/
public class OffsetAndMetadata {
public static final String NO_METADATA = "";
private static final String NO_METADATA = "";
/**
* The committed offset.
@ -114,12 +116,29 @@ public class OffsetAndMetadata {
) {
return new OffsetAndMetadata(
record.offset(),
record.leaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
OptionalInt.empty() : OptionalInt.of(record.leaderEpoch()),
ofSentinel(record.leaderEpoch()),
record.metadata(),
record.commitTimestamp(),
record.expireTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ?
OptionalLong.empty() : OptionalLong.of(record.expireTimestamp())
ofSentinel(record.expireTimestamp())
);
}
/**
* @return An OffsetAndMetadata created from an OffsetCommitRequestPartition request.
*/
public static OffsetAndMetadata fromRequest(
OffsetCommitRequestData.OffsetCommitRequestPartition partition,
long currentTimeMs,
OptionalLong expireTimestampMs
) {
return new OffsetAndMetadata(
partition.committedOffset(),
ofSentinel(partition.committedLeaderEpoch()),
partition.committedMetadata() == null ?
OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
partition.commitTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ?
currentTimeMs : partition.commitTimestamp(),
expireTimestampMs
);
}
}

View File

@ -0,0 +1,383 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
/**
* The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
* a mapping from group id to topic-partition to offset. This class has two kinds of methods:
* 1) The request handlers which handle the requests and generate a response and records to
* mutate the hard state. Those records will be written by the runtime and applied to the
* hard state via the replay methods.
* 2) The replay methods which apply records to the hard state. Those are used in the request
* handling as well as during the initial loading of the records from the partitions.
*/
public class OffsetMetadataManager {
public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private GroupMetadataManager groupMetadataManager = null;
private int offsetMetadataMaxSize = 4096;
private MetadataImage metadataImage = null;
Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
Builder withTime(Time time) {
this.time = time;
return this;
}
Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
this.groupMetadataManager = groupMetadataManager;
return this;
}
Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
this.offsetMetadataMaxSize = offsetMetadataMaxSize;
return this;
}
Builder withMetadataImage(MetadataImage metadataImage) {
this.metadataImage = metadataImage;
return this;
}
public OffsetMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (time == null) time = Time.SYSTEM;
if (groupMetadataManager == null) {
throw new IllegalArgumentException("GroupMetadataManager cannot be null");
}
return new OffsetMetadataManager(
snapshotRegistry,
logContext,
time,
metadataImage,
groupMetadataManager,
offsetMetadataMaxSize
);
}
}
/**
* The logger.
*/
private final Logger log;
/**
* The snapshot registry.
*/
private final SnapshotRegistry snapshotRegistry;
/**
* The system time.
*/
private final Time time;
/**
* The metadata image.
*/
private MetadataImage metadataImage;
/**
* The group metadata manager.
*/
private final GroupMetadataManager groupMetadataManager;
/**
* The maximum allowed metadata size for any offset commit.
*/
private final int offsetMetadataMaxSize;
/**
* The offsets keyed by group id and topic-partition.
*/
private final TimelineHashMap<String, TimelineHashMap<TopicPartition, OffsetAndMetadata>> offsetsByGroup;
OffsetMetadataManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext,
Time time,
MetadataImage metadataImage,
GroupMetadataManager groupMetadataManager,
int offsetMetadataMaxSize
) {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(OffsetMetadataManager.class);
this.time = time;
this.metadataImage = metadataImage;
this.groupMetadataManager = groupMetadataManager;
this.offsetMetadataMaxSize = offsetMetadataMaxSize;
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
}
/**
* Validates an OffsetCommit request.
*
* @param context The request context.
* @param request The actual request.
*/
private Group validateOffsetCommit(
RequestContext context,
OffsetCommitRequestData request
) throws ApiException {
Group group;
try {
group = groupMetadataManager.group(request.groupId());
} catch (GroupIdNotFoundException ex) {
if (request.generationIdOrMemberEpoch() < 0) {
// If the group does not exist and generation id is -1, the request comes from
// either the admin client or a consumer which does not use the group management
// facility. In this case, a so-called simple group is created and the request
// is accepted.
group = groupMetadataManager.getOrMaybeCreateGenericGroup(request.groupId(), true);
} else {
if (context.header.apiVersion() >= 9) {
// Starting from version 9 of the OffsetCommit API, we return GROUP_ID_NOT_FOUND
// if the group does not exist. This error works for both the old and the new
// protocol for clients using this version of the API.
throw ex;
} else {
// For older versions, we return ILLEGAL_GENERATION to preserve
// backward compatibility.
throw Errors.ILLEGAL_GENERATION.exception();
}
}
}
try {
group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationIdOrMemberEpoch()
);
} catch (StaleMemberEpochException ex) {
// The STALE_MEMBER_EPOCH error is only returned for the new consumer group
// protocol (KIP-848). When it is, the member should be using OffsetCommit API
// version >= 9. As we don't support upgrading from the old to the new protocol
// yet, we return an UNSUPPORTED_VERSION error if an older version is used. We
// will revise this when the upgrade path is implemented.
if (context.header.apiVersion() >= 9) {
throw ex;
} else {
throw Errors.UNSUPPORTED_VERSION.exception();
}
}
return group;
}
/**
* Computes the expiration timestamp based on the retention time provided in the OffsetCommit
* request.
*
* The "default" expiration timestamp is defined as now + retention. The retention may be overridden
* in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit
* commit timestamp is provided (v1 only), the expiration timestamp is computed based on that.
*
* @param retentionTimeMs The retention time in milliseconds.
* @param currentTimeMs The current time in milliseconds.
*
* @return An optional containing the expiration timestamp if defined; an empty optional otherwise.
*/
private static OptionalLong expireTimestampMs(
long retentionTimeMs,
long currentTimeMs
) {
return retentionTimeMs == OffsetCommitRequest.DEFAULT_RETENTION_TIME ?
OptionalLong.empty() : OptionalLong.of(currentTimeMs + retentionTimeMs);
}
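// Illustrative arithmetic (not part of the patch): with an assumed client
// override of retentionTimeMs = 3_600_000 (one hour) and currentTimeMs =
// 1_000_000, the commit expires at 4_600_000:
//   expireTimestampMs(3_600_000L, 1_000_000L) -> OptionalLong.of(4_600_000L)
// With the sentinel, the broker-side retention applies later instead:
//   expireTimestampMs(OffsetCommitRequest.DEFAULT_RETENTION_TIME, 1_000_000L) -> OptionalLong.empty()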
/**
* Handles an OffsetCommit request.
*
* @param context The request context.
* @param request The OffsetCommit request.
*
* @return A Result containing the OffsetCommitResponseData response and
* a list of records to update the state machine.
*/
public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
RequestContext context,
OffsetCommitRequestData request
) throws ApiException {
Group group = validateOffsetCommit(context, request);
// In the old consumer group protocol, the offset commits maintain the session if
// the group is in Stable or PreparingRebalance state.
if (group.type() == Group.GroupType.GENERIC) {
GenericGroup genericGroup = (GenericGroup) group;
if (genericGroup.isInState(GenericGroupState.STABLE) || genericGroup.isInState(GenericGroupState.PREPARING_REBALANCE)) {
groupMetadataManager.rescheduleGenericGroupMemberHeartbeat(
genericGroup,
genericGroup.member(request.memberId())
);
}
}
final OffsetCommitResponseData response = new OffsetCommitResponseData();
final List<Record> records = new ArrayList<>();
final long currentTimeMs = time.milliseconds();
final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs);
request.topics().forEach(topic -> {
final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic().setName(topic.name());
response.topics().add(topicResponse);
topic.partitions().forEach(partition -> {
if (partition.committedMetadata() != null && partition.committedMetadata().length() > offsetMetadataMaxSize) {
topicResponse.partitions().add(new OffsetCommitResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
} else {
log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.",
request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(),
request.memberId(), partition.committedLeaderEpoch());
topicResponse.partitions().add(new OffsetCommitResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.NONE.code()));
final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRequest(
partition,
currentTimeMs,
expireTimestampMs
);
records.add(RecordHelpers.newOffsetCommitRecord(
request.groupId(),
topic.name(),
partition.partitionIndex(),
offsetAndMetadata,
metadataImage.features().metadataVersion()
));
}
});
});
return new CoordinatorResult<>(records, response);
}
/**
* Replays OffsetCommitKey/Value to update or delete the corresponding offsets.
*
* @param key An OffsetCommitKey key.
* @param value An OffsetCommitValue value.
*/
public void replay(
OffsetCommitKey key,
OffsetCommitValue value
) {
final String groupId = key.group();
final TopicPartition tp = new TopicPartition(key.topic(), key.partition());
if (value != null) {
// The generic or consumer group should exist when offsets are committed or
// replayed. However, it won't if the consumer commits offsets but does not
// use the membership functionality. In this case, we automatically create
// a so-called "simple consumer group". This is an empty generic group
// without a protocol type.
try {
groupMetadataManager.group(groupId);
} catch (GroupIdNotFoundException ex) {
groupMetadataManager.getOrMaybeCreateGenericGroup(groupId, true);
}
final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRecord(value);
TimelineHashMap<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId);
if (offsets == null) {
offsets = new TimelineHashMap<>(snapshotRegistry, 0);
offsetsByGroup.put(groupId, offsets);
}
offsets.put(tp, offsetAndMetadata);
} else {
TimelineHashMap<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId);
if (offsets != null) {
offsets.remove(tp);
if (offsets.isEmpty()) {
offsetsByGroup.remove(groupId);
}
}
}
}
/**
* A new metadata image is available.
*
* @param newImage The new metadata image.
* @param delta The delta image.
*/
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
metadataImage = newImage;
}
/**
* @return The offset for the provided groupId and topic partition or null
* if it does not exist.
*
* package-private for testing.
*/
OffsetAndMetadata offset(String groupId, TopicPartition tp) {
Map<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId);
if (offsets == null) {
return null;
} else {
return offsets.get(tp);
}
}
}
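
A minimal wiring sketch (illustrative, not part of the patch), assuming package-private access from `org.apache.kafka.coordinator.group` (the builder setters are package-private), a prebuilt GroupMetadataManager, and the CoordinatorResult/Record accessors used by ReplicatedGroupCoordinator#replay. It shows the handler-then-replay cycle described in the class javadoc: the handler returns records, the runtime persists them, and replay applies them to the timeline state.

OffsetMetadataManager manager = new OffsetMetadataManager.Builder()
    .withGroupMetadataManager(groupMetadataManager) // required; build() throws otherwise
    .withOffsetMetadataMaxSize(4096)                // matches the builder default
    .build();

// The runtime would write result.records() to the log, then apply them.
CoordinatorResult<OffsetCommitResponseData, Record> result =
    manager.commitOffset(context, request);
result.records().forEach(record -> manager.replay(
    (OffsetCommitKey) record.key().message(),
    record.value() == null ? null : (OffsetCommitValue) record.value().message()
));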

View File

@ -21,8 +21,11 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
@ -41,6 +44,8 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.runtime.Coordinator;
import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
@ -132,21 +137,32 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
if (topicPartition == null)
throw new IllegalArgumentException("TopicPartition must be set.");
return new ReplicatedGroupCoordinator(
new GroupMetadataManager.Builder()
GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder()
.withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withTimer(timer)
.withTopicPartition(topicPartition)
.withAssignors(config.consumerGroupAssignors)
.withConsumerGroupMaxSize(config.consumerGroupMaxSize)
.withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs)
.withTopicPartition(topicPartition)
.withGenericGroupInitialRebalanceDelayMs(config.genericGroupInitialRebalanceDelayMs)
.withGenericGroupNewMemberJoinTimeoutMs(config.genericGroupNewMemberJoinTimeoutMs)
.withGenericGroupMinSessionTimeoutMs(config.genericGroupMinSessionTimeoutMs)
.withGenericGroupMaxSessionTimeoutMs(config.genericGroupMaxSessionTimeoutMs)
.build()
.build();
OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder()
.withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withGroupMetadataManager(groupMetadataManager)
.withOffsetMetadataMaxSize(config.offsetMetadataMaxSize)
.build();
return new ReplicatedGroupCoordinator(
groupMetadataManager,
offsetMetadataManager
);
}
}
@ -156,15 +172,23 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
*/
private final GroupMetadataManager groupMetadataManager;
/**
* The offset metadata manager.
*/
private final OffsetMetadataManager offsetMetadataManager;
/**
* Constructor.
*
* @param groupMetadataManager The group metadata manager.
* @param offsetMetadataManager The offset metadata manager.
*/
ReplicatedGroupCoordinator(
GroupMetadataManager groupMetadataManager
GroupMetadataManager groupMetadataManager,
OffsetMetadataManager offsetMetadataManager
) {
this.groupMetadataManager = groupMetadataManager;
this.offsetMetadataManager = offsetMetadataManager;
}
/**
@ -225,6 +249,22 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
);
}
/**
* Handles an OffsetCommit request.
*
* @param context The request context.
* @param request The actual OffsetCommit request.
*
* @return A Result containing the OffsetCommitResponseData response and
* a list of records to update the state machine.
*/
public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
RequestContext context,
OffsetCommitRequestData request
) throws ApiException {
return offsetMetadataManager.commitOffset(context, request);
}
/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).
@ -233,7 +273,10 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
*/
@Override
public void onLoaded(MetadataImage newImage) {
groupMetadataManager.onNewMetadataImage(newImage, new MetadataDelta(newImage));
MetadataDelta emptyDelta = new MetadataDelta(newImage);
groupMetadataManager.onNewMetadataImage(newImage, emptyDelta);
offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta);
groupMetadataManager.onLoaded();
}
@ -246,6 +289,7 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
@Override
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
groupMetadataManager.onNewMetadataImage(newImage, delta);
offsetMetadataManager.onNewMetadataImage(newImage, delta);
}
/**
@ -271,6 +315,14 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
ApiMessageAndVersion value = record.value();
switch (key.version()) {
case 0:
case 1:
offsetMetadataManager.replay(
(OffsetCommitKey) key.message(),
(OffsetCommitValue) messageOrNull(value)
);
break;
case 2:
groupMetadataManager.replay(
(GroupMetadataKey) key.message(),

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;
import java.util.OptionalInt;
import java.util.OptionalLong;
public class Utils {
private Utils() {}
/**
* @return An OptionalInt containing the value iff the value is different from
* the sentinel (or default) value -1.
*/
public static OptionalInt ofSentinel(int value) {
return value != -1 ? OptionalInt.of(value) : OptionalInt.empty();
}
/**
* @return An OptionalLong containing the value iff the value is different from
* the sentinel (or default) value -1.
*/
public static OptionalLong ofSentinel(long value) {
return value != -1 ? OptionalLong.of(value) : OptionalLong.empty();
}
}
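
For illustration (not part of the patch), the sentinel handling behaves as follows:

Utils.ofSentinel(-1);    // OptionalInt.empty(), e.g. no committed leader epoch
Utils.ofSentinel(10);    // OptionalInt.of(10)
Utils.ofSentinel(-1L);   // OptionalLong.empty(), e.g. no expire timestamp
Utils.ofSentinel(5678L); // OptionalLong.of(5678L)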

View File

@ -17,7 +17,9 @@
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
@ -497,6 +499,30 @@ public class ConsumerGroup implements Group {
return metadataRefreshDeadline;
}
/**
* Validates the OffsetCommit request.
*
* @param memberId The member id.
* @param groupInstanceId The group instance id.
* @param memberEpoch The member epoch.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int memberEpoch
) throws UnknownMemberIdException, StaleMemberEpochException {
// When the member epoch is -1, the request comes from either the admin client
// or a consumer which does not use the group management facility. In this case,
// the request can commit offsets if the group is empty.
if (memberEpoch < 0 && members().isEmpty()) return;
final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
if (memberEpoch != member.memberEpoch()) {
throw Errors.STALE_MEMBER_EPOCH.exception();
}
}
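// Illustrative walk-through (not part of the patch): with a single member whose
// memberEpoch is 5, validateOffsetCommit("member-id", null, 5) passes;
// validateOffsetCommit("member-id", null, 4) throws StaleMemberEpochException;
// and validateOffsetCommit("", "", -1) throws UnknownMemberIdException from
// getOrMaybeCreateMember because the group is no longer empty.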
/**
* Updates the current state of the group.
*/

View File

@ -17,6 +17,10 @@
package org.apache.kafka.coordinator.group.generic;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
@ -738,6 +742,84 @@ public class GenericGroup implements Group {
.orElseGet(() -> clientId + MEMBER_ID_DELIMITER + UUID.randomUUID());
}
/**
* Validates that (1) the group instance id exists and is mapped to the member id
* if the group instance id is provided; and (2) the member id exists in the group.
*
* @param memberId The member id.
* @param groupInstanceId The group instance id.
* @param operation The operation.
*
* @throws UnknownMemberIdException
* @throws FencedInstanceIdException
*/
public void validateMember(
String memberId,
String groupInstanceId,
String operation
) throws UnknownMemberIdException, FencedInstanceIdException {
if (groupInstanceId != null) {
String existingMemberId = staticMemberId(groupInstanceId);
if (existingMemberId == null) {
throw Errors.UNKNOWN_MEMBER_ID.exception();
} else if (!existingMemberId.equals(memberId)) {
log.info("Request memberId={} for static member with groupInstanceId={} " +
"is fenced by existing memberId={} during operation {}",
memberId, groupInstanceId, existingMemberId, operation);
throw Errors.FENCED_INSTANCE_ID.exception();
}
}
if (!hasMemberId(memberId)) {
throw Errors.UNKNOWN_MEMBER_ID.exception();
}
}
/**
* Validates the OffsetCommit request.
*
* @param memberId The member id.
* @param groupInstanceId The group instance id.
* @param generationId The generation id.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int generationId
) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException {
if (isInState(DEAD)) {
throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
}
if (generationId < 0 && isInState(EMPTY)) {
// When the generation id is -1, the request comes from either the admin client
// or a consumer which does not use the group management facility. In this case,
// the request can commit offsets if the group is empty.
return;
}
if (generationId >= 0 || !memberId.isEmpty() || groupInstanceId != null) {
validateMember(memberId, groupInstanceId, "offset-commit");
if (generationId != this.generationId) {
throw Errors.ILLEGAL_GENERATION.exception();
}
} else if (!isInState(EMPTY)) {
// If the request does not contain the member id and the generation id (version 0),
// offset commits are only accepted when the group is empty.
throw Errors.UNKNOWN_MEMBER_ID.exception();
}
if (isInState(COMPLETING_REBALANCE)) {
// We should not receive a commit request if the group has not completed a rebalance;
// but since the consumer's member id and generation are valid, it has received
// the latest group generation information from the JoinResponse.
// So let's return REBALANCE_IN_PROGRESS to let the consumer handle it gracefully.
throw Errors.REBALANCE_IN_PROGRESS.exception();
}
}
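// Illustrative summary (not part of the patch) of the checks above, in order:
//   1. DEAD group                          -> COORDINATOR_NOT_AVAILABLE
//   2. generationId < 0 on an EMPTY group  -> accepted (admin client or a
//      consumer outside the group management facility)
//   3. a generation, member id or instance id is provided -> validateMember,
//      then the generation must match or ILLEGAL_GENERATION is thrown
//   4. no member id and no generation (version 0 style) on a non-EMPTY group
//                                          -> UNKNOWN_MEMBER_ID
//   5. COMPLETING_REBALANCE                -> REBALANCE_IN_PROGRESS so the
//      consumer rejoins gracefully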
/**
* Verify the member id is up to date for static members. Return true if both conditions met:
* 1. given member is a known static member to group

View File

@ -35,6 +35,7 @@ public class GroupCoordinatorConfigTest {
55,
Collections.singletonList(assignor),
2222,
3333,
60,
3000,
5 * 60 * 1000,
@ -48,6 +49,7 @@ public class GroupCoordinatorConfigTest {
assertEquals(55, config.consumerGroupMaxSize);
assertEquals(Collections.singletonList(assignor), config.consumerGroupAssignors);
assertEquals(2222, config.offsetsTopicSegmentBytes);
assertEquals(3333, config.offsetMetadataMaxSize);
assertEquals(60, config.genericGroupMaxSize);
assertEquals(3000, config.genericGroupInitialRebalanceDelayMs);
assertEquals(5 * 60 * 1000, config.genericGroupNewMemberJoinTimeoutMs);

View File

@ -91,6 +91,7 @@ public class GroupCoordinatorServiceTest {
Integer.MAX_VALUE,
Collections.singletonList(new RangeAssignor()),
1000,
4096,
Integer.MAX_VALUE,
3000,
5 * 60 * 1000,

View File

@ -16,7 +16,9 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.server.util.MockTime;
import org.junit.jupiter.api.Test;
import java.util.OptionalInt;
@ -71,4 +73,64 @@ public class OffsetAndMetadataTest {
OptionalLong.of(5678L)
), OffsetAndMetadata.fromRecord(record));
}
@Test
public void testFromRequest() {
MockTime time = new MockTime();
OffsetCommitRequestData.OffsetCommitRequestPartition partition =
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100L)
.setCommittedLeaderEpoch(-1)
.setCommittedMetadata(null)
.setCommitTimestamp(-1L);
assertEquals(
new OffsetAndMetadata(
100L,
OptionalInt.empty(),
"",
time.milliseconds(),
OptionalLong.empty()
), OffsetAndMetadata.fromRequest(
partition,
time.milliseconds(),
OptionalLong.empty()
)
);
partition
.setCommittedLeaderEpoch(10)
.setCommittedMetadata("hello")
.setCommitTimestamp(1234L);
assertEquals(
new OffsetAndMetadata(
100L,
OptionalInt.of(10),
"hello",
1234L,
OptionalLong.empty()
), OffsetAndMetadata.fromRequest(
partition,
time.milliseconds(),
OptionalLong.empty()
)
);
assertEquals(
new OffsetAndMetadata(
100L,
OptionalInt.of(10),
"hello",
1234L,
OptionalLong.of(5678L)
), OffsetAndMetadata.fromRequest(
partition,
time.milliseconds(),
OptionalLong.of(5678L)
)
);
}
}

View File

@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
@ -34,6 +36,8 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -56,8 +60,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testConsumerGroupHeartbeat() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
RequestContext context = requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
@ -75,11 +81,86 @@ public class ReplicatedGroupCoordinatorTest {
assertEquals(result, coordinator.consumerGroupHeartbeat(context, request));
}
@Test
public void testCommitOffset() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager,
offsetMetadataManager
);
RequestContext context = requestContext(ApiKeys.OFFSET_COMMIT);
OffsetCommitRequestData request = new OffsetCommitRequestData();
CoordinatorResult<OffsetCommitResponseData, Record> result = new CoordinatorResult<>(
Collections.emptyList(),
new OffsetCommitResponseData()
);
when(offsetMetadataManager.commitOffset(
context,
request
)).thenReturn(result);
assertEquals(result, coordinator.commitOffset(context, request));
}
@Test
public void testReplayOffsetCommit() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager,
offsetMetadataManager
);
OffsetCommitKey key = new OffsetCommitKey();
OffsetCommitValue value = new OffsetCommitValue();
coordinator.replay(new Record(
new ApiMessageAndVersion(key, (short) 0),
new ApiMessageAndVersion(value, (short) 0)
));
coordinator.replay(new Record(
new ApiMessageAndVersion(key, (short) 1),
new ApiMessageAndVersion(value, (short) 0)
));
verify(offsetMetadataManager, times(2)).replay(key, value);
}
@Test
public void testReplayOffsetCommitWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager,
offsetMetadataManager
);
OffsetCommitKey key = new OffsetCommitKey();
coordinator.replay(new Record(
new ApiMessageAndVersion(key, (short) 0),
null
));
coordinator.replay(new Record(
new ApiMessageAndVersion(key, (short) 1),
null
));
verify(offsetMetadataManager, times(2)).replay(key, null);
}
@Test
public void testReplayConsumerGroupMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
@ -96,8 +177,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
@ -113,8 +196,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupPartitionMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey();
@ -131,8 +216,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupPartitionMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey();
@ -148,8 +235,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupMemberMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey();
@ -166,8 +255,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupMemberMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey();
@ -183,8 +274,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupTargetAssignmentMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey();
@ -201,8 +294,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey();
@ -218,8 +313,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupTargetAssignmentMember() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey();
@ -236,8 +333,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey();
@ -253,8 +352,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupCurrentMemberAssignment() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
@ -271,8 +372,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
@ -288,8 +391,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayKeyCannotBeNull() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
assertThrows(NullPointerException.class, () -> coordinator.replay(new Record(null, null)));
@ -298,8 +403,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayWithUnsupportedVersion() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
@ -315,8 +422,10 @@ public class ReplicatedGroupCoordinatorTest {
public void testOnLoaded() {
MetadataImage image = MetadataImage.EMPTY;
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
coordinator.onLoaded(image);
@ -332,8 +441,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayGroupMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
GroupMetadataKey key = new GroupMetadataKey();
@ -350,8 +461,10 @@ public class ReplicatedGroupCoordinatorTest {
@Test
public void testReplayGroupMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
groupMetadataManager,
offsetMetadataManager
);
GroupMetadataKey key = new GroupMetadataKey();

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@ -591,4 +592,31 @@ public class ConsumerGroupTest {
assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
assertEquals(0, group.metadataRefreshDeadline().epoch);
}
@Test
public void testValidateOffsetCommit() {
ConsumerGroup group = createConsumerGroup("group-foo");
// Simulate a call from the admin client without member id and member epoch.
// This should pass only if the group is empty.
group.validateOffsetCommit("", "", -1);
// The member does not exist.
assertThrows(UnknownMemberIdException.class, () ->
group.validateOffsetCommit("member-id", null, 0));
// Create a member.
group.getOrMaybeCreateMember("member-id", true);
// A call from the admin client should fail as the group is not empty.
assertThrows(UnknownMemberIdException.class, () ->
group.validateOffsetCommit("", "", -1));
// The member epoch is stale.
assertThrows(StaleMemberEpochException.class, () ->
group.validateOffsetCommit("member-id", "", 10));
// This should succeed.
group.validateOffsetCommit("member-id", "", 0);
}
}

View File

@ -18,6 +18,11 @@ package org.apache.kafka.coordinator.group.generic;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import org.apache.kafka.common.message.JoinGroupResponseData;
@ -956,6 +961,71 @@ public class GenericGroupTest {
assertTrue(group.isLeader(memberId));
}
@Test
public void testValidateOffsetCommit() {
// A call from the admin client without any parameters should pass.
group.validateOffsetCommit("", "", -1);
// Add a member.
group.add(new GenericGroupMember(
"member-id",
Optional.of("instance-id"),
"",
"",
100,
100,
"consumer",
new JoinGroupRequestProtocolCollection(Collections.singletonList(
new JoinGroupRequestProtocol()
.setName("roundrobin")
.setMetadata(new byte[0])).iterator())
));
group.transitionTo(PREPARING_REBALANCE);
group.initNextGeneration();
// No parameters and the group is not empty.
assertThrows(UnknownMemberIdException.class,
() -> group.validateOffsetCommit("", "", -1));
// The member id does not exist.
assertThrows(UnknownMemberIdException.class,
() -> group.validateOffsetCommit("unknown", "unknown", -1));
// The instance id does not exist.
assertThrows(UnknownMemberIdException.class,
() -> group.validateOffsetCommit("member-id", "unknown", -1));
// The generation id is invalid.
assertThrows(IllegalGenerationException.class,
() -> group.validateOffsetCommit("member-id", "instance-id", 0));
// Group is in prepare rebalance state.
assertThrows(RebalanceInProgressException.class,
() -> group.validateOffsetCommit("member-id", "instance-id", 1));
// Group transitions to stable.
group.transitionTo(STABLE);
// This should work.
group.validateOffsetCommit("member-id", "instance-id", 1);
// Replace static member.
group.replaceStaticMember("instance-id", "member-id", "new-member-id");
// The old instance id should be fenced.
assertThrows(FencedInstanceIdException.class,
() -> group.validateOffsetCommit("member-id", "instance-id", 1));
// Remove the member and transition to dead.
group.remove("new-member-id");
group.transitionTo(DEAD);
// This should fail with CoordinatorNotAvailableException.
assertThrows(CoordinatorNotAvailableException.class,
() -> group.validateOffsetCommit("member-id", "new-instance-id", 1));
}
private void assertState(GenericGroup group, GenericGroupState targetState) {
Set<GenericGroupState> otherStates = new HashSet<>();
otherStates.add(STABLE);