Compare commits

..

1 Commits

Author SHA1 Message Date
Rancho-7 892a10824e
feat: license 2025-12-26 15:01:27 +08:00
43 changed files with 1486 additions and 20 deletions

View File

@ -9,14 +9,6 @@ or [Slack](https://join.slack.com/t/automq/shared_invite/zt-29h17vye9-thf31ebIVL
Before getting started, please review AutoMQ's Code of Conduct. Everyone interacting in Slack or WeChat
follow [Code of Conduct](CODE_OF_CONDUCT.md).
## Suggested Onboarding Path for New Contributors
If you are new to AutoMQ, it is recommended to first deploy and run AutoMQ using Docker as described in the README.
This helps you quickly understand AutoMQs core concepts and behavior without local environment complexity.
After gaining familiarity, contributors who want to work on code can follow the steps in this guide to build and run AutoMQ locally.
## Code Contributions
### Finding or Reporting Issues

View File

@ -2341,12 +2341,6 @@ project(':automq-metrics') {
configProperties = checkstyleConfigProperties("import-control-server.xml")
}
configurations {
all {
exclude group: 'io.opentelemetry', module: 'opentelemetry-exporter-sender-okhttp'
}
}
dependencies {
// OpenTelemetry core dependencies
api libs.opentelemetryJava8
@ -2356,7 +2350,6 @@ project(':automq-metrics') {
api libs.opentelemetryExporterLogging
api libs.opentelemetryExporterProm
api libs.opentelemetryExporterOTLP
api libs.opentelemetryExporterSenderJdk
api libs.opentelemetryJmx
// Logging dependencies

View File

@ -1739,6 +1739,24 @@ public interface Admin extends AutoCloseable {
* @return {@link UpdateGroupResult}
*/
UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, UpdateGroupOptions options);
default UpdateLicenseResult updateLicense(String license) {
return updateLicense(license, new UpdateLicenseOptions());
}
UpdateLicenseResult updateLicense(String license, UpdateLicenseOptions options);
default DescribeLicenseResult describeLicense() {
return describeLicense(new DescribeLicenseOptions());
}
DescribeLicenseResult describeLicense(DescribeLicenseOptions options);
default ExportClusterManifestResult exportClusterManifest() {
return exportClusterManifest(new ExportClusterManifestOptions());
}
ExportClusterManifestResult exportClusterManifest(ExportClusterManifestOptions options);
// AutoMQ inject end
/**

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link Admin#describeLicense(DescribeLicenseOptions)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeLicenseOptions extends AbstractOptions<DescribeLicenseOptions> {
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* The result of the {@link Admin#describeLicense(DescribeLicenseOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeLicenseResult {
private final KafkaFuture<String> future;
DescribeLicenseResult(KafkaFuture<String> future) {
this.future = future;
}
/**
* Return a future which returns the current license.
*/
public KafkaFuture<String> license() {
return future;
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link Admin#exportClusterManifest(ExportClusterManifestOptions)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ExportClusterManifestOptions extends AbstractOptions<ExportClusterManifestOptions> {
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* The result of the {@link Admin#exportClusterManifest(ExportClusterManifestOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ExportClusterManifestResult {
private final KafkaFuture<String> future;
ExportClusterManifestResult(KafkaFuture<String> future) {
this.future = future;
}
/**
* Return a future which returns the exported cluster identity manifest.
*/
public KafkaFuture<String> manifest() {
return future;
}
}

View File

@ -320,5 +320,20 @@ public class ForwardingAdmin implements Admin {
return delegate.updateGroup(groupId, groupSpec, options);
}
@Override
public UpdateLicenseResult updateLicense(String license, UpdateLicenseOptions options) {
return delegate.updateLicense(license);
}
@Override
public DescribeLicenseResult describeLicense(DescribeLicenseOptions options) {
return delegate.describeLicense(options);
}
@Override
public ExportClusterManifestResult exportClusterManifest(ExportClusterManifestOptions options) {
return delegate.exportClusterManifest(options);
}
// AutoMQ inject end
}

View File

@ -139,6 +139,7 @@ import org.apache.kafka.common.message.DescribeClusterRequestData;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeLicenseRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
@ -152,6 +153,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
@ -165,6 +167,7 @@ import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
import org.apache.kafka.common.message.UpdateLicenseRequestData;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@ -250,8 +253,14 @@ import org.apache.kafka.common.requests.UpdateFeaturesRequest;
import org.apache.kafka.common.requests.UpdateFeaturesResponse;
import org.apache.kafka.common.requests.s3.AutomqGetNodesRequest;
import org.apache.kafka.common.requests.s3.AutomqGetNodesResponse;
import org.apache.kafka.common.requests.s3.DescribeLicenseRequest;
import org.apache.kafka.common.requests.s3.DescribeLicenseResponse;
import org.apache.kafka.common.requests.s3.ExportClusterManifestRequest;
import org.apache.kafka.common.requests.s3.ExportClusterManifestResponse;
import org.apache.kafka.common.requests.s3.GetNextNodeIdRequest;
import org.apache.kafka.common.requests.s3.GetNextNodeIdResponse;
import org.apache.kafka.common.requests.s3.UpdateLicenseRequest;
import org.apache.kafka.common.requests.s3.UpdateLicenseResponse;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
@ -2464,6 +2473,125 @@ public class KafkaAdminClient extends AdminClient {
}, now);
return new GetNextNodeIdResult(nodeIdFuture);
}
@Override
public DescribeLicenseResult describeLicense(final DescribeLicenseOptions options) {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call("describeLicense", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
@Override
DescribeLicenseRequest.Builder createRequest(int timeoutMs) {
return new DescribeLicenseRequest.Builder(new DescribeLicenseRequestData());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final DescribeLicenseResponse response =
(DescribeLicenseResponse) abstractResponse;
future.complete(response.data().license());
}
@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
};
runnable.call(call, now);
return new DescribeLicenseResult(future);
}
@Override
public UpdateLicenseResult updateLicense(final String license,
final UpdateLicenseOptions options) {
if (license == null || license.isEmpty()) {
throw new IllegalArgumentException("License can not be null or empty.");
}
final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call("updateLicense", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider(true)) {
@Override
UpdateLicenseRequest.Builder createRequest(int timeoutMs) {
return new UpdateLicenseRequest.Builder(
new UpdateLicenseRequestData()
.setLicense(license));
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final UpdateLicenseResponse response =
(UpdateLicenseResponse) abstractResponse;
Errors topLevelError = Errors.forCode(response.data().errorCode());
switch (topLevelError) {
case NONE:
future.complete(null);
break;
case NOT_CONTROLLER:
handleNotControllerError(topLevelError);
break;
default:
future.completeExceptionally(topLevelError.exception(response.data().errorMessage()));
break;
}
}
@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
};
runnable.call(call, now);
return new UpdateLicenseResult(future);
}
@Override
public ExportClusterManifestResult exportClusterManifest(final ExportClusterManifestOptions options) {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call("exportClusterManifest", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider(true)) {
@Override
ExportClusterManifestRequest.Builder createRequest(int timeoutMs) {
return new ExportClusterManifestRequest.Builder(new ExportClusterManifestRequestData());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final ExportClusterManifestResponse response =
(ExportClusterManifestResponse) abstractResponse;
Errors topLevelError = Errors.forCode(response.data().errorCode());
switch (topLevelError) {
case NONE:
future.complete(response.data().manifest());
break;
case NOT_CONTROLLER:
handleNotControllerError(topLevelError);
break;
default:
future.completeExceptionally(topLevelError.exception());
break;
}
}
@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
};
runnable.call(call, now);
return new ExportClusterManifestResult(future);
}
// AutoMQ for Kafka inject end
private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, String topicName, Uuid topicId,

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for {@link Admin#updateLicense(String, UpdateLicenseOptions)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class UpdateLicenseOptions extends AbstractOptions<UpdateLicenseOptions> {
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* The result of the {@link Admin#updateLicense(String, UpdateLicenseOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class UpdateLicenseResult {
private final KafkaFuture<Void> future;
UpdateLicenseResult(KafkaFuture<Void> future) {
this.future = future;
}
/**
* Return a future which succeeds if the license update succeeds.
*/
public KafkaFuture<Void> all() {
return future;
}
}

View File

@ -145,6 +145,9 @@ public enum ApiKeys {
GET_KVS(ApiMessageType.GET_KVS, false, true),
PUT_KVS(ApiMessageType.PUT_KVS, false, true),
DELETE_KVS(ApiMessageType.DELETE_KVS, false, true),
UPDATE_LICENSE(ApiMessageType.UPDATE_LICENSE, false, true),
DESCRIBE_LICENSE(ApiMessageType.DESCRIBE_LICENSE, false, true),
EXPORT_CLUSTER_MANIFEST(ApiMessageType.EXPORT_CLUSTER_MANIFEST, false, true),
AUTOMQ_REGISTER_NODE(ApiMessageType.AUTOMQ_REGISTER_NODE, false, false),
AUTOMQ_GET_NODES(ApiMessageType.AUTOMQ_GET_NODES, false, true),
AUTOMQ_ZONE_ROUTER(ApiMessageType.AUTOMQ_ZONE_ROUTER, false, false),

View File

@ -34,7 +34,9 @@ import org.apache.kafka.common.requests.s3.CommitStreamSetObjectRequest;
import org.apache.kafka.common.requests.s3.CreateStreamsRequest;
import org.apache.kafka.common.requests.s3.DeleteKVsRequest;
import org.apache.kafka.common.requests.s3.DeleteStreamsRequest;
import org.apache.kafka.common.requests.s3.DescribeLicenseRequest;
import org.apache.kafka.common.requests.s3.DescribeStreamsRequest;
import org.apache.kafka.common.requests.s3.ExportClusterManifestRequest;
import org.apache.kafka.common.requests.s3.GetKVsRequest;
import org.apache.kafka.common.requests.s3.GetNextNodeIdRequest;
import org.apache.kafka.common.requests.s3.GetOpeningStreamsRequest;
@ -42,6 +44,7 @@ import org.apache.kafka.common.requests.s3.OpenStreamsRequest;
import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest;
import org.apache.kafka.common.requests.s3.PutKVsRequest;
import org.apache.kafka.common.requests.s3.TrimStreamsRequest;
import org.apache.kafka.common.requests.s3.UpdateLicenseRequest;
import java.nio.ByteBuffer;
import java.util.Map;
@ -385,6 +388,12 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return DescribeStreamsRequest.parse(buffer, apiVersion);
case AUTOMQ_UPDATE_GROUP:
return AutomqUpdateGroupRequest.parse(buffer, apiVersion);
case UPDATE_LICENSE:
return UpdateLicenseRequest.parse(buffer, apiVersion);
case DESCRIBE_LICENSE:
return DescribeLicenseRequest.parse(buffer, apiVersion);
case EXPORT_CLUSTER_MANIFEST:
return ExportClusterManifestRequest.parse(buffer, apiVersion);
// AutoMQ for Kafka inject end
case SHARE_GROUP_HEARTBEAT:

View File

@ -32,7 +32,9 @@ import org.apache.kafka.common.requests.s3.CommitStreamSetObjectResponse;
import org.apache.kafka.common.requests.s3.CreateStreamsResponse;
import org.apache.kafka.common.requests.s3.DeleteKVsResponse;
import org.apache.kafka.common.requests.s3.DeleteStreamsResponse;
import org.apache.kafka.common.requests.s3.DescribeLicenseResponse;
import org.apache.kafka.common.requests.s3.DescribeStreamsResponse;
import org.apache.kafka.common.requests.s3.ExportClusterManifestResponse;
import org.apache.kafka.common.requests.s3.GetKVsResponse;
import org.apache.kafka.common.requests.s3.GetNextNodeIdResponse;
import org.apache.kafka.common.requests.s3.GetOpeningStreamsResponse;
@ -40,6 +42,7 @@ import org.apache.kafka.common.requests.s3.OpenStreamsResponse;
import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse;
import org.apache.kafka.common.requests.s3.PutKVsResponse;
import org.apache.kafka.common.requests.s3.TrimStreamsResponse;
import org.apache.kafka.common.requests.s3.UpdateLicenseResponse;
import java.nio.ByteBuffer;
import java.util.Collection;
@ -322,6 +325,12 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return DescribeStreamsResponse.parse(responseBuffer, version);
case AUTOMQ_UPDATE_GROUP:
return AutomqUpdateGroupResponse.parse(responseBuffer, version);
case UPDATE_LICENSE:
return UpdateLicenseResponse.parse(responseBuffer, version);
case DESCRIBE_LICENSE:
return DescribeLicenseResponse.parse(responseBuffer, version);
case EXPORT_CLUSTER_MANIFEST:
return ExportClusterManifestResponse.parse(responseBuffer, version);
// AutoMQ for Kafka inject end
case SHARE_GROUP_HEARTBEAT:

View File

@ -0,0 +1,77 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.DescribeLicenseRequestData;
import org.apache.kafka.common.message.DescribeLicenseResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;
import java.nio.ByteBuffer;
public class DescribeLicenseRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<DescribeLicenseRequest> {
private final DescribeLicenseRequestData data;
public Builder(DescribeLicenseRequestData data) {
super(ApiKeys.DESCRIBE_LICENSE);
this.data = data;
}
@Override
public DescribeLicenseRequest build(short version) {
return new DescribeLicenseRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final DescribeLicenseRequestData data;
public DescribeLicenseRequest(DescribeLicenseRequestData data, short version) {
super(ApiKeys.DESCRIBE_LICENSE, version);
this.data = data;
}
@Override
public DescribeLicenseResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
DescribeLicenseResponseData response = new DescribeLicenseResponseData()
.setErrorCode(apiError.error().code())
.setThrottleTimeMs(throttleTimeMs);
return new DescribeLicenseResponse(response);
}
@Override
public DescribeLicenseRequestData data() {
return data;
}
public static DescribeLicenseRequest parse(ByteBuffer buffer, short version) {
return new DescribeLicenseRequest(new DescribeLicenseRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.DescribeLicenseResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class DescribeLicenseResponse extends AbstractResponse {
private final DescribeLicenseResponseData data;
public DescribeLicenseResponse(DescribeLicenseResponseData data) {
super(ApiKeys.DESCRIBE_LICENSE);
this.data = data;
}
@Override
public DescribeLicenseResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
return errorCounts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static DescribeLicenseResponse parse(ByteBuffer buffer, short version) {
return new DescribeLicenseResponse(new DescribeLicenseResponseData(
new ByteBufferAccessor(buffer), version));
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;
import java.nio.ByteBuffer;
public class ExportClusterManifestRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ExportClusterManifestRequest> {
private final ExportClusterManifestRequestData data;
public Builder(ExportClusterManifestRequestData data) {
super(ApiKeys.EXPORT_CLUSTER_MANIFEST);
this.data = data;
}
@Override
public ExportClusterManifestRequest build(short version) {
return new ExportClusterManifestRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ExportClusterManifestRequestData data;
public ExportClusterManifestRequest(ExportClusterManifestRequestData data, short version) {
super(ApiKeys.EXPORT_CLUSTER_MANIFEST, version);
this.data = data;
}
@Override
public ExportClusterManifestResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
ExportClusterManifestResponseData response = new ExportClusterManifestResponseData()
.setErrorCode(apiError.error().code())
.setThrottleTimeMs(throttleTimeMs);
return new ExportClusterManifestResponse(response);
}
@Override
public ExportClusterManifestRequestData data() {
return data;
}
public static ExportClusterManifestRequest parse(ByteBuffer buffer, short version) {
return new ExportClusterManifestRequest(new ExportClusterManifestRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class ExportClusterManifestResponse extends AbstractResponse {
private final ExportClusterManifestResponseData data;
public ExportClusterManifestResponse(ExportClusterManifestResponseData data) {
super(ApiKeys.EXPORT_CLUSTER_MANIFEST);
this.data = data;
}
@Override
public ExportClusterManifestResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
return errorCounts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static ExportClusterManifestResponse parse(ByteBuffer buffer, short version) {
return new ExportClusterManifestResponse(new ExportClusterManifestResponseData(
new ByteBufferAccessor(buffer), version));
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.UpdateLicenseRequestData;
import org.apache.kafka.common.message.UpdateLicenseResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;
import java.nio.ByteBuffer;
public class UpdateLicenseRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<UpdateLicenseRequest> {
private final UpdateLicenseRequestData data;
public Builder(UpdateLicenseRequestData data) {
super(ApiKeys.UPDATE_LICENSE);
this.data = data;
}
@Override
public UpdateLicenseRequest build(short version) {
return new UpdateLicenseRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final UpdateLicenseRequestData data;
public UpdateLicenseRequest(UpdateLicenseRequestData data, short version) {
super(ApiKeys.UPDATE_LICENSE, version);
this.data = data;
}
@Override
public UpdateLicenseResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
UpdateLicenseResponseData response = new UpdateLicenseResponseData()
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message())
.setThrottleTimeMs(throttleTimeMs);
return new UpdateLicenseResponse(response);
}
@Override
public UpdateLicenseRequestData data() {
return data;
}
public static UpdateLicenseRequest parse(ByteBuffer buffer, short version) {
return new UpdateLicenseRequest(new UpdateLicenseRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.UpdateLicenseResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class UpdateLicenseResponse extends AbstractResponse {
private final UpdateLicenseResponseData data;
public UpdateLicenseResponse(UpdateLicenseResponseData data) {
super(ApiKeys.UPDATE_LICENSE);
this.data = data;
}
@Override
public UpdateLicenseResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
return errorCounts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public static UpdateLicenseResponse parse(ByteBuffer buffer, short version) {
return new UpdateLicenseResponse(new UpdateLicenseResponseData(
new ByteBufferAccessor(buffer), version));
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 520,
"type": "request",
"listeners": ["controller", "broker"],
"name": "DescribeLicenseRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": []
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 520,
"type": "response",
"name": "DescribeLicenseResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
{ "name": "License", "type": "string", "versions": "0+", "default": "" }
]
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 521,
"type": "request",
"listeners": ["controller", "broker"],
"name": "ExportClusterManifestRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": []
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 521,
"type": "response",
"name": "ExportClusterManifestResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
{ "name": "Manifest", "type": "string", "versions": "0+", "default": "", "about": "The exported cluster identity manifest" }
]
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 519,
"type": "request",
"listeners": ["controller", "broker"],
"name": "UpdateLicenseRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "License", "type": "string", "versions": "0+" }
]
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"apiKey": 519,
"type": "response",
"name": "UpdateLicenseResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "default": "" }
]
}

View File

@ -1450,6 +1450,21 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException();
}
@Override
public UpdateLicenseResult updateLicense(String license, UpdateLicenseOptions options) {
throw new UnsupportedOperationException();
}
@Override
public DescribeLicenseResult describeLicense(DescribeLicenseOptions options) {
throw new UnsupportedOperationException();
}
@Override
public ExportClusterManifestResult exportClusterManifest(ExportClusterManifestOptions options) {
throw new UnsupportedOperationException();
}
// AutoMQ inject end
}

View File

@ -0,0 +1,6 @@
package kafka.automq.license;
import org.apache.kafka.image.publisher.MetadataPublisher;
public interface LicenseListener extends MetadataPublisher {
}

View File

@ -17,8 +17,10 @@
package kafka.server;
import kafka.automq.table.metric.TableTopicMetricsManager;
import kafka.server.streamaspect.LicenseManagerProvider;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.controller.LicenseManager;
import org.apache.kafka.server.ProcessRole;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;
@ -94,6 +96,19 @@ public final class TelemetrySupport {
}
});
S3StreamKafkaMetricsManager.setLicenseExpireDateSupplier(() -> {
try {
LicenseManager licenseManager = LicenseManagerProvider.get();
if (licenseManager != null) {
return licenseManager.getExpireDate();
}
return null;
} catch (Exception e) {
LOGGER.error("Failed to obtain license expiry date", e);
return null;
}
});
Meter meter = manager.getMeter();
MetricsLevel metricsLevel = parseMetricsLevel(config.s3MetricsLevel());
long metricsIntervalMs = (long) config.s3ExporterReportIntervalMs();

View File

@ -133,6 +133,9 @@ object RequestConvertToJson {
case req: AutomqGetPartitionSnapshotRequest => AutomqGetPartitionSnapshotRequestDataJsonConverter.write(req.data, request.version)
case req: GetNextNodeIdRequest => GetNextNodeIdRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeStreamsRequest => DescribeStreamsRequestDataJsonConverter.write(req.data, request.version)
case req: UpdateLicenseRequest => UpdateLicenseRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeLicenseRequest => DescribeLicenseRequestDataJsonConverter.write(req.data, request.version)
case req: ExportClusterManifestRequest => ExportClusterManifestRequestDataJsonConverter.write(req.data, request.version)
// AutoMQ for Kafka inject end
case req: AddRaftVoterRequest => AddRaftVoterRequestDataJsonConverter.write(req.data, request.version)
@ -249,6 +252,9 @@ object RequestConvertToJson {
case res: GetNextNodeIdResponse => GetNextNodeIdResponseDataJsonConverter.write(res.data, version)
case res: AutomqZoneRouterResponse => AutomqZoneRouterResponseDataJsonConverter.write(res.data, version)
case res: DescribeStreamsResponse => DescribeStreamsResponseDataJsonConverter.write(res.data, version)
case res: UpdateLicenseResponse => UpdateLicenseResponseDataJsonConverter.write(res.data, version)
case res: DescribeLicenseResponse => DescribeLicenseResponseDataJsonConverter.write(res.data, version)
case res: ExportClusterManifestResponse => ExportClusterManifestResponseDataJsonConverter.write(res.data, version)
// AutoMQ for Kafka inject end
case res: AddRaftVoterResponse => AddRaftVoterResponseDataJsonConverter.write(res.data, version)

View File

@ -22,6 +22,7 @@ import kafka.automq.backpressure.{BackPressureConfig, BackPressureManager, Defau
import kafka.automq.failover.FailoverListener
import kafka.automq.kafkalinking.KafkaLinkingManager
import kafka.automq.interceptor.{NoopTrafficInterceptor, TrafficInterceptor}
import kafka.automq.license.LicenseListener
import kafka.automq.table.TableManager
import kafka.automq.zerozone.{ConfirmWALProvider, DefaultClientRackProvider, DefaultConfirmWALProvider, DefaultRouterChannelProvider, DefaultLinkRecordDecoder, RouterChannelProvider, ZeroZoneTrafficInterceptor}
import kafka.cluster.EndPoint
@ -587,6 +588,8 @@ class BrokerServer(
})
newFailoverListener(ElasticLogManager.INSTANCE.get.client)
newLicenseListener()
// AutoMQ inject end
// We're now ready to unfence the broker. This also allows this broker to transition
@ -896,6 +899,10 @@ class BrokerServer(
metadataLoader.installPublishers(util.List.of(failoverListener));
failoverListener
}
protected def newLicenseListener(): LicenseListener = {
null
}
// AutoMQ inject end
}

View File

@ -292,6 +292,7 @@ class ControllerServer(
setStreamClient(streamClient).
setExtension(c => quorumControllerExtension(c)).
setQuorumVoters(config.quorumVoters).
setLicenseManager(sharedServer.licenseManager).
setReplicaPlacer(replicaPlacer())
}
controller = controllerBuilder.build()

View File

@ -21,11 +21,13 @@ import com.automq.opentelemetry.AutoMQTelemetryManager
import kafka.raft.KafkaRaftManager
import kafka.server.Server.MetricsPrefix
import kafka.server.metadata.BrokerServerMetrics
import kafka.server.streamaspect.LicenseManagerProvider
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.es.ElasticStreamSwitch
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.controller.LicenseManager
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.image.MetadataProvenance
import org.apache.kafka.image.loader.MetadataLoader
@ -110,6 +112,7 @@ class SharedServer(
// AutoMQ for Kafka injection start
ElasticStreamSwitch.setSwitch(sharedServerConfig.elasticStreamEnabled)
@volatile var telemetryManager: AutoMQTelemetryManager = _
@volatile var licenseManager: LicenseManager = _
// AutoMQ for Kafka injection end
@volatile var metrics: Metrics = _metrics
@ -283,6 +286,7 @@ class SharedServer(
// AutoMQ inject start
telemetryManager = buildTelemetryManager(sharedServerConfig, clusterId)
licenseManager = LicenseManagerProvider.get()
// AutoMQ inject end
val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](

View File

@ -8,7 +8,7 @@ import kafka.server.{ApiVersionManager, ControllerApis, KafkaConfig, RequestLoca
import org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.{DeleteKVsResponseData, GetKVsResponseData, GetNextNodeIdResponseData, PutKVsResponseData}
import org.apache.kafka.common.message.{DeleteKVsResponseData, DescribeLicenseResponseData, ExportClusterManifestResponseData, GetKVsResponseData, GetNextNodeIdResponseData, PutKVsResponseData, UpdateLicenseResponseData}
import org.apache.kafka.common.protocol.Errors.{NONE, UNKNOWN_SERVER_ERROR}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.s3._
@ -51,6 +51,9 @@ class ElasticControllerApis(
case ApiKeys.GET_KVS => handleGetKV(request)
case ApiKeys.PUT_KVS => handlePutKV(request)
case ApiKeys.DELETE_KVS => handleDeleteKV(request)
case ApiKeys.UPDATE_LICENSE => handleUpdateLicense(request)
case ApiKeys.DESCRIBE_LICENSE => handleDescribeLicense(request)
case ApiKeys.EXPORT_CLUSTER_MANIFEST => handleExportClusterManifest(request)
case ApiKeys.AUTOMQ_REGISTER_NODE => handleRegisterNode(request)
case ApiKeys.AUTOMQ_GET_NODES => handleGetNodes(request)
case ApiKeys.GET_NEXT_NODE_ID => handleGetNextNodeId(request)
@ -99,6 +102,9 @@ class ElasticControllerApis(
| ApiKeys.GET_KVS
| ApiKeys.PUT_KVS
| ApiKeys.DELETE_KVS
| ApiKeys.UPDATE_LICENSE
| ApiKeys.DESCRIBE_LICENSE
| ApiKeys.EXPORT_CLUSTER_MANIFEST
| ApiKeys.AUTOMQ_REGISTER_NODE
| ApiKeys.AUTOMQ_GET_NODES
| ApiKeys.GET_NEXT_NODE_ID
@ -361,6 +367,72 @@ class ElasticControllerApis(
}
}
def handleUpdateLicense(request: RequestChannel.Request): CompletableFuture[Unit] = {
val updateLicenseRequest = request.body[UpdateLicenseRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
controller.updateLicense(context, updateLicenseRequest.data)
.handle[Unit] { (result, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else if (result == null) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new UpdateLicenseResponse(new UpdateLicenseResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(UNKNOWN_SERVER_ERROR.code))
})
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new UpdateLicenseResponse(result.setThrottleTimeMs(requestThrottleMs))
})
}
}
}
def handleDescribeLicense(request: RequestChannel.Request): CompletableFuture[Unit] = {
val describeLicenseRequest = request.body[DescribeLicenseRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
controller.describeLicense(context, describeLicenseRequest.data)
.handle[Unit] { (result, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else if (result == null) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new DescribeLicenseResponse(new DescribeLicenseResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(UNKNOWN_SERVER_ERROR.code))
})
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new DescribeLicenseResponse(result.setThrottleTimeMs(requestThrottleMs))
})
}
}
}
def handleExportClusterManifest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val exportClusterManifestRequest = request.body[ExportClusterManifestRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
controller.exportClusterManifest(context, exportClusterManifestRequest.data)
.handle[Unit] { (result, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else if (result == null) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new ExportClusterManifestResponse(new ExportClusterManifestResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(UNKNOWN_SERVER_ERROR.code))
})
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
new ExportClusterManifestResponse(result.setThrottleTimeMs(requestThrottleMs))
})
}
}
}
def handleRegisterNode(request: RequestChannel.Request): CompletableFuture[Unit] = {
val req = request.body[AutomqRegisterNodeRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC, TRANSACTIONAL_ID}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.controller.LicenseManager
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.Authorizer
@ -91,6 +92,7 @@ class ElasticKafkaApis(
private var trafficInterceptor: TrafficInterceptor = new NoopTrafficInterceptor(this, metadataCache)
private var snapshotAwaitReadySupplier: Supplier[CompletableFuture[Void]] = () => CompletableFuture.completedFuture(null)
private val licenseManager: LicenseManager = LicenseManagerProvider.get();
/**
* Generate a map of topic -> [(partitionId, epochId)] based on provided topicsRequestData.
@ -179,6 +181,9 @@ class ElasticKafkaApis(
case ApiKeys.DELETE_TOPICS => maybeForwardTopicDeletionToController(request, handleDeleteTopicsRequest)
case ApiKeys.GET_NEXT_NODE_ID => forwardToControllerOrFail(request)
case ApiKeys.AUTOMQ_UPDATE_GROUP => handleUpdateGroupRequest(request, requestLocal)
case ApiKeys.UPDATE_LICENSE => forwardToControllerOrFail(request)
case ApiKeys.DESCRIBE_LICENSE => forwardToControllerOrFail(request)
case ApiKeys.EXPORT_CLUSTER_MANIFEST => forwardToControllerOrFail(request)
case _ =>
throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
@ -208,7 +213,10 @@ class ElasticKafkaApis(
| ApiKeys.GET_NEXT_NODE_ID
| ApiKeys.AUTOMQ_ZONE_ROUTER
| ApiKeys.AUTOMQ_UPDATE_GROUP
| ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT => handleExtensionRequest(request, requestLocal)
| ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT
| ApiKeys.UPDATE_LICENSE
| ApiKeys.DESCRIBE_LICENSE
| ApiKeys.EXPORT_CLUSTER_MANIFEST => handleExtensionRequest(request, requestLocal)
case _ => super.handle(request, requestLocal)
}
}
@ -469,6 +477,19 @@ class ElasticKafkaApis(
val versionId = request.header.apiVersion
val clientId = request.header.clientId
val fetchRequest = request.body[FetchRequest]
if (!fetchRequest.isFromFollower && licenseManager != null && !licenseManager.checkLicense("", false)) {
val topicNames = if (fetchRequest.version() >= 13) metadataCache.topicIdsToNames() else Collections.emptyMap[Uuid, String]()
val fetchData = fetchRequest.fetchData(topicNames)
val responseData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]()
fetchData.forEach { (topicIdPartition, _) =>
responseData.put(topicIdPartition, FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED))
}
val response = FetchResponse.of(Errors.NONE, 0, fetchRequest.metadata.sessionId(), responseData)
requestChannel.sendResponse(request, response, None)
return
}
val topicNames =
if (fetchRequest.version() >= 13)
metadataCache.topicIdsToNames()

View File

@ -0,0 +1,70 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.streamaspect;
import org.apache.kafka.controller.LicenseManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ServiceLoader;
public final class LicenseManagerProvider {
private static final Logger LOG = LoggerFactory.getLogger(LicenseManagerProvider.class);
private static final Object INIT_LOCK = new Object();
private static volatile LicenseManager cachedInstance;
private LicenseManagerProvider() {
}
public static LicenseManager get() {
LicenseManager current = cachedInstance;
if (current == null) {
synchronized (INIT_LOCK) {
current = cachedInstance;
if (current == null) {
cachedInstance = current = loadService();
}
}
}
return current;
}
private static LicenseManager loadService() {
try {
ServiceLoader<LicenseManager> loader =
ServiceLoader.load(LicenseManager.class, LicenseManager.class.getClassLoader());
LicenseManager first = null;
for (LicenseManager impl : loader) {
if (first != null) {
break;
}
first = impl;
}
return first;
} catch (Throwable t) {
LOG.error("Failed to load LicenseManager implementation", t);
return null;
}
}
}

View File

@ -59,12 +59,16 @@ import org.apache.kafka.common.message.DeleteKVsRequestData;
import org.apache.kafka.common.message.DeleteKVsResponseData;
import org.apache.kafka.common.message.DeleteStreamsRequestData;
import org.apache.kafka.common.message.DeleteStreamsResponseData;
import org.apache.kafka.common.message.DescribeLicenseRequestData;
import org.apache.kafka.common.message.DescribeLicenseResponseData;
import org.apache.kafka.common.message.DescribeStreamsRequestData;
import org.apache.kafka.common.message.DescribeStreamsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
import org.apache.kafka.common.message.GetKVsRequestData;
import org.apache.kafka.common.message.GetKVsResponseData;
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
@ -84,6 +88,8 @@ import org.apache.kafka.common.message.TrimStreamsRequestData;
import org.apache.kafka.common.message.TrimStreamsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.message.UpdateLicenseRequestData;
import org.apache.kafka.common.message.UpdateLicenseResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@ -663,6 +669,21 @@ public class MockController implements Controller {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<DescribeLicenseResponseData> describeLicense(ControllerRequestContext context, DescribeLicenseRequestData request) {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<UpdateLicenseResponseData> updateLicense(ControllerRequestContext context, UpdateLicenseRequestData request) {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<ExportClusterManifestResponseData> exportClusterManifest(ControllerRequestContext context, ExportClusterManifestRequestData request) {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<AutomqRegisterNodeResponseData> registerNode(ControllerRequestContext context,
AutomqRegisterNodeRequest req) {

View File

@ -300,7 +300,6 @@ libs += [
opentelemetryExporterLogging: "io.opentelemetry:opentelemetry-exporter-logging:$versions.opentelemetrySDK",
opentelemetryExporterProm: "io.opentelemetry:opentelemetry-exporter-prometheus:$versions.opentelemetrySDKAlpha",
opentelemetryExporterOTLP: "io.opentelemetry:opentelemetry-exporter-otlp:$versions.opentelemetrySDK",
opentelemetryExporterSenderJdk: "io.opentelemetry:opentelemetry-exporter-sender-jdk:$versions.opentelemetrySDK",
opentelemetryJmx: "io.opentelemetry.instrumentation:opentelemetry-jmx-metrics:$versions.opentelemetryInstrument",
oshi: "com.github.oshi:oshi-core-java11:$versions.oshi",
bucket4j: "com.bucket4j:bucket4j-core:$versions.bucket4j",

View File

@ -53,12 +53,16 @@ import org.apache.kafka.common.message.DeleteKVsRequestData;
import org.apache.kafka.common.message.DeleteKVsResponseData;
import org.apache.kafka.common.message.DeleteStreamsRequestData;
import org.apache.kafka.common.message.DeleteStreamsResponseData;
import org.apache.kafka.common.message.DescribeLicenseRequestData;
import org.apache.kafka.common.message.DescribeLicenseResponseData;
import org.apache.kafka.common.message.DescribeStreamsRequestData;
import org.apache.kafka.common.message.DescribeStreamsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
import org.apache.kafka.common.message.GetKVsRequestData;
import org.apache.kafka.common.message.GetKVsResponseData;
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
@ -78,6 +82,8 @@ import org.apache.kafka.common.message.TrimStreamsRequestData;
import org.apache.kafka.common.message.TrimStreamsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.message.UpdateLicenseRequestData;
import org.apache.kafka.common.message.UpdateLicenseResponseData;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
@ -609,6 +615,30 @@ public interface Controller extends AclMutator, AutoCloseable {
DeleteKVsRequestData request
);
/**
* Describe license.
*/
CompletableFuture<DescribeLicenseResponseData> describeLicense(
ControllerRequestContext context,
DescribeLicenseRequestData request
);
/**
* Update license.
*/
CompletableFuture<UpdateLicenseResponseData> updateLicense(
ControllerRequestContext context,
UpdateLicenseRequestData request
);
/**
* Export cluster manifest.
*/
CompletableFuture<ExportClusterManifestResponseData> exportClusterManifest(
ControllerRequestContext context,
ExportClusterManifestRequestData request
);
/**
* Register node metadata
*/

View File

@ -0,0 +1,25 @@
package org.apache.kafka.controller;
import org.apache.kafka.common.metadata.KVRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.Date;
import java.util.List;
public interface LicenseManager {
String describeLicense();
String exportClusterManifest();
boolean checkLicense(String license, boolean isUpdate);
boolean replay(KVRecord record);
boolean initialized();
Date getExpireDate();
List<ApiMessageAndVersion> getRecordsToAppend(String license);
}

View File

@ -62,12 +62,16 @@ import org.apache.kafka.common.message.DeleteKVsRequestData;
import org.apache.kafka.common.message.DeleteKVsResponseData;
import org.apache.kafka.common.message.DeleteStreamsRequestData;
import org.apache.kafka.common.message.DeleteStreamsResponseData;
import org.apache.kafka.common.message.DescribeLicenseRequestData;
import org.apache.kafka.common.message.DescribeLicenseResponseData;
import org.apache.kafka.common.message.DescribeStreamsRequestData;
import org.apache.kafka.common.message.DescribeStreamsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
import org.apache.kafka.common.message.GetKVsRequestData;
import org.apache.kafka.common.message.GetKVsResponseData;
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
@ -87,6 +91,8 @@ import org.apache.kafka.common.message.TrimStreamsRequestData;
import org.apache.kafka.common.message.TrimStreamsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.message.UpdateLicenseRequestData;
import org.apache.kafka.common.message.UpdateLicenseResponseData;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
@ -289,6 +295,7 @@ public final class QuorumController implements Controller {
private StreamClient streamClient;
private List<String> quorumVoters = Collections.emptyList();
private Function<QuorumController, QuorumControllerExtension> extension = c -> QuorumControllerExtension.NOOP;
private LicenseManager licenseManager = null;
// AutoMQ for Kafka inject end
public Builder(int nodeId, String clusterId) {
@ -460,6 +467,11 @@ public final class QuorumController implements Controller {
this.extension = extension;
return this;
}
public Builder setLicenseManager(LicenseManager licenseManager) {
this.licenseManager = licenseManager;
return this;
}
// AutoMQ for Kafka inject end
public QuorumController build() throws Exception {
@ -522,6 +534,7 @@ public final class QuorumController implements Controller {
eligibleLeaderReplicasEnabled,
streamClient,
quorumVoters,
licenseManager,
extension
);
} catch (Exception e) {
@ -1359,13 +1372,21 @@ public final class QuorumController implements Controller {
@Override
public ControllerResult<Void> generateRecordsAndResult() {
try {
return ActivationRecordsGenerator.generate(
ControllerResult<Void> base = ActivationRecordsGenerator.generate(
log::warn,
logReplayTracker.empty(),
offsetControl.transactionStartOffset(),
zkMigrationEnabled,
bootstrapMetadata,
featureControl);
// AutoMQ for Kafka inject start
List<ApiMessageAndVersion> all = new ArrayList<>(base.records());
if (licenseManager != null && !licenseManager.initialized()) {
List<ApiMessageAndVersion> recordsToAppend = licenseManager.getRecordsToAppend("");
all.addAll(recordsToAppend);
}
// AutoMQ for Kafka inject end
return ControllerResult.atomicOf(all, null);
} catch (Throwable t) {
throw fatalFaultHandler.handleFault("exception while completing controller " +
"activation", t);
@ -1599,7 +1620,7 @@ public final class QuorumController implements Controller {
* @param snapshotId The snapshotId if this record is from a snapshot
* @param offset The offset of the record
*/
@SuppressWarnings("checkstyle:javaNCSS")
@SuppressWarnings({"checkstyle:javaNCSS", "checkstyle:MethodLength"})
private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
if (log.isTraceEnabled()) {
if (snapshotId.isPresent()) {
@ -1743,6 +1764,9 @@ public final class QuorumController implements Controller {
topicDeletionManager.replay(record);
nodeControlManager.replay(record);
routerChannelEpochControlManager.replay(record);
if (licenseManager != null) {
licenseManager.replay(record);
}
break;
}
case REMOVE_KVRECORD: {
@ -2007,6 +2031,8 @@ public final class QuorumController implements Controller {
private final RouterChannelEpochControlManager routerChannelEpochControlManager;
private final QuorumControllerExtension extension;
private final LicenseManager licenseManager;
// AutoMQ for Kafka inject end
@ -2046,6 +2072,7 @@ public final class QuorumController implements Controller {
// AutoMQ inject start
StreamClient streamClient,
List<String> quorumVoters,
LicenseManager licenseManager,
Function<QuorumController, QuorumControllerExtension> extension
// AutoMQ inject end
@ -2176,6 +2203,8 @@ public final class QuorumController implements Controller {
this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoManager(clusterControl, streamControlManager));
this.routerChannelEpochControlManager = new RouterChannelEpochControlManager(snapshotRegistry, this, nodeControlManager, time);
this.extension = extension.apply(this);
this.licenseManager = licenseManager;
// set the nodeControlManager here to avoid circular dependency
this.replicationControl.setNodeControlManager(nodeControlManager);
@ -2797,6 +2826,91 @@ public final class QuorumController implements Controller {
);
}
@Override
public CompletableFuture<DescribeLicenseResponseData> describeLicense(
ControllerRequestContext context,
DescribeLicenseRequestData request
) {
if (licenseManager == null) {
return CompletableFuture.completedFuture(
new DescribeLicenseResponseData()
.setErrorCode(Errors.UNSUPPORTED_VERSION.code())
.setLicense("")
.setThrottleTimeMs(0)
);
}
return appendReadEvent("describeLicense", context.deadlineNs(), () -> {
String license = licenseManager.describeLicense();
return new DescribeLicenseResponseData()
.setErrorCode(Errors.NONE.code())
.setLicense(license)
.setThrottleTimeMs(0);
});
}
@Override
public CompletableFuture<UpdateLicenseResponseData> updateLicense(
ControllerRequestContext context,
UpdateLicenseRequestData request
) {
if (licenseManager == null) {
return CompletableFuture.completedFuture(
new UpdateLicenseResponseData()
.setErrorCode(Errors.UNSUPPORTED_VERSION.code())
.setErrorMessage("License management is not supported")
.setThrottleTimeMs(0)
);
}
String license = request.license();
if (!licenseManager.checkLicense(license, true)) {
return CompletableFuture.completedFuture(
new UpdateLicenseResponseData()
.setErrorCode(Errors.POLICY_VIOLATION.code())
.setErrorMessage("license check failed")
.setThrottleTimeMs(0)
);
}
return appendWriteEvent("updateLicense", context.deadlineNs(), () -> {
try {
List<ApiMessageAndVersion> recordsToAppend = licenseManager.getRecordsToAppend(license);
UpdateLicenseResponseData response = new UpdateLicenseResponseData()
.setErrorCode(Errors.NONE.code())
.setErrorMessage("")
.setThrottleTimeMs(0);
return ControllerResult.of(recordsToAppend, response);
} catch (Exception e) {
log.error("Failed to process license update", e);
UpdateLicenseResponseData response = new UpdateLicenseResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage("Failed to process update: " + e.getMessage())
.setThrottleTimeMs(0);
return ControllerResult.of(Collections.emptyList(), response);
}
});
}
@Override
public CompletableFuture<ExportClusterManifestResponseData> exportClusterManifest(
ControllerRequestContext context,
ExportClusterManifestRequestData request
) {
if (licenseManager == null) {
return CompletableFuture.completedFuture(
new ExportClusterManifestResponseData()
.setErrorCode(Errors.UNSUPPORTED_VERSION.code())
.setManifest("")
.setThrottleTimeMs(0)
);
}
return appendReadEvent("exportClusterManifest", context.deadlineNs(), () -> {
String manifest = licenseManager.exportClusterManifest();
return new ExportClusterManifestResponseData()
.setErrorCode(Errors.NONE.code())
.setManifest(manifest)
.setThrottleTimeMs(0);
});
}
@Override
public CompletableFuture<AutomqRegisterNodeResponseData> registerNode(ControllerRequestContext context, AutomqRegisterNodeRequest req) {
return appendWriteEvent("registerNode", context.deadlineNs(), () -> nodeControlManager.register(req));

View File

@ -60,4 +60,9 @@ public class S3StreamKafkaMetricsConstants {
// Back Pressure
public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state";
public static final AttributeKey<String> LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state");
// License
public static final String LICENSE_EXPIRY_TIMESTAMP_METRIC_NAME = "license_expiry_timestamp";
public static final String LICENSE_SECONDS_REMAINING_METRIC_NAME = "automq_enterprise_license_expiry_seconds";
public static final String NODE_VCPU_COUNT_METRIC_NAME = "node_vcpu_count";
}

View File

@ -131,6 +131,17 @@ public class S3StreamKafkaMetricsManager {
*/
private static Supplier<String> certChainSupplier = () -> null;
/**
* Supplier for license expiry date.
* Returns the expiry date of the license, or null if no license is configured.
*/
private static Supplier<Date> licenseExpireDateSupplier = () -> null;
// License metrics
private static ObservableLongGauge licenseExpiryTimestampMetrics = new NoopObservableLongGauge();
private static ObservableLongGauge licenseSecondsRemainingMetrics = new NoopObservableLongGauge();
private static ObservableLongGauge nodeVcpuCountMetrics = new NoopObservableLongGauge();
public static void configure(MetricsConfig metricsConfig) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
S3StreamKafkaMetricsManager.metricsConfig = metricsConfig;
@ -147,6 +158,7 @@ public class S3StreamKafkaMetricsManager {
initLogAppendMetrics(meter, prefix);
initPartitionStatusStatisticsMetrics(meter, prefix);
initBackPressureMetrics(meter, prefix);
initLicenseMetrics(meter, prefix);
try {
initCertMetrics(meter, prefix);
} catch (Exception e) {
@ -324,6 +336,44 @@ public class S3StreamKafkaMetricsManager {
});
}
private static void initLicenseMetrics(Meter meter, String prefix) {
licenseExpiryTimestampMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.LICENSE_EXPIRY_TIMESTAMP_METRIC_NAME)
.setDescription("The expiry timestamp of the license")
.setUnit("milliseconds")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
Date expireDate = licenseExpireDateSupplier.get();
if (expireDate != null) {
result.record(expireDate.getTime(), metricsConfig.getBaseAttributes());
}
}
});
licenseSecondsRemainingMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.LICENSE_SECONDS_REMAINING_METRIC_NAME)
.setDescription("The remaining seconds until the license expires")
.setUnit("seconds")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
Date expireDate = licenseExpireDateSupplier.get();
if (expireDate != null) {
long secondsRemaining = (expireDate.getTime() - System.currentTimeMillis()) / 1000;
result.record(secondsRemaining, metricsConfig.getBaseAttributes());
}
}
});
nodeVcpuCountMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.NODE_VCPU_COUNT_METRIC_NAME)
.setDescription("The number of vCPUs available on this node")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
result.record(Runtime.getRuntime().availableProcessors(), metricsConfig.getBaseAttributes());
}
});
}
/**
* Initialize the certificate metrics.
*
@ -516,4 +566,8 @@ public class S3StreamKafkaMetricsManager {
public static void setCertChainSupplier(Supplier<String> certChainSupplier) {
S3StreamKafkaMetricsManager.certChainSupplier = certChainSupplier;
}
public static void setLicenseExpireDateSupplier(Supplier<Date> licenseExpireDateSupplier) {
S3StreamKafkaMetricsManager.licenseExpireDateSupplier = licenseExpireDateSupplier;
}
}