diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index ed3625068aa..9d75d383bab 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -96,7 +96,7 @@ public class JoinGroupRequest extends AbstractRequest { * * @return whether a known member id is required or not. */ - public static boolean requiresKnownMemberId(short apiVersion) { + public static boolean requiresKnownMemberId(int apiVersion) { return apiVersion >= 4; } @@ -117,7 +117,7 @@ public class JoinGroupRequest extends AbstractRequest { */ public static boolean requiresKnownMemberId( JoinGroupRequestData request, - short apiVersion + int apiVersion ) { return request.groupInstanceId() == null && request.memberId().equals(UNKNOWN_MEMBER_ID) @@ -150,7 +150,7 @@ public class JoinGroupRequest extends AbstractRequest { * @return whether the version supports skipping assignment. */ - public static boolean supportsSkippingAssignment(short apiVersion) { + public static boolean supportsSkippingAssignment(int apiVersion) { return apiVersion >= 9; } diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 88b0eac353b..19567344f2d 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -2236,7 +2236,7 @@ public class CoordinatorRuntime, U> implements Aut short producerEpoch, Duration timeout, CoordinatorWriteOperation op, - Short apiVersion + int apiVersion ) { throwIfNotRunning(); log.debug("Scheduled execution of transactional write operation {}.", name); diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java index cb8bec3f71c..cc76cfd6460 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java @@ -105,7 +105,7 @@ public interface PartitionWriter { String transactionalId, long producerId, short producerEpoch, - short apiVersion + int apiVersion ) throws KafkaException; /** diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java index 66cfbbe8b10..7809c46cb10 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java @@ -129,7 +129,7 @@ public class InMemoryPartitionWriter implements PartitionWriter { String transactionalId, long producerId, short producerEpoch, - short apiVersion + int apiVersion ) throws KafkaException { return CompletableFuture.completedFuture(new VerificationGuard()); } diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index 64f23c32f52..d6ea17fe794 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -107,7 +107,7 @@ class CoordinatorPartitionWriter( transactionalId: String, producerId: Long, producerEpoch: Short, - apiVersion: Short + apiVersion: Int ): CompletableFuture[VerificationGuard] = { val transactionSupportedOperation = AddPartitionsToTxnManager.txnOffsetCommitRequestVersionToTransactionSupportedOperation(apiVersion) val future = new CompletableFuture[VerificationGuard]() diff --git a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala index 050f48cd4e4..b7e3bd36d84 100644 --- a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala +++ b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala @@ -51,7 +51,7 @@ object AddPartitionsToTxnManager { } } - def txnOffsetCommitRequestVersionToTransactionSupportedOperation(version: Short): TransactionSupportedOperation = { + def txnOffsetCommitRequestVersionToTransactionSupportedOperation(version: Int): TransactionSupportedOperation = { if (version > 4) { addPartition } else if (version > 3) { diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala index a56dab4fba7..3a6ee39dcc1 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala @@ -178,7 +178,7 @@ class CoordinatorPartitionWriterTest { "transactional-id", 10L, 5.toShort, - ApiKeys.TXN_OFFSET_COMMIT.latestVersion() + ApiKeys.TXN_OFFSET_COMMIT.latestVersion().toInt ) if (error == Errors.NONE) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index 410064008db..54d7e98d4b7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -117,7 +117,7 @@ public interface Group { String groupInstanceId, int generationIdOrMemberEpoch, boolean isTransactional, - short apiVersion + int apiVersion ) throws KafkaException; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 98d33e9f254..0bc37c82b14 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -49,12 +49,12 @@ import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitResponseData; -import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import java.time.Duration; import java.util.List; @@ -80,7 +80,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture consumerGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, ConsumerGroupHeartbeatRequestData request ); @@ -94,7 +94,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture streamsGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, StreamsGroupHeartbeatRequestData request ); @@ -108,7 +108,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture shareGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, ShareGroupHeartbeatRequestData request ); @@ -123,7 +123,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture joinGroup( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, BufferSupplier bufferSupplier ); @@ -139,7 +139,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture syncGroup( - RequestContext context, + AuthorizableRequestContext context, SyncGroupRequestData request, BufferSupplier bufferSupplier ); @@ -154,7 +154,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture heartbeat( - RequestContext context, + AuthorizableRequestContext context, HeartbeatRequestData request ); @@ -168,7 +168,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture leaveGroup( - RequestContext context, + AuthorizableRequestContext context, LeaveGroupRequestData request ); @@ -182,7 +182,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture listGroups( - RequestContext context, + AuthorizableRequestContext context, ListGroupsRequestData request ); @@ -196,7 +196,7 @@ public interface GroupCoordinator { * The error codes of the results are set to indicate the errors occurred during the execution. */ CompletableFuture> describeGroups( - RequestContext context, + AuthorizableRequestContext context, List groupIds ); @@ -209,7 +209,7 @@ public interface GroupCoordinator { * @return A future yielding the results or an exception. */ CompletableFuture> consumerGroupDescribe( - RequestContext context, + AuthorizableRequestContext context, List groupIds ); @@ -222,7 +222,7 @@ public interface GroupCoordinator { * @return A future yielding the results or an exception. */ CompletableFuture> streamsGroupDescribe( - RequestContext context, + AuthorizableRequestContext context, List groupIds ); @@ -235,7 +235,7 @@ public interface GroupCoordinator { * @return A future yielding the results or an exception. */ CompletableFuture> shareGroupDescribe( - RequestContext context, + AuthorizableRequestContext context, List groupIds ); @@ -250,7 +250,7 @@ public interface GroupCoordinator { * The error codes of the results are set to indicate the errors occurred during the execution. */ CompletableFuture deleteGroups( - RequestContext context, + AuthorizableRequestContext context, List groupIds, BufferSupplier bufferSupplier ); @@ -265,7 +265,7 @@ public interface GroupCoordinator { * The error codes of the results are set to indicate the errors occurred during the execution. */ CompletableFuture fetchOffsets( - RequestContext context, + AuthorizableRequestContext context, OffsetFetchRequestData.OffsetFetchRequestGroup request, boolean requireStable ); @@ -280,7 +280,7 @@ public interface GroupCoordinator { * The error codes of the results are set to indicate the errors occurred during the execution. */ CompletableFuture fetchAllOffsets( - RequestContext context, + AuthorizableRequestContext context, OffsetFetchRequestData.OffsetFetchRequestGroup request, boolean requireStable ); @@ -295,7 +295,7 @@ public interface GroupCoordinator { * The error codes of the response are set to indicate the errors occurred during the execution. */ CompletableFuture describeShareGroupOffsets( - RequestContext context, + AuthorizableRequestContext context, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request ); @@ -309,7 +309,7 @@ public interface GroupCoordinator { * The error codes of the response are set to indicate the errors occurred during the execution. */ CompletableFuture describeShareGroupAllOffsets( - RequestContext context, + AuthorizableRequestContext context, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request ); @@ -323,7 +323,7 @@ public interface GroupCoordinator { * The error codes of the response are set to indicate the errors occurred during the execution. */ CompletableFuture deleteShareGroupOffsets( - RequestContext context, + AuthorizableRequestContext context, DeleteShareGroupOffsetsRequestData request ); @@ -338,7 +338,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture commitOffsets( - RequestContext context, + AuthorizableRequestContext context, OffsetCommitRequestData request, BufferSupplier bufferSupplier ); @@ -354,7 +354,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture commitTransactionalOffsets( - RequestContext context, + AuthorizableRequestContext context, TxnOffsetCommitRequestData request, BufferSupplier bufferSupplier ); @@ -370,7 +370,7 @@ public interface GroupCoordinator { * The error code(s) of the response are set to indicate the error(s) occurred during the execution. */ CompletableFuture deleteOffsets( - RequestContext context, + AuthorizableRequestContext context, OffsetDeleteRequestData request, BufferSupplier bufferSupplier ); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index de9f12337e1..46f7e513def 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -66,7 +66,6 @@ import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest; import org.apache.kafka.common.requests.DescribeGroupsRequest; import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest; import org.apache.kafka.common.requests.OffsetCommitRequest; -import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.ShareGroupDescribeRequest; import org.apache.kafka.common.requests.StreamsGroupDescribeRequest; import org.apache.kafka.common.requests.TransactionResult; @@ -88,6 +87,7 @@ import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters; @@ -364,11 +364,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#consumerGroupHeartbeat(RequestContext, ConsumerGroupHeartbeatRequestData)}. + * See {@link GroupCoordinator#consumerGroupHeartbeat(AuthorizableRequestContext, ConsumerGroupHeartbeatRequestData)}. */ @Override public CompletableFuture consumerGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, ConsumerGroupHeartbeatRequestData request ) { if (!isActive.get()) { @@ -395,11 +395,11 @@ public class GroupCoordinatorService implements GroupCoordinator { /** * See - * {@link GroupCoordinator#streamsGroupHeartbeat(RequestContext, org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData)}. + * {@link GroupCoordinator#streamsGroupHeartbeat(AuthorizableRequestContext, StreamsGroupHeartbeatRequestData)}. */ @Override public CompletableFuture streamsGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, StreamsGroupHeartbeatRequestData request ) { if (!isActive.get()) { @@ -432,11 +432,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}. + * See {@link GroupCoordinator#shareGroupHeartbeat(AuthorizableRequestContext, ShareGroupHeartbeatRequestData)}. */ @Override public CompletableFuture shareGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, ShareGroupHeartbeatRequestData request ) { if (!isActive.get()) { @@ -592,11 +592,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#joinGroup(RequestContext, JoinGroupRequestData, BufferSupplier)}. + * See {@link GroupCoordinator#joinGroup(AuthorizableRequestContext, JoinGroupRequestData, BufferSupplier)}. */ @Override public CompletableFuture joinGroup( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, BufferSupplier bufferSupplier ) { @@ -646,11 +646,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#syncGroup(RequestContext, SyncGroupRequestData, BufferSupplier)}. + * See {@link GroupCoordinator#syncGroup(AuthorizableRequestContext, SyncGroupRequestData, BufferSupplier)}. */ @Override public CompletableFuture syncGroup( - RequestContext context, + AuthorizableRequestContext context, SyncGroupRequestData request, BufferSupplier bufferSupplier ) { @@ -690,11 +690,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#heartbeat(RequestContext, HeartbeatRequestData)}. + * See {@link GroupCoordinator#heartbeat(AuthorizableRequestContext, HeartbeatRequestData)}. */ @Override public CompletableFuture heartbeat( - RequestContext context, + AuthorizableRequestContext context, HeartbeatRequestData request ) { if (!isActive.get()) { @@ -733,11 +733,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#leaveGroup(RequestContext, LeaveGroupRequestData)}. + * See {@link GroupCoordinator#leaveGroup(AuthorizableRequestContext, LeaveGroupRequestData)}. */ @Override public CompletableFuture leaveGroup( - RequestContext context, + AuthorizableRequestContext context, LeaveGroupRequestData request ) { if (!isActive.get()) { @@ -782,11 +782,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#listGroups(RequestContext, ListGroupsRequestData)}. + * See {@link GroupCoordinator#listGroups(AuthorizableRequestContext, ListGroupsRequestData)}. */ @Override public CompletableFuture listGroups( - RequestContext context, + AuthorizableRequestContext context, ListGroupsRequestData request ) { if (!isActive.get()) { @@ -827,11 +827,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + * See {@link GroupCoordinator#consumerGroupDescribe(AuthorizableRequestContext, List)}. */ @Override public CompletableFuture> consumerGroupDescribe( - RequestContext context, + AuthorizableRequestContext context, List groupIds ) { if (!isActive.get()) { @@ -879,11 +879,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}. + * See {@link GroupCoordinator#streamsGroupDescribe(AuthorizableRequestContext, List)}. */ @Override public CompletableFuture> streamsGroupDescribe( - RequestContext context, + AuthorizableRequestContext context, List groupIds ) { if (!isActive.get()) { @@ -931,11 +931,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#shareGroupDescribe(RequestContext, List)}. + * See {@link GroupCoordinator#shareGroupDescribe(AuthorizableRequestContext, List)}. */ @Override public CompletableFuture> shareGroupDescribe( - RequestContext context, + AuthorizableRequestContext context, List groupIds ) { if (!isActive.get()) { @@ -983,11 +983,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#describeGroups(RequestContext, List)}. + * See {@link GroupCoordinator#describeGroups(AuthorizableRequestContext, List)}. */ @Override public CompletableFuture> describeGroups( - RequestContext context, + AuthorizableRequestContext context, List groupIds ) { if (!isActive.get()) { @@ -1037,11 +1037,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#deleteGroups(RequestContext, List, BufferSupplier)}. + * See {@link GroupCoordinator#deleteGroups(AuthorizableRequestContext, List, BufferSupplier)}. */ @Override public CompletableFuture deleteGroups( - RequestContext context, + AuthorizableRequestContext context, List groupIds, BufferSupplier bufferSupplier ) { @@ -1141,7 +1141,7 @@ public class GroupCoordinatorService implements GroupCoordinator { } private CompletableFuture handleDeleteGroups( - RequestContext context, + AuthorizableRequestContext context, TopicPartition topicPartition, List groupIds ) { @@ -1296,11 +1296,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#fetchOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}. + * See {@link GroupCoordinator#fetchOffsets(AuthorizableRequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}. */ @Override public CompletableFuture fetchOffsets( - RequestContext context, + AuthorizableRequestContext context, OffsetFetchRequestData.OffsetFetchRequestGroup request, boolean requireStable ) { @@ -1351,11 +1351,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#fetchAllOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}. + * See {@link GroupCoordinator#fetchAllOffsets(AuthorizableRequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}. */ @Override public CompletableFuture fetchAllOffsets( - RequestContext context, + AuthorizableRequestContext context, OffsetFetchRequestData.OffsetFetchRequestGroup request, boolean requireStable ) { @@ -1406,11 +1406,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}. + * See {@link GroupCoordinator#describeShareGroupOffsets(AuthorizableRequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}. */ @Override public CompletableFuture describeShareGroupOffsets( - RequestContext context, + AuthorizableRequestContext context, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData ) { if (!isActive.get()) { @@ -1468,11 +1468,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#describeShareGroupAllOffsets(RequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}. + * See {@link GroupCoordinator#describeShareGroupAllOffsets(AuthorizableRequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}. */ @Override public CompletableFuture describeShareGroupAllOffsets( - RequestContext context, + AuthorizableRequestContext context, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData ) { if (!isActive.get()) { @@ -1561,11 +1561,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#deleteShareGroupOffsets(RequestContext, DeleteShareGroupOffsetsRequestData)}. + * See {@link GroupCoordinator#deleteShareGroupOffsets(AuthorizableRequestContext, DeleteShareGroupOffsetsRequestData)}. */ @Override public CompletableFuture deleteShareGroupOffsets( - RequestContext context, + AuthorizableRequestContext context, DeleteShareGroupOffsetsRequestData requestData ) { if (!isActive.get()) { @@ -1635,11 +1635,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#commitOffsets(RequestContext, OffsetCommitRequestData, BufferSupplier)}. + * See {@link GroupCoordinator#commitOffsets(AuthorizableRequestContext, OffsetCommitRequestData, BufferSupplier)}. */ @Override public CompletableFuture commitOffsets( - RequestContext context, + AuthorizableRequestContext context, OffsetCommitRequestData request, BufferSupplier bufferSupplier ) { @@ -1673,11 +1673,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#commitTransactionalOffsets(RequestContext, TxnOffsetCommitRequestData, BufferSupplier)}. + * See {@link GroupCoordinator#commitTransactionalOffsets(AuthorizableRequestContext, TxnOffsetCommitRequestData, BufferSupplier)}. */ @Override public CompletableFuture commitTransactionalOffsets( - RequestContext context, + AuthorizableRequestContext context, TxnOffsetCommitRequestData request, BufferSupplier bufferSupplier ) { @@ -1703,7 +1703,7 @@ public class GroupCoordinatorService implements GroupCoordinator { request.producerEpoch(), Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.commitTransactionalOffset(context, request), - context.apiVersion() + context.requestVersion() ).exceptionally(exception -> handleOperationException( "txn-commit-offset", request, @@ -1714,11 +1714,11 @@ public class GroupCoordinatorService implements GroupCoordinator { } /** - * See {@link GroupCoordinator#deleteOffsets(RequestContext, OffsetDeleteRequestData, BufferSupplier)}. + * See {@link GroupCoordinator#deleteOffsets(AuthorizableRequestContext, OffsetDeleteRequestData, BufferSupplier)}. */ @Override public CompletableFuture deleteOffsets( - RequestContext context, + AuthorizableRequestContext context, OffsetDeleteRequestData request, BufferSupplier bufferSupplier ) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index a9d69e2c68f..3af48c5396f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -55,7 +55,6 @@ import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -122,6 +121,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters; @@ -456,7 +456,7 @@ public class GroupCoordinatorShard implements CoordinatorShard consumerGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, ConsumerGroupHeartbeatRequestData request ) { return groupMetadataManager.consumerGroupHeartbeat(context, request); @@ -472,7 +472,7 @@ public class GroupCoordinatorShard implements CoordinatorShard streamsGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, StreamsGroupHeartbeatRequestData request ) { return groupMetadataManager.streamsGroupHeartbeat(context, request); @@ -488,7 +488,7 @@ public class GroupCoordinatorShard implements CoordinatorShard>, CoordinatorRecord> shareGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, ShareGroupHeartbeatRequestData request ) { return groupMetadataManager.shareGroupHeartbeat(context, request); @@ -557,7 +557,7 @@ public class GroupCoordinatorShard implements CoordinatorShard classicGroupJoin( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, CompletableFuture responseFuture ) { @@ -578,7 +578,7 @@ public class GroupCoordinatorShard implements CoordinatorShard classicGroupSync( - RequestContext context, + AuthorizableRequestContext context, SyncGroupRequestData request, CompletableFuture responseFuture ) { @@ -599,7 +599,7 @@ public class GroupCoordinatorShard implements CoordinatorShard classicGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, HeartbeatRequestData request ) { return groupMetadataManager.classicGroupHeartbeat( @@ -617,7 +617,7 @@ public class GroupCoordinatorShard implements CoordinatorShard deleteGroups( - RequestContext context, + AuthorizableRequestContext context, List groupIds ) throws ApiException { final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection = @@ -774,7 +774,7 @@ public class GroupCoordinatorShard implements CoordinatorShard commitOffset( - RequestContext context, + AuthorizableRequestContext context, OffsetCommitRequestData request ) throws ApiException { return offsetMetadataManager.commitOffset(context, request); @@ -790,7 +790,7 @@ public class GroupCoordinatorShard implements CoordinatorShard commitTransactionalOffset( - RequestContext context, + AuthorizableRequestContext context, TxnOffsetCommitRequestData request ) throws ApiException { return offsetMetadataManager.commitTransactionalOffset(context, request); @@ -875,7 +875,7 @@ public class GroupCoordinatorShard implements CoordinatorShard describeGroups( - RequestContext context, + AuthorizableRequestContext context, List groupIds, long committedOffset ) { @@ -892,7 +892,7 @@ public class GroupCoordinatorShard implements CoordinatorShard classicGroupLeave( - RequestContext context, + AuthorizableRequestContext context, LeaveGroupRequestData request ) throws ApiException { return groupMetadataManager.classicGroupLeave(context, request); @@ -908,7 +908,7 @@ public class GroupCoordinatorShard implements CoordinatorShard deleteOffsets( - RequestContext context, + AuthorizableRequestContext context, OffsetDeleteRequestData request ) throws ApiException { return offsetMetadataManager.deleteOffsets(request); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 8bd3dfd5abc..2c03b2a3029 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -73,7 +73,6 @@ import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.requests.JoinGroupRequest; -import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.resource.ResourcePattern; @@ -167,6 +166,7 @@ import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsDelta; import org.apache.kafka.image.TopicsImage; import org.apache.kafka.server.authorizer.Action; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.server.authorizer.AuthorizationResult; import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters; @@ -743,7 +743,7 @@ public class GroupMetadataManager { * @return A list containing the DescribeGroupsResponseData.DescribedGroup. */ public List describeGroups( - RequestContext context, + AuthorizableRequestContext context, List groupIds, long committedOffset ) { @@ -779,7 +779,7 @@ public class GroupMetadataManager { ); } } catch (GroupIdNotFoundException exception) { - if (context.header.apiVersion() >= 6) { + if (context.requestVersion() >= 6) { describedGroups.add(new DescribeGroupsResponseData.DescribedGroup() .setGroupId(groupId) .setGroupState(DEAD.toString()) @@ -1502,7 +1502,7 @@ public class GroupMetadataManager { */ private void throwIfConsumerGroupHeartbeatRequestIsInvalid( ConsumerGroupHeartbeatRequestData request, - short apiVersion + int apiVersion ) throws InvalidRequestException, UnsupportedAssignorException { if (apiVersion >= CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION || request.memberEpoch() > 0 || @@ -2369,7 +2369,7 @@ public class GroupMetadataManager { * a list of records to update the state machine. */ private CoordinatorResult consumerGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, String groupId, String memberId, int memberEpoch, @@ -2427,7 +2427,7 @@ public class GroupMetadataManager { .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames)) .maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex)) .setClientId(context.clientId()) - .setClientHost(context.clientAddress.toString()) + .setClientHost(context.clientAddress().toString()) .setClassicMemberMetadata(null) .build(); @@ -2538,7 +2538,7 @@ public class GroupMetadataManager { */ private CoordinatorResult classicGroupJoinToConsumerGroup( ConsumerGroup group, - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, CompletableFuture responseFuture ) throws ApiException { @@ -2556,7 +2556,7 @@ public class GroupMetadataManager { throwIfConsumerGroupIsFull(group, memberId); throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); - if (JoinGroupRequest.requiresKnownMemberId(request, context.apiVersion())) { + if (JoinGroupRequest.requiresKnownMemberId(request, context.requestVersion())) { // A dynamic member requiring a member id joins the group. Send back a response to call for another // join group request with allocated member id. responseFuture.complete(new JoinGroupResponseData() @@ -2609,7 +2609,7 @@ public class GroupMetadataManager { .maybeUpdateServerAssignorName(Optional.empty()) .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())) .setClientId(context.clientId()) - .setClientHost(context.clientAddress.toString()) + .setClientHost(context.clientAddress().toString()) .setClassicMemberMetadata( new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() .setSessionTimeoutMs(sessionTimeoutMs) @@ -3228,7 +3228,7 @@ public class GroupMetadataManager { * @return Whether a rebalance must be triggered. */ private boolean maybeUpdateRegularExpressions( - RequestContext context, + AuthorizableRequestContext context, ConsumerGroup group, ConsumerGroupMember member, ConsumerGroupMember updatedMember, @@ -3339,7 +3339,7 @@ public class GroupMetadataManager { * public for benchmarks. */ public static Map refreshRegularExpressions( - RequestContext context, + AuthorizableRequestContext context, String groupId, Logger log, Time time, @@ -3404,7 +3404,7 @@ public class GroupMetadataManager { * @param resolvedRegexes The map of the regex pattern and its set of matched topics. */ private static void filterTopicDescribeAuthorizedTopics( - RequestContext context, + AuthorizableRequestContext context, Optional> authorizerPlugin, Map> resolvedRegexes ) { @@ -4792,10 +4792,10 @@ public class GroupMetadataManager { * a list of records to update the state machine. */ public CoordinatorResult consumerGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, ConsumerGroupHeartbeatRequestData request ) throws ApiException { - throwIfConsumerGroupHeartbeatRequestIsInvalid(request, context.apiVersion()); + throwIfConsumerGroupHeartbeatRequestIsInvalid(request, context.requestVersion()); if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { // -1 means that the member wants to leave the group. @@ -4834,7 +4834,7 @@ public class GroupMetadataManager { * a list of records to update the state machine. */ public CoordinatorResult streamsGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, StreamsGroupHeartbeatRequestData request ) throws ApiException { throwIfStreamsGroupHeartbeatRequestIsInvalid(request); @@ -4857,7 +4857,7 @@ public class GroupMetadataManager { request.rackId(), request.rebalanceTimeoutMs(), context.clientId(), - context.clientAddress.toString(), + context.clientAddress().toString(), request.topology(), request.activeTasks(), request.standbyTasks(), @@ -4917,7 +4917,7 @@ public class GroupMetadataManager { * and a list of records to update the state machine. */ public CoordinatorResult>, CoordinatorRecord> shareGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, ShareGroupHeartbeatRequestData request ) throws ApiException { throwIfShareGroupHeartbeatRequestIsInvalid(request); @@ -4941,7 +4941,7 @@ public class GroupMetadataManager { request.memberEpoch(), request.rackId(), context.clientId(), - context.clientAddress.toString(), + context.clientAddress().toString(), request.subscribedTopicNames()); } @@ -6209,7 +6209,7 @@ public class GroupMetadataManager { * @return The result that contains records to append if the join group phase completes. */ public CoordinatorResult classicGroupJoin( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, CompletableFuture responseFuture ) { @@ -6244,7 +6244,7 @@ public class GroupMetadataManager { * @return The result that contains records to append if the join group phase completes. */ CoordinatorResult classicGroupJoinToClassicGroup( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, CompletableFuture responseFuture ) { @@ -6360,7 +6360,7 @@ public class GroupMetadataManager { * @return The coordinator result that will be appended to the log. */ private CoordinatorResult classicGroupJoinNewMember( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, CompletableFuture responseFuture @@ -6418,7 +6418,7 @@ public class GroupMetadataManager { * @return The coordinator result that will be appended to the log. */ private CoordinatorResult classicGroupJoinNewStaticMember( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, String newMemberId, @@ -6464,13 +6464,13 @@ public class GroupMetadataManager { * @return The coordinator result that will be appended to the log. */ private CoordinatorResult classicGroupJoinNewDynamicMember( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, String newMemberId, CompletableFuture responseFuture ) { - if (JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { + if (JoinGroupRequest.requiresKnownMemberId(context.requestVersion())) { // If member id required, register the member in the pending member list and send // back a response to call for another join group request with allocated member id. log.info("Dynamic member with unknown member id joins group {} in {} state. " + @@ -6514,7 +6514,7 @@ public class GroupMetadataManager { * @return The coordinator result that will be appended to the log. */ private CoordinatorResult classicGroupJoinExistingMember( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, CompletableFuture responseFuture @@ -6937,7 +6937,7 @@ public class GroupMetadataManager { * @return The coordinator result that will be appended to the log. */ private CoordinatorResult addMemberThenRebalanceOrCompleteJoin( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, String memberId, @@ -7318,7 +7318,7 @@ public class GroupMetadataManager { * @return The coordinator result that will be appended to the log. */ private CoordinatorResult updateStaticMemberThenRebalanceOrCompleteJoin( - RequestContext context, + AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, String oldMemberId, @@ -7377,7 +7377,7 @@ public class GroupMetadataManager { .setSkipAssignment(false) .setErrorCode(appendGroupMetadataErrorToResponseError(Errors.forException(t)).code())); - } else if (JoinGroupRequest.supportsSkippingAssignment(context.apiVersion())) { + } else if (JoinGroupRequest.supportsSkippingAssignment(context.requestVersion())) { boolean isLeader = group.isLeader(newMemberId); group.completeJoinFuture(newMember, new JoinGroupResponseData() @@ -7442,7 +7442,7 @@ public class GroupMetadataManager { * @return The result that contains records to append. */ public CoordinatorResult classicGroupSync( - RequestContext context, + AuthorizableRequestContext context, SyncGroupRequestData request, CompletableFuture responseFuture ) throws UnknownMemberIdException { @@ -7484,7 +7484,7 @@ public class GroupMetadataManager { */ private CoordinatorResult classicGroupSyncToClassicGroup( ClassicGroup group, - RequestContext context, + AuthorizableRequestContext context, SyncGroupRequestData request, CompletableFuture responseFuture ) throws IllegalStateException { @@ -7582,7 +7582,7 @@ public class GroupMetadataManager { */ private CoordinatorResult classicGroupSyncToConsumerGroup( ConsumerGroup group, - RequestContext context, + AuthorizableRequestContext context, SyncGroupRequestData request, CompletableFuture responseFuture ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException, @@ -7721,7 +7721,7 @@ public class GroupMetadataManager { * @return The coordinator result that contains the heartbeat response. */ public CoordinatorResult classicGroupHeartbeat( - RequestContext context, + AuthorizableRequestContext context, HeartbeatRequestData request ) { Group group; @@ -7755,7 +7755,7 @@ public class GroupMetadataManager { */ private CoordinatorResult classicGroupHeartbeatToClassicGroup( ClassicGroup group, - RequestContext context, + AuthorizableRequestContext context, HeartbeatRequestData request ) { validateClassicGroupHeartbeat(group, request.memberId(), request.groupInstanceId(), request.generationId()); @@ -7838,7 +7838,7 @@ public class GroupMetadataManager { */ private CoordinatorResult classicGroupHeartbeatToConsumerGroup( ConsumerGroup group, - RequestContext context, + AuthorizableRequestContext context, HeartbeatRequestData request ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { String groupId = request.groupId(); @@ -7908,7 +7908,7 @@ public class GroupMetadataManager { * @return The LeaveGroup response and the records to append. */ public CoordinatorResult classicGroupLeave( - RequestContext context, + AuthorizableRequestContext context, LeaveGroupRequestData request ) throws UnknownMemberIdException { Group group; @@ -8135,7 +8135,7 @@ public class GroupMetadataManager { /** * Handles a DeleteGroups request. * Populates the record list passed in with record to update the state machine. - * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} by + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(AuthorizableRequestContext, List)} by * calling {@link GroupMetadataManager#validateDeleteGroup(String)}. * * @param groupId The id of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 9f601a74917..2b50071a7f7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -35,7 +35,6 @@ import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetComm import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.OffsetCommitRequest; -import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -48,6 +47,7 @@ import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashSet; @@ -290,7 +290,7 @@ public class OffsetMetadataManager { * @param request The actual request. */ private Group validateOffsetCommit( - RequestContext context, + AuthorizableRequestContext context, OffsetCommitRequestData request ) throws ApiException { Group group; @@ -305,7 +305,7 @@ public class OffsetMetadataManager { log.info("[GroupId {}] Creating a simple consumer group via manual offset commit.", request.groupId()); group = groupMetadataManager.getOrMaybeCreateClassicGroup(request.groupId(), true); } else { - if (context.header.apiVersion() >= 9) { + if (context.requestVersion() >= 9) { // Starting from version 9 of the OffsetCommit API, we return GROUP_ID_NOT_FOUND // if the group does not exist. This error works for both the old and the new // protocol for clients using this version of the API. @@ -323,7 +323,7 @@ public class OffsetMetadataManager { request.groupInstanceId(), request.generationIdOrMemberEpoch(), false, - context.apiVersion() + context.requestVersion() ); return group; @@ -336,7 +336,7 @@ public class OffsetMetadataManager { * @param request The actual request. */ private Group validateTransactionalOffsetCommit( - RequestContext context, + AuthorizableRequestContext context, TxnOffsetCommitRequestData request ) throws ApiException { Group group; @@ -360,7 +360,7 @@ public class OffsetMetadataManager { request.groupInstanceId(), request.generationId(), true, - context.apiVersion() + context.requestVersion() ); } catch (StaleMemberEpochException ex) { throw Errors.ILLEGAL_GENERATION.exception(); @@ -438,7 +438,7 @@ public class OffsetMetadataManager { * a list of records to update the state machine. */ public CoordinatorResult commitOffset( - RequestContext context, + AuthorizableRequestContext context, OffsetCommitRequestData request ) throws ApiException { Group group = validateOffsetCommit(context, request); @@ -511,7 +511,7 @@ public class OffsetMetadataManager { * a list of records to update the state machine. */ public CoordinatorResult commitTransactionalOffset( - RequestContext context, + AuthorizableRequestContext context, TxnOffsetCommitRequestData request ) throws ApiException { validateTransactionalOffsetCommit(context, request); @@ -623,7 +623,7 @@ public class OffsetMetadataManager { /** * Deletes offsets as part of a DeleteGroups request. * Populates the record list passed in with records to update the state machine. - * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(AuthorizableRequestContext, List)} * * @param groupId The id of the given group. * @param records The record list to populate. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java index 39a05be844d..0e14d4e63db 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java @@ -831,7 +831,7 @@ public class ClassicGroup implements Group { String groupInstanceId, int generationId, boolean isTransactional, - short apiVersion + int apiVersion ) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException { if (isInState(DEAD)) { throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index ef95b123ff9..aea5bb42500 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -644,7 +644,7 @@ public class ConsumerGroup extends ModernGroup { String groupInstanceId, int memberEpoch, boolean isTransactional, - short apiVersion + int apiVersion ) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException { // When the member epoch is -1, the request comes from either the admin client // or a consumer which does not use the group management facility. In this case, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java index b63100744c6..eacbab7dbff 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java @@ -186,7 +186,7 @@ public class ShareGroup extends ModernGroup { String groupInstanceId, int memberEpoch, boolean isTransactional, - short apiVersion + int apiVersion ) { throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId)); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 3ace0d1c5b6..549eefe618a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -683,7 +683,7 @@ public class StreamsGroup implements Group { String groupInstanceId, int memberEpoch, boolean isTransactional, - short apiVersion + int apiVersion ) throws UnknownMemberIdException, StaleMemberEpochException { // When the member epoch is -1, the request comes from either the admin client // or a consumer which does not use the group management facility. In this case, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 01e74ea558a..003d864b937 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -2396,7 +2396,7 @@ public class GroupCoordinatorServiceTest { @ParameterizedTest @ValueSource(shorts = {4, 5}) - public void testCommitTransactionalOffsets(Short txnOffsetCommitVersion) throws ExecutionException, InterruptedException { + public void testCommitTransactionalOffsets(short txnOffsetCommitVersion) throws ExecutionException, InterruptedException { CoordinatorRuntime runtime = mockRuntime(); GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() .setConfig(createConfig()) @@ -2431,7 +2431,7 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.eq((short) 5), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any(), - ArgumentMatchers.any() + ArgumentMatchers.eq((int) txnOffsetCommitVersion) )).thenReturn(CompletableFuture.completedFuture(response)); CompletableFuture future = service.commitTransactionalOffsets( @@ -2486,7 +2486,7 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.eq((short) 5), ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any(), - ArgumentMatchers.any() + ArgumentMatchers.eq((int) ApiKeys.TXN_OFFSET_COMMIT.latestVersion()) )).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception()))); CompletableFuture future = service.commitTransactionalOffsets(