Compare commits

...

1 Commits

Author SHA1 Message Date
Rancho-7 892a10824e
feat: license 2025-12-26 15:01:27 +08:00
40 changed files with 1486 additions and 4 deletions

View File

@ -1739,6 +1739,24 @@ public interface Admin extends AutoCloseable {
* @return {@link UpdateGroupResult}
*/
UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, UpdateGroupOptions options);
default UpdateLicenseResult updateLicense(String license) {
return updateLicense(license, new UpdateLicenseOptions());
}
UpdateLicenseResult updateLicense(String license, UpdateLicenseOptions options);
default DescribeLicenseResult describeLicense() {
return describeLicense(new DescribeLicenseOptions());
}
DescribeLicenseResult describeLicense(DescribeLicenseOptions options);
default ExportClusterManifestResult exportClusterManifest() {
return exportClusterManifest(new ExportClusterManifestOptions());
}
ExportClusterManifestResult exportClusterManifest(ExportClusterManifestOptions options);
// AutoMQ inject end
/**

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link Admin#describeLicense(DescribeLicenseOptions)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeLicenseOptions extends AbstractOptions<DescribeLicenseOptions> {
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* The result of the {@link Admin#describeLicense(DescribeLicenseOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeLicenseResult {
private final KafkaFuture<String> future;
DescribeLicenseResult(KafkaFuture<String> future) {
this.future = future;
}
/**
* Return a future which returns the current license.
*/
public KafkaFuture<String> license() {
return future;
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link Admin#exportClusterManifest(ExportClusterManifestOptions)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ExportClusterManifestOptions extends AbstractOptions<ExportClusterManifestOptions> {
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* The result of the {@link Admin#exportClusterManifest(ExportClusterManifestOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ExportClusterManifestResult {
private final KafkaFuture<String> future;
ExportClusterManifestResult(KafkaFuture<String> future) {
this.future = future;
}
/**
* Return a future which returns the exported cluster identity manifest.
*/
public KafkaFuture<String> manifest() {
return future;
}
}

View File

@ -320,5 +320,20 @@ public class ForwardingAdmin implements Admin {
return delegate.updateGroup(groupId, groupSpec, options);
}
@Override
public UpdateLicenseResult updateLicense(String license, UpdateLicenseOptions options) {
return delegate.updateLicense(license);
}
@Override
public DescribeLicenseResult describeLicense(DescribeLicenseOptions options) {
return delegate.describeLicense(options);
}
@Override
public ExportClusterManifestResult exportClusterManifest(ExportClusterManifestOptions options) {
return delegate.exportClusterManifest(options);
}
// AutoMQ inject end
}

View File

@ -139,6 +139,7 @@ import org.apache.kafka.common.message.DescribeClusterRequestData;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeLicenseRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
@ -152,6 +153,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
@ -165,6 +167,7 @@ import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
import org.apache.kafka.common.message.UpdateLicenseRequestData;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@ -250,8 +253,14 @@ import org.apache.kafka.common.requests.UpdateFeaturesRequest;
import org.apache.kafka.common.requests.UpdateFeaturesResponse;
import org.apache.kafka.common.requests.s3.AutomqGetNodesRequest;
import org.apache.kafka.common.requests.s3.AutomqGetNodesResponse;
import org.apache.kafka.common.requests.s3.DescribeLicenseRequest;
import org.apache.kafka.common.requests.s3.DescribeLicenseResponse;
import org.apache.kafka.common.requests.s3.ExportClusterManifestRequest;
import org.apache.kafka.common.requests.s3.ExportClusterManifestResponse;
import org.apache.kafka.common.requests.s3.GetNextNodeIdRequest;
import org.apache.kafka.common.requests.s3.GetNextNodeIdResponse;
import org.apache.kafka.common.requests.s3.UpdateLicenseRequest;
import org.apache.kafka.common.requests.s3.UpdateLicenseResponse;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
@ -2464,6 +2473,125 @@ public class KafkaAdminClient extends AdminClient {
}, now);
return new GetNextNodeIdResult(nodeIdFuture);
}
@Override
public DescribeLicenseResult describeLicense(final DescribeLicenseOptions options) {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call("describeLicense", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
@Override
DescribeLicenseRequest.Builder createRequest(int timeoutMs) {
return new DescribeLicenseRequest.Builder(new DescribeLicenseRequestData());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final DescribeLicenseResponse response =
(DescribeLicenseResponse) abstractResponse;
future.complete(response.data().license());
}
@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
};
runnable.call(call, now);
return new DescribeLicenseResult(future);
}
@Override
public UpdateLicenseResult updateLicense(final String license,
final UpdateLicenseOptions options) {
if (license == null || license.isEmpty()) {
throw new IllegalArgumentException("License can not be null or empty.");
}
final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call("updateLicense", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider(true)) {
@Override
UpdateLicenseRequest.Builder createRequest(int timeoutMs) {
return new UpdateLicenseRequest.Builder(
new UpdateLicenseRequestData()
.setLicense(license));
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final UpdateLicenseResponse response =
(UpdateLicenseResponse) abstractResponse;
Errors topLevelError = Errors.forCode(response.data().errorCode());
switch (topLevelError) {
case NONE:
future.complete(null);
break;
case NOT_CONTROLLER:
handleNotControllerError(topLevelError);
break;
default:
future.completeExceptionally(topLevelError.exception(response.data().errorMessage()));
break;
}
}
@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
};
runnable.call(call, now);
return new UpdateLicenseResult(future);
}
@Override
public ExportClusterManifestResult exportClusterManifest(final ExportClusterManifestOptions options) {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call("exportClusterManifest", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider(true)) {
@Override
ExportClusterManifestRequest.Builder createRequest(int timeoutMs) {
return new ExportClusterManifestRequest.Builder(new ExportClusterManifestRequestData());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final ExportClusterManifestResponse response =
(ExportClusterManifestResponse) abstractResponse;
Errors topLevelError = Errors.forCode(response.data().errorCode());
switch (topLevelError) {
case NONE:
future.complete(response.data().manifest());
break;
case NOT_CONTROLLER:
handleNotControllerError(topLevelError);
break;
default:
future.completeExceptionally(topLevelError.exception());
break;
}
}
@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
};
runnable.call(call, now);
return new ExportClusterManifestResult(future);
}
// AutoMQ for Kafka inject end
private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, String topicName, Uuid topicId,

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link Admin#updateLicense(String, UpdateLicenseOptions)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class UpdateLicenseOptions extends AbstractOptions<UpdateLicenseOptions> {
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* The result of the {@link Admin#updateLicense(String, UpdateLicenseOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class UpdateLicenseResult {
private final KafkaFuture<Void> future;
UpdateLicenseResult(KafkaFuture<Void> future) {
this.future = future;
}
/**
* Return a future which succeeds if the license update succeeds.
*/
public KafkaFuture<Void> all() {
return future;
}
}

View File

@ -145,6 +145,9 @@ public enum ApiKeys {
GET_KVS(ApiMessageType.GET_KVS, false, true),
PUT_KVS(ApiMessageType.PUT_KVS, false, true),
DELETE_KVS(ApiMessageType.DELETE_KVS, false, true),
UPDATE_LICENSE(ApiMessageType.UPDATE_LICENSE, false, true),
DESCRIBE_LICENSE(ApiMessageType.DESCRIBE_LICENSE, false, true),
EXPORT_CLUSTER_MANIFEST(ApiMessageType.EXPORT_CLUSTER_MANIFEST, false, true),
AUTOMQ_REGISTER_NODE(ApiMessageType.AUTOMQ_REGISTER_NODE, false, false),
AUTOMQ_GET_NODES(ApiMessageType.AUTOMQ_GET_NODES, false, true),
AUTOMQ_ZONE_ROUTER(ApiMessageType.AUTOMQ_ZONE_ROUTER, false, false),

View File

@ -34,7 +34,9 @@ import org.apache.kafka.common.requests.s3.CommitStreamSetObjectRequest;
import org.apache.kafka.common.requests.s3.CreateStreamsRequest;
import org.apache.kafka.common.requests.s3.DeleteKVsRequest;
import org.apache.kafka.common.requests.s3.DeleteStreamsRequest;
import org.apache.kafka.common.requests.s3.DescribeLicenseRequest;
import org.apache.kafka.common.requests.s3.DescribeStreamsRequest;
import org.apache.kafka.common.requests.s3.ExportClusterManifestRequest;
import org.apache.kafka.common.requests.s3.GetKVsRequest;
import org.apache.kafka.common.requests.s3.GetNextNodeIdRequest;
import org.apache.kafka.common.requests.s3.GetOpeningStreamsRequest;
@ -42,6 +44,7 @@ import org.apache.kafka.common.requests.s3.OpenStreamsRequest;
import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest;
import org.apache.kafka.common.requests.s3.PutKVsRequest;
import org.apache.kafka.common.requests.s3.TrimStreamsRequest;
import org.apache.kafka.common.requests.s3.UpdateLicenseRequest;
import java.nio.ByteBuffer;
import java.util.Map;
@ -385,6 +388,12 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return DescribeStreamsRequest.parse(buffer, apiVersion);
case AUTOMQ_UPDATE_GROUP:
return AutomqUpdateGroupRequest.parse(buffer, apiVersion);
case UPDATE_LICENSE:
return UpdateLicenseRequest.parse(buffer, apiVersion);
case DESCRIBE_LICENSE:
return DescribeLicenseRequest.parse(buffer, apiVersion);
case EXPORT_CLUSTER_MANIFEST:
return ExportClusterManifestRequest.parse(buffer, apiVersion);
// AutoMQ for Kafka inject end
case SHARE_GROUP_HEARTBEAT:

View File

@ -32,7 +32,9 @@ import org.apache.kafka.common.requests.s3.CommitStreamSetObjectResponse;
import org.apache.kafka.common.requests.s3.CreateStreamsResponse;
import org.apache.kafka.common.requests.s3.DeleteKVsResponse;
import org.apache.kafka.common.requests.s3.DeleteStreamsResponse;
import org.apache.kafka.common.requests.s3.DescribeLicenseResponse;
import org.apache.kafka.common.requests.s3.DescribeStreamsResponse;
import org.apache.kafka.common.requests.s3.ExportClusterManifestResponse;
import org.apache.kafka.common.requests.s3.GetKVsResponse;
import org.apache.kafka.common.requests.s3.GetNextNodeIdResponse;
import org.apache.kafka.common.requests.s3.GetOpeningStreamsResponse;
@ -40,6 +42,7 @@ import org.apache.kafka.common.requests.s3.OpenStreamsResponse;
import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse;
import org.apache.kafka.common.requests.s3.PutKVsResponse;
import org.apache.kafka.common.requests.s3.TrimStreamsResponse;
import org.apache.kafka.common.requests.s3.UpdateLicenseResponse;
import java.nio.ByteBuffer;
import java.util.Collection;
@ -322,6 +325,12 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return DescribeStreamsResponse.parse(responseBuffer, version);
case AUTOMQ_UPDATE_GROUP:
return AutomqUpdateGroupResponse.parse(responseBuffer, version);
case UPDATE_LICENSE:
return UpdateLicenseResponse.parse(responseBuffer, version);
case DESCRIBE_LICENSE:
return DescribeLicenseResponse.parse(responseBuffer, version);
case EXPORT_CLUSTER_MANIFEST:
return ExportClusterManifestResponse.parse(responseBuffer, version);
// AutoMQ for Kafka inject end
case SHARE_GROUP_HEARTBEAT:

View File

@ -0,0 +1,77 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.DescribeLicenseRequestData;
import org.apache.kafka.common.message.DescribeLicenseResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;
import java.nio.ByteBuffer;
public class DescribeLicenseRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<DescribeLicenseRequest> {
private final DescribeLicenseRequestData data;
public Builder(DescribeLicenseRequestData data) {
super(ApiKeys.DESCRIBE_LICENSE);
this.data = data;
}
@Override
public DescribeLicenseRequest build(short version) {
return new DescribeLicenseRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final DescribeLicenseRequestData data;
public DescribeLicenseRequest(DescribeLicenseRequestData data, short version) {
super(ApiKeys.DESCRIBE_LICENSE, version);
this.data = data;
}
@Override
public DescribeLicenseResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
DescribeLicenseResponseData response = new DescribeLicenseResponseData()
.setErrorCode(apiError.error().code())
.setThrottleTimeMs(throttleTimeMs);
return new DescribeLicenseResponse(response);
}
@Override
public DescribeLicenseRequestData data() {
return data;
}
public static DescribeLicenseRequest parse(ByteBuffer buffer, short version) {
return new DescribeLicenseRequest(new DescribeLicenseRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.DescribeLicenseResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class DescribeLicenseResponse extends AbstractResponse {
private final DescribeLicenseResponseData data;
public DescribeLicenseResponse(DescribeLicenseResponseData data) {
super(ApiKeys.DESCRIBE_LICENSE);
this.data = data;
}
@Override
public DescribeLicenseResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
return errorCounts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static DescribeLicenseResponse parse(ByteBuffer buffer, short version) {
return new DescribeLicenseResponse(new DescribeLicenseResponseData(
new ByteBufferAccessor(buffer), version));
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;
import java.nio.ByteBuffer;
public class ExportClusterManifestRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ExportClusterManifestRequest> {
private final ExportClusterManifestRequestData data;
public Builder(ExportClusterManifestRequestData data) {
super(ApiKeys.EXPORT_CLUSTER_MANIFEST);
this.data = data;
}
@Override
public ExportClusterManifestRequest build(short version) {
return new ExportClusterManifestRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ExportClusterManifestRequestData data;
public ExportClusterManifestRequest(ExportClusterManifestRequestData data, short version) {
super(ApiKeys.EXPORT_CLUSTER_MANIFEST, version);
this.data = data;
}
@Override
public ExportClusterManifestResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
ExportClusterManifestResponseData response = new ExportClusterManifestResponseData()
.setErrorCode(apiError.error().code())
.setThrottleTimeMs(throttleTimeMs);
return new ExportClusterManifestResponse(response);
}
@Override
public ExportClusterManifestRequestData data() {
return data;
}
public static ExportClusterManifestRequest parse(ByteBuffer buffer, short version) {
return new ExportClusterManifestRequest(new ExportClusterManifestRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class ExportClusterManifestResponse extends AbstractResponse {
private final ExportClusterManifestResponseData data;
public ExportClusterManifestResponse(ExportClusterManifestResponseData data) {
super(ApiKeys.EXPORT_CLUSTER_MANIFEST);
this.data = data;
}
@Override
public ExportClusterManifestResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
return errorCounts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static ExportClusterManifestResponse parse(ByteBuffer buffer, short version) {
return new ExportClusterManifestResponse(new ExportClusterManifestResponseData(
new ByteBufferAccessor(buffer), version));
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.UpdateLicenseRequestData;
import org.apache.kafka.common.message.UpdateLicenseResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;
import java.nio.ByteBuffer;
public class UpdateLicenseRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<UpdateLicenseRequest> {
private final UpdateLicenseRequestData data;
public Builder(UpdateLicenseRequestData data) {
super(ApiKeys.UPDATE_LICENSE);
this.data = data;
}
@Override
public UpdateLicenseRequest build(short version) {
return new UpdateLicenseRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final UpdateLicenseRequestData data;
public UpdateLicenseRequest(UpdateLicenseRequestData data, short version) {
super(ApiKeys.UPDATE_LICENSE, version);
this.data = data;
}
@Override
public UpdateLicenseResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
UpdateLicenseResponseData response = new UpdateLicenseResponseData()
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message())
.setThrottleTimeMs(throttleTimeMs);
return new UpdateLicenseResponse(response);
}
@Override
public UpdateLicenseRequestData data() {
return data;
}
public static UpdateLicenseRequest parse(ByteBuffer buffer, short version) {
return new UpdateLicenseRequest(new UpdateLicenseRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.UpdateLicenseResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class UpdateLicenseResponse extends AbstractResponse {
private final UpdateLicenseResponseData data;
public UpdateLicenseResponse(UpdateLicenseResponseData data) {
super(ApiKeys.UPDATE_LICENSE);
this.data = data;
}
@Override
public UpdateLicenseResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
return errorCounts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static UpdateLicenseResponse parse(ByteBuffer buffer, short version) {
return new UpdateLicenseResponse(new UpdateLicenseResponseData(
new ByteBufferAccessor(buffer), version));
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 520,
"type": "request",
"listeners": ["controller", "broker"],
"name": "DescribeLicenseRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": []
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 520,
"type": "response",
"name": "DescribeLicenseResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
{ "name": "License", "type": "string", "versions": "0+", "default": "" }
]
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 521,
"type": "request",
"listeners": ["controller", "broker"],
"name": "ExportClusterManifestRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": []
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 521,
"type": "response",
"name": "ExportClusterManifestResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
{ "name": "Manifest", "type": "string", "versions": "0+", "default": "", "about": "The exported cluster identity manifest" }
]
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 519,
"type": "request",
"listeners": ["controller", "broker"],
"name": "UpdateLicenseRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "License", "type": "string", "versions": "0+" }
]
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 519,
"type": "response",
"name": "UpdateLicenseResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "default": "" }
]
}

View File

@ -1450,6 +1450,21 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException();
}
@Override
public UpdateLicenseResult updateLicense(String license, UpdateLicenseOptions options) {
throw new UnsupportedOperationException();
}
@Override
public DescribeLicenseResult describeLicense(DescribeLicenseOptions options) {
throw new UnsupportedOperationException();
}
@Override
public ExportClusterManifestResult exportClusterManifest(ExportClusterManifestOptions options) {
throw new UnsupportedOperationException();
}
// AutoMQ inject end
}

View File

@ -0,0 +1,6 @@
package kafka.automq.license;
import org.apache.kafka.image.publisher.MetadataPublisher;
public interface LicenseListener extends MetadataPublisher {
}

View File

@ -17,8 +17,10 @@
package kafka.server;
import kafka.automq.table.metric.TableTopicMetricsManager;
import kafka.server.streamaspect.LicenseManagerProvider;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.controller.LicenseManager;
import org.apache.kafka.server.ProcessRole;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;
@ -94,6 +96,19 @@ public final class TelemetrySupport {
}
});
S3StreamKafkaMetricsManager.setLicenseExpireDateSupplier(() -> {
try {
LicenseManager licenseManager = LicenseManagerProvider.get();
if (licenseManager != null) {
return licenseManager.getExpireDate();
}
return null;
} catch (Exception e) {
LOGGER.error("Failed to obtain license expiry date", e);
return null;
}
});
Meter meter = manager.getMeter();
MetricsLevel metricsLevel = parseMetricsLevel(config.s3MetricsLevel());
long metricsIntervalMs = (long) config.s3ExporterReportIntervalMs();

View File

@ -133,6 +133,9 @@ object RequestConvertToJson {
case req: AutomqGetPartitionSnapshotRequest => AutomqGetPartitionSnapshotRequestDataJsonConverter.write(req.data, request.version)
case req: GetNextNodeIdRequest => GetNextNodeIdRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeStreamsRequest => DescribeStreamsRequestDataJsonConverter.write(req.data, request.version)
case req: UpdateLicenseRequest => UpdateLicenseRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeLicenseRequest => DescribeLicenseRequestDataJsonConverter.write(req.data, request.version)
case req: ExportClusterManifestRequest => ExportClusterManifestRequestDataJsonConverter.write(req.data, request.version)
// AutoMQ for Kafka inject end
case req: AddRaftVoterRequest => AddRaftVoterRequestDataJsonConverter.write(req.data, request.version)
@ -249,6 +252,9 @@ object RequestConvertToJson {
case res: GetNextNodeIdResponse => GetNextNodeIdResponseDataJsonConverter.write(res.data, version)
case res: AutomqZoneRouterResponse => AutomqZoneRouterResponseDataJsonConverter.write(res.data, version)
case res: DescribeStreamsResponse => DescribeStreamsResponseDataJsonConverter.write(res.data, version)
case res: UpdateLicenseResponse => UpdateLicenseResponseDataJsonConverter.write(res.data, version)
case res: DescribeLicenseResponse => DescribeLicenseResponseDataJsonConverter.write(res.data, version)
case res: ExportClusterManifestResponse => ExportClusterManifestResponseDataJsonConverter.write(res.data, version)
// AutoMQ for Kafka inject end
case res: AddRaftVoterResponse => AddRaftVoterResponseDataJsonConverter.write(res.data, version)

View File

@ -22,6 +22,7 @@ import kafka.automq.backpressure.{BackPressureConfig, BackPressureManager, Defau
import kafka.automq.failover.FailoverListener
import kafka.automq.kafkalinking.KafkaLinkingManager
import kafka.automq.interceptor.{NoopTrafficInterceptor, TrafficInterceptor}
import kafka.automq.license.LicenseListener
import kafka.automq.table.TableManager
import kafka.automq.zerozone.{ConfirmWALProvider, DefaultClientRackProvider, DefaultConfirmWALProvider, DefaultRouterChannelProvider, DefaultLinkRecordDecoder, RouterChannelProvider, ZeroZoneTrafficInterceptor}
import kafka.cluster.EndPoint
@ -587,6 +588,8 @@ class BrokerServer(
})
newFailoverListener(ElasticLogManager.INSTANCE.get.client)
newLicenseListener()
// AutoMQ inject end
// We're now ready to unfence the broker. This also allows this broker to transition
@ -896,6 +899,10 @@ class BrokerServer(
metadataLoader.installPublishers(util.List.of(failoverListener));
failoverListener
}
protected def newLicenseListener(): LicenseListener = {
null
}
// AutoMQ inject end
}

View File

@ -292,6 +292,7 @@ class ControllerServer(
setStreamClient(streamClient).
setExtension(c => quorumControllerExtension(c)).
setQuorumVoters(config.quorumVoters).
setLicenseManager(sharedServer.licenseManager).
setReplicaPlacer(replicaPlacer())
}
controller = controllerBuilder.build()

View File

@ -21,11 +21,13 @@ import com.automq.opentelemetry.AutoMQTelemetryManager
import kafka.raft.KafkaRaftManager
import kafka.server.Server.MetricsPrefix
import kafka.server.metadata.BrokerServerMetrics
import kafka.server.streamaspect.LicenseManagerProvider
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.es.ElasticStreamSwitch
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.controller.LicenseManager
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.image.MetadataProvenance
import org.apache.kafka.image.loader.MetadataLoader
@ -110,6 +112,7 @@ class SharedServer(
// AutoMQ for Kafka injection start
ElasticStreamSwitch.setSwitch(sharedServerConfig.elasticStreamEnabled)
@volatile var telemetryManager: AutoMQTelemetryManager = _
@volatile var licenseManager: LicenseManager = _
// AutoMQ for Kafka injection end
@volatile var metrics: Metrics = _metrics
@ -283,6 +286,7 @@ class SharedServer(
// AutoMQ inject start
telemetryManager = buildTelemetryManager(sharedServerConfig, clusterId)
licenseManager = LicenseManagerProvider.get()
// AutoMQ inject end
val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](

View File

@ -8,7 +8,7 @@ import kafka.server.{ApiVersionManager, ControllerApis, KafkaConfig, RequestLoca
import org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.{DeleteKVsResponseData, GetKVsResponseData, GetNextNodeIdResponseData, PutKVsResponseData}
import org.apache.kafka.common.message.{DeleteKVsResponseData, DescribeLicenseResponseData, ExportClusterManifestResponseData, GetKVsResponseData, GetNextNodeIdResponseData, PutKVsResponseData, UpdateLicenseResponseData}
import org.apache.kafka.common.protocol.Errors.{NONE, UNKNOWN_SERVER_ERROR}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.s3._
@ -51,6 +51,9 @@ class ElasticControllerApis(
case ApiKeys.GET_KVS => handleGetKV(request)
case ApiKeys.PUT_KVS => handlePutKV(request)
case ApiKeys.DELETE_KVS => handleDeleteKV(request)
case ApiKeys.UPDATE_LICENSE => handleUpdateLicense(request)
case ApiKeys.DESCRIBE_LICENSE => handleDescribeLicense(request)
case ApiKeys.EXPORT_CLUSTER_MANIFEST => handleExportClusterManifest(request)
case ApiKeys.AUTOMQ_REGISTER_NODE => handleRegisterNode(request)
case ApiKeys.AUTOMQ_GET_NODES => handleGetNodes(request)
case ApiKeys.GET_NEXT_NODE_ID => handleGetNextNodeId(request)
@ -99,6 +102,9 @@ class ElasticControllerApis(
| ApiKeys.GET_KVS
| ApiKeys.PUT_KVS
| ApiKeys.DELETE_KVS
| ApiKeys.UPDATE_LICENSE
| ApiKeys.DESCRIBE_LICENSE
| ApiKeys.EXPORT_CLUSTER_MANIFEST
| ApiKeys.AUTOMQ_REGISTER_NODE
| ApiKeys.AUTOMQ_GET_NODES
| ApiKeys.GET_NEXT_NODE_ID
@ -361,6 +367,72 @@ class ElasticControllerApis(
}
}
def handleUpdateLicense(request: RequestChannel.Request): CompletableFuture[Unit] = {
val updateLicenseRequest = request.body[UpdateLicenseRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
controller.updateLicense(context, updateLicenseRequest.data)
.handle[Unit] { (result, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else if (result == null) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new UpdateLicenseResponse(new UpdateLicenseResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(UNKNOWN_SERVER_ERROR.code))
})
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new UpdateLicenseResponse(result.setThrottleTimeMs(requestThrottleMs))
})
}
}
}
def handleDescribeLicense(request: RequestChannel.Request): CompletableFuture[Unit] = {
val describeLicenseRequest = request.body[DescribeLicenseRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
controller.describeLicense(context, describeLicenseRequest.data)
.handle[Unit] { (result, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else if (result == null) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new DescribeLicenseResponse(new DescribeLicenseResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(UNKNOWN_SERVER_ERROR.code))
})
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new DescribeLicenseResponse(result.setThrottleTimeMs(requestThrottleMs))
})
}
}
}
def handleExportClusterManifest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val exportClusterManifestRequest = request.body[ExportClusterManifestRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
controller.exportClusterManifest(context, exportClusterManifestRequest.data)
.handle[Unit] { (result, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else if (result == null) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new ExportClusterManifestResponse(new ExportClusterManifestResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(UNKNOWN_SERVER_ERROR.code))
})
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new ExportClusterManifestResponse(result.setThrottleTimeMs(requestThrottleMs))
})
}
}
}
def handleRegisterNode(request: RequestChannel.Request): CompletableFuture[Unit] = {
val req = request.body[AutomqRegisterNodeRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC, TRANSACTIONAL_ID}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.controller.LicenseManager
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.Authorizer
@ -91,6 +92,7 @@ class ElasticKafkaApis(
private var trafficInterceptor: TrafficInterceptor = new NoopTrafficInterceptor(this, metadataCache)
private var snapshotAwaitReadySupplier: Supplier[CompletableFuture[Void]] = () => CompletableFuture.completedFuture(null)
private val licenseManager: LicenseManager = LicenseManagerProvider.get();
/**
* Generate a map of topic -> [(partitionId, epochId)] based on provided topicsRequestData.
@ -179,6 +181,9 @@ class ElasticKafkaApis(
case ApiKeys.DELETE_TOPICS => maybeForwardTopicDeletionToController(request, handleDeleteTopicsRequest)
case ApiKeys.GET_NEXT_NODE_ID => forwardToControllerOrFail(request)
case ApiKeys.AUTOMQ_UPDATE_GROUP => handleUpdateGroupRequest(request, requestLocal)
case ApiKeys.UPDATE_LICENSE => forwardToControllerOrFail(request)
case ApiKeys.DESCRIBE_LICENSE => forwardToControllerOrFail(request)
case ApiKeys.EXPORT_CLUSTER_MANIFEST => forwardToControllerOrFail(request)
case _ =>
throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
@ -208,7 +213,10 @@ class ElasticKafkaApis(
| ApiKeys.GET_NEXT_NODE_ID
| ApiKeys.AUTOMQ_ZONE_ROUTER
| ApiKeys.AUTOMQ_UPDATE_GROUP
| ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT => handleExtensionRequest(request, requestLocal)
| ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT
| ApiKeys.UPDATE_LICENSE
| ApiKeys.DESCRIBE_LICENSE
| ApiKeys.EXPORT_CLUSTER_MANIFEST => handleExtensionRequest(request, requestLocal)
case _ => super.handle(request, requestLocal)
}
}
@ -469,6 +477,19 @@ class ElasticKafkaApis(
val versionId = request.header.apiVersion
val clientId = request.header.clientId
val fetchRequest = request.body[FetchRequest]
if (!fetchRequest.isFromFollower && licenseManager != null && !licenseManager.checkLicense("", false)) {
val topicNames = if (fetchRequest.version() >= 13) metadataCache.topicIdsToNames() else Collections.emptyMap[Uuid, String]()
val fetchData = fetchRequest.fetchData(topicNames)
val responseData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]()
fetchData.forEach { (topicIdPartition, _) =>
responseData.put(topicIdPartition, FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED))
}
val response = FetchResponse.of(Errors.NONE, 0, fetchRequest.metadata.sessionId(), responseData)
requestChannel.sendResponse(request, response, None)
return
}
val topicNames =
if (fetchRequest.version() >= 13)
metadataCache.topicIdsToNames()

View File

@ -0,0 +1,70 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.streamaspect;
import org.apache.kafka.controller.LicenseManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ServiceLoader;
public final class LicenseManagerProvider {
private static final Logger LOG = LoggerFactory.getLogger(LicenseManagerProvider.class);
private static final Object INIT_LOCK = new Object();
private static volatile LicenseManager cachedInstance;
private LicenseManagerProvider() {
}
public static LicenseManager get() {
LicenseManager current = cachedInstance;
if (current == null) {
synchronized (INIT_LOCK) {
current = cachedInstance;
if (current == null) {
cachedInstance = current = loadService();
}
}
}
return current;
}
private static LicenseManager loadService() {
try {
ServiceLoader<LicenseManager> loader =
ServiceLoader.load(LicenseManager.class, LicenseManager.class.getClassLoader());
LicenseManager first = null;
for (LicenseManager impl : loader) {
if (first != null) {
break;
}
first = impl;
}
return first;
} catch (Throwable t) {
LOG.error("Failed to load LicenseManager implementation", t);
return null;
}
}
}

View File

@ -59,12 +59,16 @@ import org.apache.kafka.common.message.DeleteKVsRequestData;
import org.apache.kafka.common.message.DeleteKVsResponseData;
import org.apache.kafka.common.message.DeleteStreamsRequestData;
import org.apache.kafka.common.message.DeleteStreamsResponseData;
import org.apache.kafka.common.message.DescribeLicenseRequestData;
import org.apache.kafka.common.message.DescribeLicenseResponseData;
import org.apache.kafka.common.message.DescribeStreamsRequestData;
import org.apache.kafka.common.message.DescribeStreamsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
import org.apache.kafka.common.message.GetKVsRequestData;
import org.apache.kafka.common.message.GetKVsResponseData;
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
@ -84,6 +88,8 @@ import org.apache.kafka.common.message.TrimStreamsRequestData;
import org.apache.kafka.common.message.TrimStreamsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.message.UpdateLicenseRequestData;
import org.apache.kafka.common.message.UpdateLicenseResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@ -663,6 +669,21 @@ public class MockController implements Controller {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<DescribeLicenseResponseData> describeLicense(ControllerRequestContext context, DescribeLicenseRequestData request) {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<UpdateLicenseResponseData> updateLicense(ControllerRequestContext context, UpdateLicenseRequestData request) {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<ExportClusterManifestResponseData> exportClusterManifest(ControllerRequestContext context, ExportClusterManifestRequestData request) {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<AutomqRegisterNodeResponseData> registerNode(ControllerRequestContext context,
AutomqRegisterNodeRequest req) {

View File

@ -53,12 +53,16 @@ import org.apache.kafka.common.message.DeleteKVsRequestData;
import org.apache.kafka.common.message.DeleteKVsResponseData;
import org.apache.kafka.common.message.DeleteStreamsRequestData;
import org.apache.kafka.common.message.DeleteStreamsResponseData;
import org.apache.kafka.common.message.DescribeLicenseRequestData;
import org.apache.kafka.common.message.DescribeLicenseResponseData;
import org.apache.kafka.common.message.DescribeStreamsRequestData;
import org.apache.kafka.common.message.DescribeStreamsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
import org.apache.kafka.common.message.GetKVsRequestData;
import org.apache.kafka.common.message.GetKVsResponseData;
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
@ -78,6 +82,8 @@ import org.apache.kafka.common.message.TrimStreamsRequestData;
import org.apache.kafka.common.message.TrimStreamsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.message.UpdateLicenseRequestData;
import org.apache.kafka.common.message.UpdateLicenseResponseData;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
@ -609,6 +615,30 @@ public interface Controller extends AclMutator, AutoCloseable {
DeleteKVsRequestData request
);
/**
* Describe license.
*/
CompletableFuture<DescribeLicenseResponseData> describeLicense(
ControllerRequestContext context,
DescribeLicenseRequestData request
);
/**
* Update license.
*/
CompletableFuture<UpdateLicenseResponseData> updateLicense(
ControllerRequestContext context,
UpdateLicenseRequestData request
);
/**
* Export cluster manifest.
*/
CompletableFuture<ExportClusterManifestResponseData> exportClusterManifest(
ControllerRequestContext context,
ExportClusterManifestRequestData request
);
/**
* Register node metadata
*/

View File

@ -0,0 +1,25 @@
package org.apache.kafka.controller;
import org.apache.kafka.common.metadata.KVRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.Date;
import java.util.List;
public interface LicenseManager {
String describeLicense();
String exportClusterManifest();
boolean checkLicense(String license, boolean isUpdate);
boolean replay(KVRecord record);
boolean initialized();
Date getExpireDate();
List<ApiMessageAndVersion> getRecordsToAppend(String license);
}

View File

@ -62,12 +62,16 @@ import org.apache.kafka.common.message.DeleteKVsRequestData;
import org.apache.kafka.common.message.DeleteKVsResponseData;
import org.apache.kafka.common.message.DeleteStreamsRequestData;
import org.apache.kafka.common.message.DeleteStreamsResponseData;
import org.apache.kafka.common.message.DescribeLicenseRequestData;
import org.apache.kafka.common.message.DescribeLicenseResponseData;
import org.apache.kafka.common.message.DescribeStreamsRequestData;
import org.apache.kafka.common.message.DescribeStreamsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
import org.apache.kafka.common.message.GetKVsRequestData;
import org.apache.kafka.common.message.GetKVsResponseData;
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
@ -87,6 +91,8 @@ import org.apache.kafka.common.message.TrimStreamsRequestData;
import org.apache.kafka.common.message.TrimStreamsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.message.UpdateLicenseRequestData;
import org.apache.kafka.common.message.UpdateLicenseResponseData;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
@ -289,6 +295,7 @@ public final class QuorumController implements Controller {
private StreamClient streamClient;
private List<String> quorumVoters = Collections.emptyList();
private Function<QuorumController, QuorumControllerExtension> extension = c -> QuorumControllerExtension.NOOP;
private LicenseManager licenseManager = null;
// AutoMQ for Kafka inject end
public Builder(int nodeId, String clusterId) {
@ -460,6 +467,11 @@ public final class QuorumController implements Controller {
this.extension = extension;
return this;
}
public Builder setLicenseManager(LicenseManager licenseManager) {
this.licenseManager = licenseManager;
return this;
}
// AutoMQ for Kafka inject end
public QuorumController build() throws Exception {
@ -522,6 +534,7 @@ public final class QuorumController implements Controller {
eligibleLeaderReplicasEnabled,
streamClient,
quorumVoters,
licenseManager,
extension
);
} catch (Exception e) {
@ -1359,13 +1372,21 @@ public final class QuorumController implements Controller {
@Override
public ControllerResult<Void> generateRecordsAndResult() {
try {
return ActivationRecordsGenerator.generate(
ControllerResult<Void> base = ActivationRecordsGenerator.generate(
log::warn,
logReplayTracker.empty(),
offsetControl.transactionStartOffset(),
zkMigrationEnabled,
bootstrapMetadata,
featureControl);
// AutoMQ for Kafka inject start
List<ApiMessageAndVersion> all = new ArrayList<>(base.records());
if (licenseManager != null && !licenseManager.initialized()) {
List<ApiMessageAndVersion> recordsToAppend = licenseManager.getRecordsToAppend("");
all.addAll(recordsToAppend);
}
// AutoMQ for Kafka inject end
return ControllerResult.atomicOf(all, null);
} catch (Throwable t) {
throw fatalFaultHandler.handleFault("exception while completing controller " +
"activation", t);
@ -1599,7 +1620,7 @@ public final class QuorumController implements Controller {
* @param snapshotId The snapshotId if this record is from a snapshot
* @param offset The offset of the record
*/
@SuppressWarnings("checkstyle:javaNCSS")
@SuppressWarnings({"checkstyle:javaNCSS", "checkstyle:MethodLength"})
private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
if (log.isTraceEnabled()) {
if (snapshotId.isPresent()) {
@ -1743,6 +1764,9 @@ public final class QuorumController implements Controller {
topicDeletionManager.replay(record);
nodeControlManager.replay(record);
routerChannelEpochControlManager.replay(record);
if (licenseManager != null) {
licenseManager.replay(record);
}
break;
}
case REMOVE_KVRECORD: {
@ -2007,6 +2031,8 @@ public final class QuorumController implements Controller {
private final RouterChannelEpochControlManager routerChannelEpochControlManager;
private final QuorumControllerExtension extension;
private final LicenseManager licenseManager;
// AutoMQ for Kafka inject end
@ -2046,6 +2072,7 @@ public final class QuorumController implements Controller {
// AutoMQ inject start
StreamClient streamClient,
List<String> quorumVoters,
LicenseManager licenseManager,
Function<QuorumController, QuorumControllerExtension> extension
// AutoMQ inject end
@ -2176,6 +2203,8 @@ public final class QuorumController implements Controller {
this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoManager(clusterControl, streamControlManager));
this.routerChannelEpochControlManager = new RouterChannelEpochControlManager(snapshotRegistry, this, nodeControlManager, time);
this.extension = extension.apply(this);
this.licenseManager = licenseManager;
// set the nodeControlManager here to avoid circular dependency
this.replicationControl.setNodeControlManager(nodeControlManager);
@ -2797,6 +2826,91 @@ public final class QuorumController implements Controller {
);
}
@Override
public CompletableFuture<DescribeLicenseResponseData> describeLicense(
ControllerRequestContext context,
DescribeLicenseRequestData request
) {
if (licenseManager == null) {
return CompletableFuture.completedFuture(
new DescribeLicenseResponseData()
.setErrorCode(Errors.UNSUPPORTED_VERSION.code())
.setLicense("")
.setThrottleTimeMs(0)
);
}
return appendReadEvent("describeLicense", context.deadlineNs(), () -> {
String license = licenseManager.describeLicense();
return new DescribeLicenseResponseData()
.setErrorCode(Errors.NONE.code())
.setLicense(license)
.setThrottleTimeMs(0);
});
}
@Override
public CompletableFuture<UpdateLicenseResponseData> updateLicense(
ControllerRequestContext context,
UpdateLicenseRequestData request
) {
if (licenseManager == null) {
return CompletableFuture.completedFuture(
new UpdateLicenseResponseData()
.setErrorCode(Errors.UNSUPPORTED_VERSION.code())
.setErrorMessage("License management is not supported")
.setThrottleTimeMs(0)
);
}
String license = request.license();
if (!licenseManager.checkLicense(license, true)) {
return CompletableFuture.completedFuture(
new UpdateLicenseResponseData()
.setErrorCode(Errors.POLICY_VIOLATION.code())
.setErrorMessage("license check failed")
.setThrottleTimeMs(0)
);
}
return appendWriteEvent("updateLicense", context.deadlineNs(), () -> {
try {
List<ApiMessageAndVersion> recordsToAppend = licenseManager.getRecordsToAppend(license);
UpdateLicenseResponseData response = new UpdateLicenseResponseData()
.setErrorCode(Errors.NONE.code())
.setErrorMessage("")
.setThrottleTimeMs(0);
return ControllerResult.of(recordsToAppend, response);
} catch (Exception e) {
log.error("Failed to process license update", e);
UpdateLicenseResponseData response = new UpdateLicenseResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage("Failed to process update: " + e.getMessage())
.setThrottleTimeMs(0);
return ControllerResult.of(Collections.emptyList(), response);
}
});
}
@Override
public CompletableFuture<ExportClusterManifestResponseData> exportClusterManifest(
ControllerRequestContext context,
ExportClusterManifestRequestData request
) {
if (licenseManager == null) {
return CompletableFuture.completedFuture(
new ExportClusterManifestResponseData()
.setErrorCode(Errors.UNSUPPORTED_VERSION.code())
.setManifest("")
.setThrottleTimeMs(0)
);
}
return appendReadEvent("exportClusterManifest", context.deadlineNs(), () -> {
String manifest = licenseManager.exportClusterManifest();
return new ExportClusterManifestResponseData()
.setErrorCode(Errors.NONE.code())
.setManifest(manifest)
.setThrottleTimeMs(0);
});
}
@Override
public CompletableFuture<AutomqRegisterNodeResponseData> registerNode(ControllerRequestContext context, AutomqRegisterNodeRequest req) {
return appendWriteEvent("registerNode", context.deadlineNs(), () -> nodeControlManager.register(req));

View File

@ -60,4 +60,9 @@ public class S3StreamKafkaMetricsConstants {
// Back Pressure
public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state";
public static final AttributeKey<String> LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state");
// License
public static final String LICENSE_EXPIRY_TIMESTAMP_METRIC_NAME = "license_expiry_timestamp";
public static final String LICENSE_SECONDS_REMAINING_METRIC_NAME = "automq_enterprise_license_expiry_seconds";
public static final String NODE_VCPU_COUNT_METRIC_NAME = "node_vcpu_count";
}

View File

@ -131,6 +131,17 @@ public class S3StreamKafkaMetricsManager {
*/
private static Supplier<String> certChainSupplier = () -> null;
/**
* Supplier for license expiry date.
* Returns the expiry date of the license, or null if no license is configured.
*/
private static Supplier<Date> licenseExpireDateSupplier = () -> null;
// License metrics
private static ObservableLongGauge licenseExpiryTimestampMetrics = new NoopObservableLongGauge();
private static ObservableLongGauge licenseSecondsRemainingMetrics = new NoopObservableLongGauge();
private static ObservableLongGauge nodeVcpuCountMetrics = new NoopObservableLongGauge();
public static void configure(MetricsConfig metricsConfig) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
S3StreamKafkaMetricsManager.metricsConfig = metricsConfig;
@ -147,6 +158,7 @@ public class S3StreamKafkaMetricsManager {
initLogAppendMetrics(meter, prefix);
initPartitionStatusStatisticsMetrics(meter, prefix);
initBackPressureMetrics(meter, prefix);
initLicenseMetrics(meter, prefix);
try {
initCertMetrics(meter, prefix);
} catch (Exception e) {
@ -324,6 +336,44 @@ public class S3StreamKafkaMetricsManager {
});
}
private static void initLicenseMetrics(Meter meter, String prefix) {
licenseExpiryTimestampMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.LICENSE_EXPIRY_TIMESTAMP_METRIC_NAME)
.setDescription("The expiry timestamp of the license")
.setUnit("milliseconds")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
Date expireDate = licenseExpireDateSupplier.get();
if (expireDate != null) {
result.record(expireDate.getTime(), metricsConfig.getBaseAttributes());
}
}
});
licenseSecondsRemainingMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.LICENSE_SECONDS_REMAINING_METRIC_NAME)
.setDescription("The remaining seconds until the license expires")
.setUnit("seconds")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
Date expireDate = licenseExpireDateSupplier.get();
if (expireDate != null) {
long secondsRemaining = (expireDate.getTime() - System.currentTimeMillis()) / 1000;
result.record(secondsRemaining, metricsConfig.getBaseAttributes());
}
}
});
nodeVcpuCountMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.NODE_VCPU_COUNT_METRIC_NAME)
.setDescription("The number of vCPUs available on this node")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
result.record(Runtime.getRuntime().availableProcessors(), metricsConfig.getBaseAttributes());
}
});
}
/**
* Initialize the certificate metrics.
*
@ -516,4 +566,8 @@ public class S3StreamKafkaMetricsManager {
public static void setCertChainSupplier(Supplier<String> certChainSupplier) {
S3StreamKafkaMetricsManager.certChainSupplier = certChainSupplier;
}
public static void setLicenseExpireDateSupplier(Supplier<Date> licenseExpireDateSupplier) {
S3StreamKafkaMetricsManager.licenseExpireDateSupplier = licenseExpireDateSupplier;
}
}