Compare commits

...

1 Commits

Author SHA1 Message Date
1sonofqiu 1858fd954c
feat(api): add GetConsumerJoinAttempts API request and response handling 2026-01-04 21:30:57 +08:00
7 changed files with 288 additions and 2 deletions

View File

@ -152,7 +152,8 @@ public enum ApiKeys {
GET_NEXT_NODE_ID(ApiMessageType.GET_NEXT_NODE_ID, false, true),
DESCRIBE_STREAMS(ApiMessageType.DESCRIBE_STREAMS, false, true),
AUTOMQ_UPDATE_GROUP(ApiMessageType.AUTOMQ_UPDATE_GROUP);
AUTOMQ_UPDATE_GROUP(ApiMessageType.AUTOMQ_UPDATE_GROUP),
GET_CONSUMER_JOIN_ATTEMPTS(ApiMessageType.GET_CONSUMER_JOIN_ATTEMPTS);
// AutoMQ for Kafka inject end
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =

View File

@ -0,0 +1,77 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.GetConsumerJoinAttemptsRequestData;
import org.apache.kafka.common.message.GetConsumerJoinAttemptsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiError;
import java.nio.ByteBuffer;
public class GetConsumerJoinAttemptsRequest extends AbstractRequest {
private final GetConsumerJoinAttemptsRequestData data;
public GetConsumerJoinAttemptsRequest(GetConsumerJoinAttemptsRequestData data, short version) {
super(ApiKeys.GET_CONSUMER_JOIN_ATTEMPTS, version);
this.data = data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
GetConsumerJoinAttemptsResponseData response = new GetConsumerJoinAttemptsResponseData()
.setGroupId(data.groupId())
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message());
return new GetConsumerJoinAttemptsResponse(response);
}
@Override
public GetConsumerJoinAttemptsRequestData data() {
return data;
}
public static GetConsumerJoinAttemptsRequest parse(ByteBuffer buffer, short version) {
return new GetConsumerJoinAttemptsRequest(new GetConsumerJoinAttemptsRequestData(new ByteBufferAccessor(buffer), version), version);
}
public static class Builder extends AbstractRequest.Builder<GetConsumerJoinAttemptsRequest> {
private final GetConsumerJoinAttemptsRequestData data;
public Builder(GetConsumerJoinAttemptsRequestData data) {
super(ApiKeys.GET_CONSUMER_JOIN_ATTEMPTS);
this.data = data;
}
@Override
public GetConsumerJoinAttemptsRequest build(short version) {
return new GetConsumerJoinAttemptsRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.GetConsumerJoinAttemptsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import java.nio.ByteBuffer;
import java.util.Map;
public class GetConsumerJoinAttemptsResponse extends AbstractResponse {
private final GetConsumerJoinAttemptsResponseData data;
public GetConsumerJoinAttemptsResponse(GetConsumerJoinAttemptsResponseData data) {
super(ApiKeys.GET_CONSUMER_JOIN_ATTEMPTS);
this.data = data;
}
@Override
public GetConsumerJoinAttemptsResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(Errors.forCode(data.errorCode()));
}
@Override
public int throttleTimeMs() {
return 0; // No throttling for this API
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
// No throttling for this API
}
public static GetConsumerJoinAttemptsResponse parse(ByteBuffer buffer, short version) {
return new GetConsumerJoinAttemptsResponse(new GetConsumerJoinAttemptsResponseData(new ByteBufferAccessor(buffer), version));
}
}

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 603,
"type": "request",
"listeners": [
"broker"
],
"name": "GetConsumerJoinAttemptsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "GroupId",
"type": "string",
"versions": "0+",
"about": "The group identifier to query join attempts for."
}
]
}

View File

@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 603,
"type": "response",
"name": "GetConsumerJoinAttemptsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "GroupId",
"type": "string",
"versions": "0+",
"about": "The group identifier."
},
{
"name": "ErrorCode",
"type": "int16",
"versions": "0+",
"about": "The error code, 0 if no error."
},
{
"name": "ErrorMessage",
"type": "string",
"versions": "0+",
"nullableVersions": "0+",
"about": "The error message if any."
},
{
"name": "JoinAttempts",
"type": "[]JoinAttempt",
"versions": "0+",
"about": "List of consumer join attempts.",
"fields": [
{
"name": "MemberId",
"type": "string",
"versions": "0+",
"about": "The member ID of the consumer."
},
{
"name": "ClientId",
"type": "string",
"versions": "0+",
"about": "The client ID of the consumer."
},
{
"name": "ClientAddress",
"type": "string",
"versions": "0+",
"about": "The client address."
},
{
"name": "Timestamp",
"type": "int64",
"versions": "0+",
"about": "Timestamp of the join attempt."
},
{
"name": "SessionTimeoutMs",
"type": "int32",
"versions": "0+",
"about": "Session timeout in milliseconds."
}
]
}
]
}

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.s3.{AutomqGetPartitionSnapshotRequest, AutomqUpdateGroupRequest, AutomqUpdateGroupResponse, AutomqZoneRouterRequest}
import org.apache.kafka.common.requests.s3.{AutomqGetPartitionSnapshotRequest, AutomqUpdateGroupRequest, AutomqUpdateGroupResponse, AutomqZoneRouterRequest, GetConsumerJoinAttemptsRequest, GetConsumerJoinAttemptsResponse}
import org.apache.kafka.common.requests.{AbstractResponse, DeleteTopicsRequest, DeleteTopicsResponse, FetchRequest, FetchResponse, ProduceRequest, ProduceResponse, RequestUtils}
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC, TRANSACTIONAL_ID}
@ -179,6 +179,7 @@ class ElasticKafkaApis(
case ApiKeys.DELETE_TOPICS => maybeForwardTopicDeletionToController(request, handleDeleteTopicsRequest)
case ApiKeys.GET_NEXT_NODE_ID => forwardToControllerOrFail(request)
case ApiKeys.AUTOMQ_UPDATE_GROUP => handleUpdateGroupRequest(request, requestLocal)
case ApiKeys.GET_CONSUMER_JOIN_ATTEMPTS => handleGetConsumerJoinAttemptsRequest(request, requestLocal)
case _ =>
throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
@ -208,6 +209,7 @@ class ElasticKafkaApis(
| ApiKeys.GET_NEXT_NODE_ID
| ApiKeys.AUTOMQ_ZONE_ROUTER
| ApiKeys.AUTOMQ_UPDATE_GROUP
| ApiKeys.GET_CONSUMER_JOIN_ATTEMPTS
| ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT => handleExtensionRequest(request, requestLocal)
case _ => super.handle(request, requestLocal)
}
@ -438,6 +440,18 @@ class ElasticKafkaApis(
})
}
def handleGetConsumerJoinAttemptsRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val getConsumerJoinAttemptsRequest = request.body[GetConsumerJoinAttemptsRequest]
groupCoordinator.getConsumerJoinAttempts(request.context, getConsumerJoinAttemptsRequest.data(), requestLocal.bufferSupplier)
.whenComplete((response, ex) => {
if (ex != null) {
requestHelper.sendMaybeThrottle(request, getConsumerJoinAttemptsRequest.getErrorResponse(ex))
} else {
requestHelper.sendMaybeThrottle(request, new GetConsumerJoinAttemptsResponse(response))
}
})
}
def handleZoneRouterRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val zoneRouterRequest = request.body[AutomqZoneRouterRequest]
trafficInterceptor.handleZoneRouterRequest(zoneRouterRequest.data()).thenAccept(response => {

View File

@ -19,6 +19,8 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.AutomqUpdateGroupRequestData;
import org.apache.kafka.common.message.AutomqUpdateGroupResponseData;
import org.apache.kafka.common.message.GetConsumerJoinAttemptsRequestData;
import org.apache.kafka.common.message.GetConsumerJoinAttemptsResponseData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@ -432,5 +434,21 @@ public interface GroupCoordinator {
AutomqUpdateGroupRequestData request,
BufferSupplier bufferSupplier
);
/**
* Get consumer join attempts for a group
*
* @param context The coordinator request context.
* @param request The GetConsumerJoinAttemptsRequestData data.
* @param bufferSupplier The buffer supplier tight to the request thread.
*
* @return A future yielding the response.
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<GetConsumerJoinAttemptsResponseData> getConsumerJoinAttempts(
RequestContext context,
GetConsumerJoinAttemptsRequestData request,
BufferSupplier bufferSupplier
);
// AutoMQ injection end
}