MINOR: Update GroupCoordinator interface to use AuthorizableRequestContext instead of RequestContext (#19485)

This patch updates the `GroupCoordinator` interface to use
`AuthorizableRequestContext` instead of using `RequestContext`. It makes
the interface more generic. The only downside is that the request
version in `AuthorizableRequestContext` is an `int` instead of a `short`
so we had to adapt it in a few places. We opted for using `int` directly
wherever possible.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
David Jacot 2025-04-16 18:12:11 +02:00 committed by GitHub
parent 18e4608d1c
commit 6e26ec06bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 142 additions and 142 deletions

View File

@ -96,7 +96,7 @@ public class JoinGroupRequest extends AbstractRequest {
*
* @return whether a known member id is required or not.
*/
public static boolean requiresKnownMemberId(short apiVersion) {
public static boolean requiresKnownMemberId(int apiVersion) {
return apiVersion >= 4;
}
@ -117,7 +117,7 @@ public class JoinGroupRequest extends AbstractRequest {
*/
public static boolean requiresKnownMemberId(
JoinGroupRequestData request,
short apiVersion
int apiVersion
) {
return request.groupInstanceId() == null
&& request.memberId().equals(UNKNOWN_MEMBER_ID)
@ -150,7 +150,7 @@ public class JoinGroupRequest extends AbstractRequest {
* @return whether the version supports skipping assignment.
*/
public static boolean supportsSkippingAssignment(short apiVersion) {
public static boolean supportsSkippingAssignment(int apiVersion) {
return apiVersion >= 9;
}

View File

@ -2236,7 +2236,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
short producerEpoch,
Duration timeout,
CoordinatorWriteOperation<S, T, U> op,
Short apiVersion
int apiVersion
) {
throwIfNotRunning();
log.debug("Scheduled execution of transactional write operation {}.", name);

View File

@ -105,7 +105,7 @@ public interface PartitionWriter {
String transactionalId,
long producerId,
short producerEpoch,
short apiVersion
int apiVersion
) throws KafkaException;
/**

View File

@ -129,7 +129,7 @@ public class InMemoryPartitionWriter implements PartitionWriter {
String transactionalId,
long producerId,
short producerEpoch,
short apiVersion
int apiVersion
) throws KafkaException {
return CompletableFuture.completedFuture(new VerificationGuard());
}

View File

@ -107,7 +107,7 @@ class CoordinatorPartitionWriter(
transactionalId: String,
producerId: Long,
producerEpoch: Short,
apiVersion: Short
apiVersion: Int
): CompletableFuture[VerificationGuard] = {
val transactionSupportedOperation = AddPartitionsToTxnManager.txnOffsetCommitRequestVersionToTransactionSupportedOperation(apiVersion)
val future = new CompletableFuture[VerificationGuard]()

View File

@ -51,7 +51,7 @@ object AddPartitionsToTxnManager {
}
}
def txnOffsetCommitRequestVersionToTransactionSupportedOperation(version: Short): TransactionSupportedOperation = {
def txnOffsetCommitRequestVersionToTransactionSupportedOperation(version: Int): TransactionSupportedOperation = {
if (version > 4) {
addPartition
} else if (version > 3) {

View File

@ -178,7 +178,7 @@ class CoordinatorPartitionWriterTest {
"transactional-id",
10L,
5.toShort,
ApiKeys.TXN_OFFSET_COMMIT.latestVersion()
ApiKeys.TXN_OFFSET_COMMIT.latestVersion().toInt
)
if (error == Errors.NONE) {

View File

@ -117,7 +117,7 @@ public interface Group {
String groupInstanceId,
int generationIdOrMemberEpoch,
boolean isTransactional,
short apiVersion
int apiVersion
) throws KafkaException;

View File

@ -49,12 +49,12 @@ import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import java.time.Duration;
import java.util.List;
@ -80,7 +80,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
ConsumerGroupHeartbeatRequestData request
);
@ -94,7 +94,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
StreamsGroupHeartbeatRequestData request
);
@ -108,7 +108,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
ShareGroupHeartbeatRequestData request
);
@ -123,7 +123,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<JoinGroupResponseData> joinGroup(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
BufferSupplier bufferSupplier
);
@ -139,7 +139,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<SyncGroupResponseData> syncGroup(
RequestContext context,
AuthorizableRequestContext context,
SyncGroupRequestData request,
BufferSupplier bufferSupplier
);
@ -154,7 +154,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<HeartbeatResponseData> heartbeat(
RequestContext context,
AuthorizableRequestContext context,
HeartbeatRequestData request
);
@ -168,7 +168,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<LeaveGroupResponseData> leaveGroup(
RequestContext context,
AuthorizableRequestContext context,
LeaveGroupRequestData request
);
@ -182,7 +182,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<ListGroupsResponseData> listGroups(
RequestContext context,
AuthorizableRequestContext context,
ListGroupsRequestData request
);
@ -196,7 +196,7 @@ public interface GroupCoordinator {
* The error codes of the results are set to indicate the errors occurred during the execution.
*/
CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroups(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds
);
@ -209,7 +209,7 @@ public interface GroupCoordinator {
* @return A future yielding the results or an exception.
*/
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds
);
@ -222,7 +222,7 @@ public interface GroupCoordinator {
* @return A future yielding the results or an exception.
*/
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds
);
@ -235,7 +235,7 @@ public interface GroupCoordinator {
* @return A future yielding the results or an exception.
*/
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> shareGroupDescribe(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds
);
@ -250,7 +250,7 @@ public interface GroupCoordinator {
* The error codes of the results are set to indicate the errors occurred during the execution.
*/
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds,
BufferSupplier bufferSupplier
);
@ -265,7 +265,7 @@ public interface GroupCoordinator {
* The error codes of the results are set to indicate the errors occurred during the execution.
*/
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchOffsets(
RequestContext context,
AuthorizableRequestContext context,
OffsetFetchRequestData.OffsetFetchRequestGroup request,
boolean requireStable
);
@ -280,7 +280,7 @@ public interface GroupCoordinator {
* The error codes of the results are set to indicate the errors occurred during the execution.
*/
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffsets(
RequestContext context,
AuthorizableRequestContext context,
OffsetFetchRequestData.OffsetFetchRequestGroup request,
boolean requireStable
);
@ -295,7 +295,7 @@ public interface GroupCoordinator {
* The error codes of the response are set to indicate the errors occurred during the execution.
*/
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupOffsets(
RequestContext context,
AuthorizableRequestContext context,
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request
);
@ -309,7 +309,7 @@ public interface GroupCoordinator {
* The error codes of the response are set to indicate the errors occurred during the execution.
*/
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupAllOffsets(
RequestContext context,
AuthorizableRequestContext context,
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request
);
@ -323,7 +323,7 @@ public interface GroupCoordinator {
* The error codes of the response are set to indicate the errors occurred during the execution.
*/
CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsets(
RequestContext context,
AuthorizableRequestContext context,
DeleteShareGroupOffsetsRequestData request
);
@ -338,7 +338,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<OffsetCommitResponseData> commitOffsets(
RequestContext context,
AuthorizableRequestContext context,
OffsetCommitRequestData request,
BufferSupplier bufferSupplier
);
@ -354,7 +354,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<TxnOffsetCommitResponseData> commitTransactionalOffsets(
RequestContext context,
AuthorizableRequestContext context,
TxnOffsetCommitRequestData request,
BufferSupplier bufferSupplier
);
@ -370,7 +370,7 @@ public interface GroupCoordinator {
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
RequestContext context,
AuthorizableRequestContext context,
OffsetDeleteRequestData request,
BufferSupplier bufferSupplier
);

View File

@ -66,7 +66,6 @@ import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
import org.apache.kafka.common.requests.TransactionResult;
@ -88,6 +87,7 @@ import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
@ -364,11 +364,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#consumerGroupHeartbeat(RequestContext, ConsumerGroupHeartbeatRequestData)}.
* See {@link GroupCoordinator#consumerGroupHeartbeat(AuthorizableRequestContext, ConsumerGroupHeartbeatRequestData)}.
*/
@Override
public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
ConsumerGroupHeartbeatRequestData request
) {
if (!isActive.get()) {
@ -395,11 +395,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
/**
* See
* {@link GroupCoordinator#streamsGroupHeartbeat(RequestContext, org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData)}.
* {@link GroupCoordinator#streamsGroupHeartbeat(AuthorizableRequestContext, StreamsGroupHeartbeatRequestData)}.
*/
@Override
public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
StreamsGroupHeartbeatRequestData request
) {
if (!isActive.get()) {
@ -432,11 +432,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}.
* See {@link GroupCoordinator#shareGroupHeartbeat(AuthorizableRequestContext, ShareGroupHeartbeatRequestData)}.
*/
@Override
public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
ShareGroupHeartbeatRequestData request
) {
if (!isActive.get()) {
@ -592,11 +592,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#joinGroup(RequestContext, JoinGroupRequestData, BufferSupplier)}.
* See {@link GroupCoordinator#joinGroup(AuthorizableRequestContext, JoinGroupRequestData, BufferSupplier)}.
*/
@Override
public CompletableFuture<JoinGroupResponseData> joinGroup(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
BufferSupplier bufferSupplier
) {
@ -646,11 +646,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#syncGroup(RequestContext, SyncGroupRequestData, BufferSupplier)}.
* See {@link GroupCoordinator#syncGroup(AuthorizableRequestContext, SyncGroupRequestData, BufferSupplier)}.
*/
@Override
public CompletableFuture<SyncGroupResponseData> syncGroup(
RequestContext context,
AuthorizableRequestContext context,
SyncGroupRequestData request,
BufferSupplier bufferSupplier
) {
@ -690,11 +690,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#heartbeat(RequestContext, HeartbeatRequestData)}.
* See {@link GroupCoordinator#heartbeat(AuthorizableRequestContext, HeartbeatRequestData)}.
*/
@Override
public CompletableFuture<HeartbeatResponseData> heartbeat(
RequestContext context,
AuthorizableRequestContext context,
HeartbeatRequestData request
) {
if (!isActive.get()) {
@ -733,11 +733,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#leaveGroup(RequestContext, LeaveGroupRequestData)}.
* See {@link GroupCoordinator#leaveGroup(AuthorizableRequestContext, LeaveGroupRequestData)}.
*/
@Override
public CompletableFuture<LeaveGroupResponseData> leaveGroup(
RequestContext context,
AuthorizableRequestContext context,
LeaveGroupRequestData request
) {
if (!isActive.get()) {
@ -782,11 +782,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#listGroups(RequestContext, ListGroupsRequestData)}.
* See {@link GroupCoordinator#listGroups(AuthorizableRequestContext, ListGroupsRequestData)}.
*/
@Override
public CompletableFuture<ListGroupsResponseData> listGroups(
RequestContext context,
AuthorizableRequestContext context,
ListGroupsRequestData request
) {
if (!isActive.get()) {
@ -827,11 +827,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}.
* See {@link GroupCoordinator#consumerGroupDescribe(AuthorizableRequestContext, List)}.
*/
@Override
public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds
) {
if (!isActive.get()) {
@ -879,11 +879,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}.
* See {@link GroupCoordinator#streamsGroupDescribe(AuthorizableRequestContext, List)}.
*/
@Override
public CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds
) {
if (!isActive.get()) {
@ -931,11 +931,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#shareGroupDescribe(RequestContext, List)}.
* See {@link GroupCoordinator#shareGroupDescribe(AuthorizableRequestContext, List)}.
*/
@Override
public CompletableFuture<List<DescribedGroup>> shareGroupDescribe(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds
) {
if (!isActive.get()) {
@ -983,11 +983,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#describeGroups(RequestContext, List)}.
* See {@link GroupCoordinator#describeGroups(AuthorizableRequestContext, List)}.
*/
@Override
public CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroups(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds
) {
if (!isActive.get()) {
@ -1037,11 +1037,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#deleteGroups(RequestContext, List, BufferSupplier)}.
* See {@link GroupCoordinator#deleteGroups(AuthorizableRequestContext, List, BufferSupplier)}.
*/
@Override
public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds,
BufferSupplier bufferSupplier
) {
@ -1141,7 +1141,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
private CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> handleDeleteGroups(
RequestContext context,
AuthorizableRequestContext context,
TopicPartition topicPartition,
List<String> groupIds
) {
@ -1296,11 +1296,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#fetchOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
* See {@link GroupCoordinator#fetchOffsets(AuthorizableRequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
*/
@Override
public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchOffsets(
RequestContext context,
AuthorizableRequestContext context,
OffsetFetchRequestData.OffsetFetchRequestGroup request,
boolean requireStable
) {
@ -1351,11 +1351,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#fetchAllOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
* See {@link GroupCoordinator#fetchAllOffsets(AuthorizableRequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
*/
@Override
public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffsets(
RequestContext context,
AuthorizableRequestContext context,
OffsetFetchRequestData.OffsetFetchRequestGroup request,
boolean requireStable
) {
@ -1406,11 +1406,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
* See {@link GroupCoordinator#describeShareGroupOffsets(AuthorizableRequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
*/
@Override
public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupOffsets(
RequestContext context,
AuthorizableRequestContext context,
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData
) {
if (!isActive.get()) {
@ -1468,11 +1468,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#describeShareGroupAllOffsets(RequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
* See {@link GroupCoordinator#describeShareGroupAllOffsets(AuthorizableRequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
*/
@Override
public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupAllOffsets(
RequestContext context,
AuthorizableRequestContext context,
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData
) {
if (!isActive.get()) {
@ -1561,11 +1561,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#deleteShareGroupOffsets(RequestContext, DeleteShareGroupOffsetsRequestData)}.
* See {@link GroupCoordinator#deleteShareGroupOffsets(AuthorizableRequestContext, DeleteShareGroupOffsetsRequestData)}.
*/
@Override
public CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsets(
RequestContext context,
AuthorizableRequestContext context,
DeleteShareGroupOffsetsRequestData requestData
) {
if (!isActive.get()) {
@ -1635,11 +1635,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#commitOffsets(RequestContext, OffsetCommitRequestData, BufferSupplier)}.
* See {@link GroupCoordinator#commitOffsets(AuthorizableRequestContext, OffsetCommitRequestData, BufferSupplier)}.
*/
@Override
public CompletableFuture<OffsetCommitResponseData> commitOffsets(
RequestContext context,
AuthorizableRequestContext context,
OffsetCommitRequestData request,
BufferSupplier bufferSupplier
) {
@ -1673,11 +1673,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#commitTransactionalOffsets(RequestContext, TxnOffsetCommitRequestData, BufferSupplier)}.
* See {@link GroupCoordinator#commitTransactionalOffsets(AuthorizableRequestContext, TxnOffsetCommitRequestData, BufferSupplier)}.
*/
@Override
public CompletableFuture<TxnOffsetCommitResponseData> commitTransactionalOffsets(
RequestContext context,
AuthorizableRequestContext context,
TxnOffsetCommitRequestData request,
BufferSupplier bufferSupplier
) {
@ -1703,7 +1703,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
request.producerEpoch(),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.commitTransactionalOffset(context, request),
context.apiVersion()
context.requestVersion()
).exceptionally(exception -> handleOperationException(
"txn-commit-offset",
request,
@ -1714,11 +1714,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
/**
* See {@link GroupCoordinator#deleteOffsets(RequestContext, OffsetDeleteRequestData, BufferSupplier)}.
* See {@link GroupCoordinator#deleteOffsets(AuthorizableRequestContext, OffsetDeleteRequestData, BufferSupplier)}.
*/
@Override
public CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
RequestContext context,
AuthorizableRequestContext context,
OffsetDeleteRequestData request,
BufferSupplier bufferSupplier
) {

View File

@ -55,7 +55,6 @@ import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -122,6 +121,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
@ -456,7 +456,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* a list of records to update the state machine.
*/
public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
ConsumerGroupHeartbeatRequestData request
) {
return groupMetadataManager.consumerGroupHeartbeat(context, request);
@ -472,7 +472,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* a list of records to update the state machine.
*/
public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
StreamsGroupHeartbeatRequestData request
) {
return groupMetadataManager.streamsGroupHeartbeat(context, request);
@ -488,7 +488,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* and a list of records to update the state machine.
*/
public CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> shareGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
ShareGroupHeartbeatRequestData request
) {
return groupMetadataManager.shareGroupHeartbeat(context, request);
@ -557,7 +557,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* a list of records to update the state machine.
*/
public CoordinatorResult<Void, CoordinatorRecord> classicGroupJoin(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
) {
@ -578,7 +578,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* a list of records to update the state machine.
*/
public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync(
RequestContext context,
AuthorizableRequestContext context,
SyncGroupRequestData request,
CompletableFuture<SyncGroupResponseData> responseFuture
) {
@ -599,7 +599,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* a list of records to update the state machine.
*/
public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
HeartbeatRequestData request
) {
return groupMetadataManager.classicGroupHeartbeat(
@ -617,7 +617,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* a list of records to update the state machine.
*/
public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, CoordinatorRecord> deleteGroups(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds
) throws ApiException {
final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
@ -774,7 +774,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* a list of records to update the state machine.
*/
public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffset(
RequestContext context,
AuthorizableRequestContext context,
OffsetCommitRequestData request
) throws ApiException {
return offsetMetadataManager.commitOffset(context, request);
@ -790,7 +790,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* a list of records to update the state machine.
*/
public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitTransactionalOffset(
RequestContext context,
AuthorizableRequestContext context,
TxnOffsetCommitRequestData request
) throws ApiException {
return offsetMetadataManager.commitTransactionalOffset(context, request);
@ -875,7 +875,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* @return A list containing the DescribeGroupsResponseData.DescribedGroup.
*/
public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds,
long committedOffset
) {
@ -892,7 +892,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* a list of records to update the state machine.
*/
public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave(
RequestContext context,
AuthorizableRequestContext context,
LeaveGroupRequestData request
) throws ApiException {
return groupMetadataManager.classicGroupLeave(context, request);
@ -908,7 +908,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* a list of records to update the state machine.
*/
public CoordinatorResult<OffsetDeleteResponseData, CoordinatorRecord> deleteOffsets(
RequestContext context,
AuthorizableRequestContext context,
OffsetDeleteRequestData request
) throws ApiException {
return offsetMetadataManager.deleteOffsets(request);

View File

@ -73,7 +73,6 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.resource.ResourcePattern;
@ -167,6 +166,7 @@ import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
@ -743,7 +743,7 @@ public class GroupMetadataManager {
* @return A list containing the DescribeGroupsResponseData.DescribedGroup.
*/
public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
RequestContext context,
AuthorizableRequestContext context,
List<String> groupIds,
long committedOffset
) {
@ -779,7 +779,7 @@ public class GroupMetadataManager {
);
}
} catch (GroupIdNotFoundException exception) {
if (context.header.apiVersion() >= 6) {
if (context.requestVersion() >= 6) {
describedGroups.add(new DescribeGroupsResponseData.DescribedGroup()
.setGroupId(groupId)
.setGroupState(DEAD.toString())
@ -1502,7 +1502,7 @@ public class GroupMetadataManager {
*/
private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
ConsumerGroupHeartbeatRequestData request,
short apiVersion
int apiVersion
) throws InvalidRequestException, UnsupportedAssignorException {
if (apiVersion >= CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION ||
request.memberEpoch() > 0 ||
@ -2369,7 +2369,7 @@ public class GroupMetadataManager {
* a list of records to update the state machine.
*/
private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
String groupId,
String memberId,
int memberEpoch,
@ -2427,7 +2427,7 @@ public class GroupMetadataManager {
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
.maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
.setClientId(context.clientId())
.setClientHost(context.clientAddress.toString())
.setClientHost(context.clientAddress().toString())
.setClassicMemberMetadata(null)
.build();
@ -2538,7 +2538,7 @@ public class GroupMetadataManager {
*/
private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGroup(
ConsumerGroup group,
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
) throws ApiException {
@ -2556,7 +2556,7 @@ public class GroupMetadataManager {
throwIfConsumerGroupIsFull(group, memberId);
throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols);
if (JoinGroupRequest.requiresKnownMemberId(request, context.apiVersion())) {
if (JoinGroupRequest.requiresKnownMemberId(request, context.requestVersion())) {
// A dynamic member requiring a member id joins the group. Send back a response to call for another
// join group request with allocated member id.
responseFuture.complete(new JoinGroupResponseData()
@ -2609,7 +2609,7 @@ public class GroupMetadataManager {
.maybeUpdateServerAssignorName(Optional.empty())
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
.setClientId(context.clientId())
.setClientHost(context.clientAddress.toString())
.setClientHost(context.clientAddress().toString())
.setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSessionTimeoutMs(sessionTimeoutMs)
@ -3228,7 +3228,7 @@ public class GroupMetadataManager {
* @return Whether a rebalance must be triggered.
*/
private boolean maybeUpdateRegularExpressions(
RequestContext context,
AuthorizableRequestContext context,
ConsumerGroup group,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
@ -3339,7 +3339,7 @@ public class GroupMetadataManager {
* public for benchmarks.
*/
public static Map<String, ResolvedRegularExpression> refreshRegularExpressions(
RequestContext context,
AuthorizableRequestContext context,
String groupId,
Logger log,
Time time,
@ -3404,7 +3404,7 @@ public class GroupMetadataManager {
* @param resolvedRegexes The map of the regex pattern and its set of matched topics.
*/
private static void filterTopicDescribeAuthorizedTopics(
RequestContext context,
AuthorizableRequestContext context,
Optional<Plugin<Authorizer>> authorizerPlugin,
Map<String, Set<String>> resolvedRegexes
) {
@ -4792,10 +4792,10 @@ public class GroupMetadataManager {
* a list of records to update the state machine.
*/
public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
ConsumerGroupHeartbeatRequestData request
) throws ApiException {
throwIfConsumerGroupHeartbeatRequestIsInvalid(request, context.apiVersion());
throwIfConsumerGroupHeartbeatRequestIsInvalid(request, context.requestVersion());
if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
// -1 means that the member wants to leave the group.
@ -4834,7 +4834,7 @@ public class GroupMetadataManager {
* a list of records to update the state machine.
*/
public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
StreamsGroupHeartbeatRequestData request
) throws ApiException {
throwIfStreamsGroupHeartbeatRequestIsInvalid(request);
@ -4857,7 +4857,7 @@ public class GroupMetadataManager {
request.rackId(),
request.rebalanceTimeoutMs(),
context.clientId(),
context.clientAddress.toString(),
context.clientAddress().toString(),
request.topology(),
request.activeTasks(),
request.standbyTasks(),
@ -4917,7 +4917,7 @@ public class GroupMetadataManager {
* and a list of records to update the state machine.
*/
public CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> shareGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
ShareGroupHeartbeatRequestData request
) throws ApiException {
throwIfShareGroupHeartbeatRequestIsInvalid(request);
@ -4941,7 +4941,7 @@ public class GroupMetadataManager {
request.memberEpoch(),
request.rackId(),
context.clientId(),
context.clientAddress.toString(),
context.clientAddress().toString(),
request.subscribedTopicNames());
}
@ -6209,7 +6209,7 @@ public class GroupMetadataManager {
* @return The result that contains records to append if the join group phase completes.
*/
public CoordinatorResult<Void, CoordinatorRecord> classicGroupJoin(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
) {
@ -6244,7 +6244,7 @@ public class GroupMetadataManager {
* @return The result that contains records to append if the join group phase completes.
*/
CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToClassicGroup(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
) {
@ -6360,7 +6360,7 @@ public class GroupMetadataManager {
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinNewMember(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
ClassicGroup group,
CompletableFuture<JoinGroupResponseData> responseFuture
@ -6418,7 +6418,7 @@ public class GroupMetadataManager {
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinNewStaticMember(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
ClassicGroup group,
String newMemberId,
@ -6464,13 +6464,13 @@ public class GroupMetadataManager {
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinNewDynamicMember(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
ClassicGroup group,
String newMemberId,
CompletableFuture<JoinGroupResponseData> responseFuture
) {
if (JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
if (JoinGroupRequest.requiresKnownMemberId(context.requestVersion())) {
// If member id required, register the member in the pending member list and send
// back a response to call for another join group request with allocated member id.
log.info("Dynamic member with unknown member id joins group {} in {} state. " +
@ -6514,7 +6514,7 @@ public class GroupMetadataManager {
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinExistingMember(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
ClassicGroup group,
CompletableFuture<JoinGroupResponseData> responseFuture
@ -6937,7 +6937,7 @@ public class GroupMetadataManager {
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, CoordinatorRecord> addMemberThenRebalanceOrCompleteJoin(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
ClassicGroup group,
String memberId,
@ -7318,7 +7318,7 @@ public class GroupMetadataManager {
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, CoordinatorRecord> updateStaticMemberThenRebalanceOrCompleteJoin(
RequestContext context,
AuthorizableRequestContext context,
JoinGroupRequestData request,
ClassicGroup group,
String oldMemberId,
@ -7377,7 +7377,7 @@ public class GroupMetadataManager {
.setSkipAssignment(false)
.setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(t)).code()));
} else if (JoinGroupRequest.supportsSkippingAssignment(context.apiVersion())) {
} else if (JoinGroupRequest.supportsSkippingAssignment(context.requestVersion())) {
boolean isLeader = group.isLeader(newMemberId);
group.completeJoinFuture(newMember, new JoinGroupResponseData()
@ -7442,7 +7442,7 @@ public class GroupMetadataManager {
* @return The result that contains records to append.
*/
public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync(
RequestContext context,
AuthorizableRequestContext context,
SyncGroupRequestData request,
CompletableFuture<SyncGroupResponseData> responseFuture
) throws UnknownMemberIdException {
@ -7484,7 +7484,7 @@ public class GroupMetadataManager {
*/
private CoordinatorResult<Void, CoordinatorRecord> classicGroupSyncToClassicGroup(
ClassicGroup group,
RequestContext context,
AuthorizableRequestContext context,
SyncGroupRequestData request,
CompletableFuture<SyncGroupResponseData> responseFuture
) throws IllegalStateException {
@ -7582,7 +7582,7 @@ public class GroupMetadataManager {
*/
private CoordinatorResult<Void, CoordinatorRecord> classicGroupSyncToConsumerGroup(
ConsumerGroup group,
RequestContext context,
AuthorizableRequestContext context,
SyncGroupRequestData request,
CompletableFuture<SyncGroupResponseData> responseFuture
) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException,
@ -7721,7 +7721,7 @@ public class GroupMetadataManager {
* @return The coordinator result that contains the heartbeat response.
*/
public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeat(
RequestContext context,
AuthorizableRequestContext context,
HeartbeatRequestData request
) {
Group group;
@ -7755,7 +7755,7 @@ public class GroupMetadataManager {
*/
private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToClassicGroup(
ClassicGroup group,
RequestContext context,
AuthorizableRequestContext context,
HeartbeatRequestData request
) {
validateClassicGroupHeartbeat(group, request.memberId(), request.groupInstanceId(), request.generationId());
@ -7838,7 +7838,7 @@ public class GroupMetadataManager {
*/
private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToConsumerGroup(
ConsumerGroup group,
RequestContext context,
AuthorizableRequestContext context,
HeartbeatRequestData request
) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException {
String groupId = request.groupId();
@ -7908,7 +7908,7 @@ public class GroupMetadataManager {
* @return The LeaveGroup response and the records to append.
*/
public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave(
RequestContext context,
AuthorizableRequestContext context,
LeaveGroupRequestData request
) throws UnknownMemberIdException {
Group group;
@ -8135,7 +8135,7 @@ public class GroupMetadataManager {
/**
* Handles a DeleteGroups request.
* Populates the record list passed in with record to update the state machine.
* Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
* Validations are done in {@link GroupCoordinatorShard#deleteGroups(AuthorizableRequestContext, List)} by
* calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
*
* @param groupId The id of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}.

View File

@ -35,7 +35,6 @@ import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetComm
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -48,6 +47,7 @@ import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
@ -290,7 +290,7 @@ public class OffsetMetadataManager {
* @param request The actual request.
*/
private Group validateOffsetCommit(
RequestContext context,
AuthorizableRequestContext context,
OffsetCommitRequestData request
) throws ApiException {
Group group;
@ -305,7 +305,7 @@ public class OffsetMetadataManager {
log.info("[GroupId {}] Creating a simple consumer group via manual offset commit.", request.groupId());
group = groupMetadataManager.getOrMaybeCreateClassicGroup(request.groupId(), true);
} else {
if (context.header.apiVersion() >= 9) {
if (context.requestVersion() >= 9) {
// Starting from version 9 of the OffsetCommit API, we return GROUP_ID_NOT_FOUND
// if the group does not exist. This error works for both the old and the new
// protocol for clients using this version of the API.
@ -323,7 +323,7 @@ public class OffsetMetadataManager {
request.groupInstanceId(),
request.generationIdOrMemberEpoch(),
false,
context.apiVersion()
context.requestVersion()
);
return group;
@ -336,7 +336,7 @@ public class OffsetMetadataManager {
* @param request The actual request.
*/
private Group validateTransactionalOffsetCommit(
RequestContext context,
AuthorizableRequestContext context,
TxnOffsetCommitRequestData request
) throws ApiException {
Group group;
@ -360,7 +360,7 @@ public class OffsetMetadataManager {
request.groupInstanceId(),
request.generationId(),
true,
context.apiVersion()
context.requestVersion()
);
} catch (StaleMemberEpochException ex) {
throw Errors.ILLEGAL_GENERATION.exception();
@ -438,7 +438,7 @@ public class OffsetMetadataManager {
* a list of records to update the state machine.
*/
public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffset(
RequestContext context,
AuthorizableRequestContext context,
OffsetCommitRequestData request
) throws ApiException {
Group group = validateOffsetCommit(context, request);
@ -511,7 +511,7 @@ public class OffsetMetadataManager {
* a list of records to update the state machine.
*/
public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitTransactionalOffset(
RequestContext context,
AuthorizableRequestContext context,
TxnOffsetCommitRequestData request
) throws ApiException {
validateTransactionalOffsetCommit(context, request);
@ -623,7 +623,7 @@ public class OffsetMetadataManager {
/**
* Deletes offsets as part of a DeleteGroups request.
* Populates the record list passed in with records to update the state machine.
* Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)}
* Validations are done in {@link GroupCoordinatorShard#deleteGroups(AuthorizableRequestContext, List)}
*
* @param groupId The id of the given group.
* @param records The record list to populate.

View File

@ -831,7 +831,7 @@ public class ClassicGroup implements Group {
String groupInstanceId,
int generationId,
boolean isTransactional,
short apiVersion
int apiVersion
) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException {
if (isInState(DEAD)) {
throw Errors.COORDINATOR_NOT_AVAILABLE.exception();

View File

@ -644,7 +644,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
String groupInstanceId,
int memberEpoch,
boolean isTransactional,
short apiVersion
int apiVersion
) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException {
// When the member epoch is -1, the request comes from either the admin client
// or a consumer which does not use the group management facility. In this case,

View File

@ -186,7 +186,7 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> {
String groupInstanceId,
int memberEpoch,
boolean isTransactional,
short apiVersion
int apiVersion
) {
throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
}

View File

@ -683,7 +683,7 @@ public class StreamsGroup implements Group {
String groupInstanceId,
int memberEpoch,
boolean isTransactional,
short apiVersion
int apiVersion
) throws UnknownMemberIdException, StaleMemberEpochException {
// When the member epoch is -1, the request comes from either the admin client
// or a consumer which does not use the group management facility. In this case,

View File

@ -2396,7 +2396,7 @@ public class GroupCoordinatorServiceTest {
@ParameterizedTest
@ValueSource(shorts = {4, 5})
public void testCommitTransactionalOffsets(Short txnOffsetCommitVersion) throws ExecutionException, InterruptedException {
public void testCommitTransactionalOffsets(short txnOffsetCommitVersion) throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
@ -2431,7 +2431,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq((short) 5),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any(),
ArgumentMatchers.any()
ArgumentMatchers.eq((int) txnOffsetCommitVersion)
)).thenReturn(CompletableFuture.completedFuture(response));
CompletableFuture<TxnOffsetCommitResponseData> future = service.commitTransactionalOffsets(
@ -2486,7 +2486,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq((short) 5),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any(),
ArgumentMatchers.any()
ArgumentMatchers.eq((int) ApiKeys.TXN_OFFSET_COMMIT.latestVersion())
)).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception())));
CompletableFuture<TxnOffsetCommitResponseData> future = service.commitTransactionalOffsets(