Compare commits
1 Commits
main
...
feat/linki
| Author | SHA1 | Date |
|---|---|---|
|
|
1858fd954c |
|
|
@ -152,7 +152,8 @@ public enum ApiKeys {
|
|||
|
||||
GET_NEXT_NODE_ID(ApiMessageType.GET_NEXT_NODE_ID, false, true),
|
||||
DESCRIBE_STREAMS(ApiMessageType.DESCRIBE_STREAMS, false, true),
|
||||
AUTOMQ_UPDATE_GROUP(ApiMessageType.AUTOMQ_UPDATE_GROUP);
|
||||
AUTOMQ_UPDATE_GROUP(ApiMessageType.AUTOMQ_UPDATE_GROUP),
|
||||
GET_CONSUMER_JOIN_ATTEMPTS(ApiMessageType.GET_CONSUMER_JOIN_ATTEMPTS);
|
||||
// AutoMQ for Kafka inject end
|
||||
|
||||
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
|
||||
|
|
|
|||
|
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.common.requests.s3;
|
||||
|
||||
import org.apache.kafka.common.message.GetConsumerJoinAttemptsRequestData;
|
||||
import org.apache.kafka.common.message.GetConsumerJoinAttemptsResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
||||
import org.apache.kafka.common.requests.AbstractRequest;
|
||||
import org.apache.kafka.common.requests.AbstractResponse;
|
||||
import org.apache.kafka.common.requests.ApiError;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class GetConsumerJoinAttemptsRequest extends AbstractRequest {
|
||||
private final GetConsumerJoinAttemptsRequestData data;
|
||||
|
||||
public GetConsumerJoinAttemptsRequest(GetConsumerJoinAttemptsRequestData data, short version) {
|
||||
super(ApiKeys.GET_CONSUMER_JOIN_ATTEMPTS, version);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
|
||||
ApiError apiError = ApiError.fromThrowable(e);
|
||||
GetConsumerJoinAttemptsResponseData response = new GetConsumerJoinAttemptsResponseData()
|
||||
.setGroupId(data.groupId())
|
||||
.setErrorCode(apiError.error().code())
|
||||
.setErrorMessage(apiError.message());
|
||||
return new GetConsumerJoinAttemptsResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetConsumerJoinAttemptsRequestData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public static GetConsumerJoinAttemptsRequest parse(ByteBuffer buffer, short version) {
|
||||
return new GetConsumerJoinAttemptsRequest(new GetConsumerJoinAttemptsRequestData(new ByteBufferAccessor(buffer), version), version);
|
||||
}
|
||||
|
||||
public static class Builder extends AbstractRequest.Builder<GetConsumerJoinAttemptsRequest> {
|
||||
private final GetConsumerJoinAttemptsRequestData data;
|
||||
|
||||
public Builder(GetConsumerJoinAttemptsRequestData data) {
|
||||
super(ApiKeys.GET_CONSUMER_JOIN_ATTEMPTS);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetConsumerJoinAttemptsRequest build(short version) {
|
||||
return new GetConsumerJoinAttemptsRequest(data, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return data.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.common.requests.s3;
|
||||
|
||||
import org.apache.kafka.common.message.GetConsumerJoinAttemptsResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.requests.AbstractResponse;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
|
||||
public class GetConsumerJoinAttemptsResponse extends AbstractResponse {
|
||||
private final GetConsumerJoinAttemptsResponseData data;
|
||||
|
||||
public GetConsumerJoinAttemptsResponse(GetConsumerJoinAttemptsResponseData data) {
|
||||
super(ApiKeys.GET_CONSUMER_JOIN_ATTEMPTS);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetConsumerJoinAttemptsResponseData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Errors, Integer> errorCounts() {
|
||||
return errorCounts(Errors.forCode(data.errorCode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int throttleTimeMs() {
|
||||
return 0; // No throttling for this API
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
|
||||
// No throttling for this API
|
||||
}
|
||||
|
||||
public static GetConsumerJoinAttemptsResponse parse(ByteBuffer buffer, short version) {
|
||||
return new GetConsumerJoinAttemptsResponse(new GetConsumerJoinAttemptsResponseData(new ByteBufferAccessor(buffer), version));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
// contributor license agreements. See the NOTICE file distributed with
|
||||
// this work for additional information regarding copyright ownership.
|
||||
// The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance with
|
||||
// the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
{
|
||||
"apiKey": 603,
|
||||
"type": "request",
|
||||
"listeners": [
|
||||
"broker"
|
||||
],
|
||||
"name": "GetConsumerJoinAttemptsRequest",
|
||||
"validVersions": "0",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{
|
||||
"name": "GroupId",
|
||||
"type": "string",
|
||||
"versions": "0+",
|
||||
"about": "The group identifier to query join attempts for."
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
// contributor license agreements. See the NOTICE file distributed with
|
||||
// this work for additional information regarding copyright ownership.
|
||||
// The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance with
|
||||
// the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
{
|
||||
"apiKey": 603,
|
||||
"type": "response",
|
||||
"name": "GetConsumerJoinAttemptsResponse",
|
||||
"validVersions": "0",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{
|
||||
"name": "GroupId",
|
||||
"type": "string",
|
||||
"versions": "0+",
|
||||
"about": "The group identifier."
|
||||
},
|
||||
{
|
||||
"name": "ErrorCode",
|
||||
"type": "int16",
|
||||
"versions": "0+",
|
||||
"about": "The error code, 0 if no error."
|
||||
},
|
||||
{
|
||||
"name": "ErrorMessage",
|
||||
"type": "string",
|
||||
"versions": "0+",
|
||||
"nullableVersions": "0+",
|
||||
"about": "The error message if any."
|
||||
},
|
||||
{
|
||||
"name": "JoinAttempts",
|
||||
"type": "[]JoinAttempt",
|
||||
"versions": "0+",
|
||||
"about": "List of consumer join attempts.",
|
||||
"fields": [
|
||||
{
|
||||
"name": "MemberId",
|
||||
"type": "string",
|
||||
"versions": "0+",
|
||||
"about": "The member ID of the consumer."
|
||||
},
|
||||
{
|
||||
"name": "ClientId",
|
||||
"type": "string",
|
||||
"versions": "0+",
|
||||
"about": "The client ID of the consumer."
|
||||
},
|
||||
{
|
||||
"name": "ClientAddress",
|
||||
"type": "string",
|
||||
"versions": "0+",
|
||||
"about": "The client address."
|
||||
},
|
||||
{
|
||||
"name": "Timestamp",
|
||||
"type": "int64",
|
||||
"versions": "0+",
|
||||
"about": "Timestamp of the join attempt."
|
||||
},
|
||||
{
|
||||
"name": "SessionTimeoutMs",
|
||||
"type": "int32",
|
||||
"versions": "0+",
|
||||
"about": "Session timeout in milliseconds."
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
@ -27,7 +27,7 @@ import org.apache.kafka.common.record._
|
|||
import org.apache.kafka.common.replica.ClientMetadata
|
||||
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
|
||||
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
|
||||
import org.apache.kafka.common.requests.s3.{AutomqGetPartitionSnapshotRequest, AutomqUpdateGroupRequest, AutomqUpdateGroupResponse, AutomqZoneRouterRequest}
|
||||
import org.apache.kafka.common.requests.s3.{AutomqGetPartitionSnapshotRequest, AutomqUpdateGroupRequest, AutomqUpdateGroupResponse, AutomqZoneRouterRequest, GetConsumerJoinAttemptsRequest, GetConsumerJoinAttemptsResponse}
|
||||
import org.apache.kafka.common.requests.{AbstractResponse, DeleteTopicsRequest, DeleteTopicsResponse, FetchRequest, FetchResponse, ProduceRequest, ProduceResponse, RequestUtils}
|
||||
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
|
||||
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC, TRANSACTIONAL_ID}
|
||||
|
|
@ -179,6 +179,7 @@ class ElasticKafkaApis(
|
|||
case ApiKeys.DELETE_TOPICS => maybeForwardTopicDeletionToController(request, handleDeleteTopicsRequest)
|
||||
case ApiKeys.GET_NEXT_NODE_ID => forwardToControllerOrFail(request)
|
||||
case ApiKeys.AUTOMQ_UPDATE_GROUP => handleUpdateGroupRequest(request, requestLocal)
|
||||
case ApiKeys.GET_CONSUMER_JOIN_ATTEMPTS => handleGetConsumerJoinAttemptsRequest(request, requestLocal)
|
||||
|
||||
case _ =>
|
||||
throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
|
||||
|
|
@ -208,6 +209,7 @@ class ElasticKafkaApis(
|
|||
| ApiKeys.GET_NEXT_NODE_ID
|
||||
| ApiKeys.AUTOMQ_ZONE_ROUTER
|
||||
| ApiKeys.AUTOMQ_UPDATE_GROUP
|
||||
| ApiKeys.GET_CONSUMER_JOIN_ATTEMPTS
|
||||
| ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT => handleExtensionRequest(request, requestLocal)
|
||||
case _ => super.handle(request, requestLocal)
|
||||
}
|
||||
|
|
@ -438,6 +440,18 @@ class ElasticKafkaApis(
|
|||
})
|
||||
}
|
||||
|
||||
def handleGetConsumerJoinAttemptsRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
|
||||
val getConsumerJoinAttemptsRequest = request.body[GetConsumerJoinAttemptsRequest]
|
||||
groupCoordinator.getConsumerJoinAttempts(request.context, getConsumerJoinAttemptsRequest.data(), requestLocal.bufferSupplier)
|
||||
.whenComplete((response, ex) => {
|
||||
if (ex != null) {
|
||||
requestHelper.sendMaybeThrottle(request, getConsumerJoinAttemptsRequest.getErrorResponse(ex))
|
||||
} else {
|
||||
requestHelper.sendMaybeThrottle(request, new GetConsumerJoinAttemptsResponse(response))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
def handleZoneRouterRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
|
||||
val zoneRouterRequest = request.body[AutomqZoneRouterRequest]
|
||||
trafficInterceptor.handleZoneRouterRequest(zoneRouterRequest.data()).thenAccept(response => {
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ package org.apache.kafka.coordinator.group;
|
|||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.message.AutomqUpdateGroupRequestData;
|
||||
import org.apache.kafka.common.message.AutomqUpdateGroupResponseData;
|
||||
import org.apache.kafka.common.message.GetConsumerJoinAttemptsRequestData;
|
||||
import org.apache.kafka.common.message.GetConsumerJoinAttemptsResponseData;
|
||||
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
|
||||
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
|
||||
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
|
||||
|
|
@ -432,5 +434,21 @@ public interface GroupCoordinator {
|
|||
AutomqUpdateGroupRequestData request,
|
||||
BufferSupplier bufferSupplier
|
||||
);
|
||||
|
||||
/**
|
||||
* Get consumer join attempts for a group
|
||||
*
|
||||
* @param context The coordinator request context.
|
||||
* @param request The GetConsumerJoinAttemptsRequestData data.
|
||||
* @param bufferSupplier The buffer supplier tight to the request thread.
|
||||
*
|
||||
* @return A future yielding the response.
|
||||
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
|
||||
*/
|
||||
CompletableFuture<GetConsumerJoinAttemptsResponseData> getConsumerJoinAttempts(
|
||||
RequestContext context,
|
||||
GetConsumerJoinAttemptsRequestData request,
|
||||
BufferSupplier bufferSupplier
|
||||
);
|
||||
// AutoMQ injection end
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue