Compare commits
1 Commits
main
...
feat/licen
| Author | SHA1 | Date |
|---|---|---|
|
|
892a10824e |
|
|
@ -9,14 +9,6 @@ or [Slack](https://join.slack.com/t/automq/shared_invite/zt-29h17vye9-thf31ebIVL
|
|||
Before getting started, please review AutoMQ's Code of Conduct. Everyone interacting in Slack or WeChat
|
||||
follow [Code of Conduct](CODE_OF_CONDUCT.md).
|
||||
|
||||
## Suggested Onboarding Path for New Contributors
|
||||
|
||||
If you are new to AutoMQ, it is recommended to first deploy and run AutoMQ using Docker as described in the README.
|
||||
This helps you quickly understand AutoMQ’s core concepts and behavior without local environment complexity.
|
||||
|
||||
After gaining familiarity, contributors who want to work on code can follow the steps in this guide to build and run AutoMQ locally.
|
||||
|
||||
|
||||
## Code Contributions
|
||||
|
||||
### Finding or Reporting Issues
|
||||
|
|
|
|||
|
|
@ -2341,12 +2341,6 @@ project(':automq-metrics') {
|
|||
configProperties = checkstyleConfigProperties("import-control-server.xml")
|
||||
}
|
||||
|
||||
configurations {
|
||||
all {
|
||||
exclude group: 'io.opentelemetry', module: 'opentelemetry-exporter-sender-okhttp'
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
// OpenTelemetry core dependencies
|
||||
api libs.opentelemetryJava8
|
||||
|
|
@ -2356,7 +2350,6 @@ project(':automq-metrics') {
|
|||
api libs.opentelemetryExporterLogging
|
||||
api libs.opentelemetryExporterProm
|
||||
api libs.opentelemetryExporterOTLP
|
||||
api libs.opentelemetryExporterSenderJdk
|
||||
api libs.opentelemetryJmx
|
||||
|
||||
// Logging dependencies
|
||||
|
|
|
|||
|
|
@ -1739,6 +1739,24 @@ public interface Admin extends AutoCloseable {
|
|||
* @return {@link UpdateGroupResult}
|
||||
*/
|
||||
UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, UpdateGroupOptions options);
|
||||
|
||||
default UpdateLicenseResult updateLicense(String license) {
|
||||
return updateLicense(license, new UpdateLicenseOptions());
|
||||
}
|
||||
|
||||
UpdateLicenseResult updateLicense(String license, UpdateLicenseOptions options);
|
||||
|
||||
default DescribeLicenseResult describeLicense() {
|
||||
return describeLicense(new DescribeLicenseOptions());
|
||||
}
|
||||
|
||||
DescribeLicenseResult describeLicense(DescribeLicenseOptions options);
|
||||
|
||||
default ExportClusterManifestResult exportClusterManifest() {
|
||||
return exportClusterManifest(new ExportClusterManifestOptions());
|
||||
}
|
||||
|
||||
ExportClusterManifestResult exportClusterManifest(ExportClusterManifestOptions options);
|
||||
// AutoMQ inject end
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Options for {@link Admin#describeLicense(DescribeLicenseOptions)}.
|
||||
*
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class DescribeLicenseOptions extends AbstractOptions<DescribeLicenseOptions> {
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
/**
|
||||
* The result of the {@link Admin#describeLicense(DescribeLicenseOptions)} call.
|
||||
*
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class DescribeLicenseResult {
|
||||
private final KafkaFuture<String> future;
|
||||
|
||||
DescribeLicenseResult(KafkaFuture<String> future) {
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a future which returns the current license.
|
||||
*/
|
||||
public KafkaFuture<String> license() {
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Options for {@link Admin#exportClusterManifest(ExportClusterManifestOptions)}.
|
||||
*
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class ExportClusterManifestOptions extends AbstractOptions<ExportClusterManifestOptions> {
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
/**
|
||||
* The result of the {@link Admin#exportClusterManifest(ExportClusterManifestOptions)} call.
|
||||
*
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class ExportClusterManifestResult {
|
||||
private final KafkaFuture<String> future;
|
||||
|
||||
ExportClusterManifestResult(KafkaFuture<String> future) {
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a future which returns the exported cluster identity manifest.
|
||||
*/
|
||||
public KafkaFuture<String> manifest() {
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
|
@ -320,5 +320,20 @@ public class ForwardingAdmin implements Admin {
|
|||
return delegate.updateGroup(groupId, groupSpec, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateLicenseResult updateLicense(String license, UpdateLicenseOptions options) {
|
||||
return delegate.updateLicense(license);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DescribeLicenseResult describeLicense(DescribeLicenseOptions options) {
|
||||
return delegate.describeLicense(options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExportClusterManifestResult exportClusterManifest(ExportClusterManifestOptions options) {
|
||||
return delegate.exportClusterManifest(options);
|
||||
}
|
||||
|
||||
// AutoMQ inject end
|
||||
}
|
||||
|
|
|
|||
|
|
@ -139,6 +139,7 @@ import org.apache.kafka.common.message.DescribeClusterRequestData;
|
|||
import org.apache.kafka.common.message.DescribeClusterResponseData;
|
||||
import org.apache.kafka.common.message.DescribeConfigsRequestData;
|
||||
import org.apache.kafka.common.message.DescribeConfigsResponseData;
|
||||
import org.apache.kafka.common.message.DescribeLicenseRequestData;
|
||||
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
|
||||
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
|
||||
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
|
||||
|
|
@ -152,6 +153,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
|
|||
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
|
||||
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
|
||||
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
|
||||
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
|
||||
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
|
||||
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
|
||||
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
|
||||
|
|
@ -165,6 +167,7 @@ import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
|
|||
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
|
||||
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
|
||||
import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
|
||||
import org.apache.kafka.common.message.UpdateLicenseRequestData;
|
||||
import org.apache.kafka.common.metrics.KafkaMetricsContext;
|
||||
import org.apache.kafka.common.metrics.MetricConfig;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
|
|
@ -250,8 +253,14 @@ import org.apache.kafka.common.requests.UpdateFeaturesRequest;
|
|||
import org.apache.kafka.common.requests.UpdateFeaturesResponse;
|
||||
import org.apache.kafka.common.requests.s3.AutomqGetNodesRequest;
|
||||
import org.apache.kafka.common.requests.s3.AutomqGetNodesResponse;
|
||||
import org.apache.kafka.common.requests.s3.DescribeLicenseRequest;
|
||||
import org.apache.kafka.common.requests.s3.DescribeLicenseResponse;
|
||||
import org.apache.kafka.common.requests.s3.ExportClusterManifestRequest;
|
||||
import org.apache.kafka.common.requests.s3.ExportClusterManifestResponse;
|
||||
import org.apache.kafka.common.requests.s3.GetNextNodeIdRequest;
|
||||
import org.apache.kafka.common.requests.s3.GetNextNodeIdResponse;
|
||||
import org.apache.kafka.common.requests.s3.UpdateLicenseRequest;
|
||||
import org.apache.kafka.common.requests.s3.UpdateLicenseResponse;
|
||||
import org.apache.kafka.common.security.auth.KafkaPrincipal;
|
||||
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
|
||||
import org.apache.kafka.common.security.token.delegation.DelegationToken;
|
||||
|
|
@ -2464,6 +2473,125 @@ public class KafkaAdminClient extends AdminClient {
|
|||
}, now);
|
||||
return new GetNextNodeIdResult(nodeIdFuture);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DescribeLicenseResult describeLicense(final DescribeLicenseOptions options) {
|
||||
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
|
||||
|
||||
final long now = time.milliseconds();
|
||||
final Call call = new Call("describeLicense", calcDeadlineMs(now, options.timeoutMs()),
|
||||
new ControllerNodeProvider()) {
|
||||
|
||||
@Override
|
||||
DescribeLicenseRequest.Builder createRequest(int timeoutMs) {
|
||||
return new DescribeLicenseRequest.Builder(new DescribeLicenseRequestData());
|
||||
}
|
||||
|
||||
@Override
|
||||
void handleResponse(AbstractResponse abstractResponse) {
|
||||
final DescribeLicenseResponse response =
|
||||
(DescribeLicenseResponse) abstractResponse;
|
||||
future.complete(response.data().license());
|
||||
}
|
||||
|
||||
@Override
|
||||
void handleFailure(Throwable throwable) {
|
||||
future.completeExceptionally(throwable);
|
||||
}
|
||||
};
|
||||
|
||||
runnable.call(call, now);
|
||||
return new DescribeLicenseResult(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateLicenseResult updateLicense(final String license,
|
||||
final UpdateLicenseOptions options) {
|
||||
if (license == null || license.isEmpty()) {
|
||||
throw new IllegalArgumentException("License can not be null or empty.");
|
||||
}
|
||||
|
||||
final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
|
||||
|
||||
final long now = time.milliseconds();
|
||||
final Call call = new Call("updateLicense", calcDeadlineMs(now, options.timeoutMs()),
|
||||
new ControllerNodeProvider(true)) {
|
||||
|
||||
@Override
|
||||
UpdateLicenseRequest.Builder createRequest(int timeoutMs) {
|
||||
return new UpdateLicenseRequest.Builder(
|
||||
new UpdateLicenseRequestData()
|
||||
.setLicense(license));
|
||||
}
|
||||
|
||||
@Override
|
||||
void handleResponse(AbstractResponse abstractResponse) {
|
||||
final UpdateLicenseResponse response =
|
||||
(UpdateLicenseResponse) abstractResponse;
|
||||
Errors topLevelError = Errors.forCode(response.data().errorCode());
|
||||
switch (topLevelError) {
|
||||
case NONE:
|
||||
future.complete(null);
|
||||
break;
|
||||
case NOT_CONTROLLER:
|
||||
handleNotControllerError(topLevelError);
|
||||
break;
|
||||
default:
|
||||
future.completeExceptionally(topLevelError.exception(response.data().errorMessage()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void handleFailure(Throwable throwable) {
|
||||
future.completeExceptionally(throwable);
|
||||
}
|
||||
};
|
||||
|
||||
runnable.call(call, now);
|
||||
return new UpdateLicenseResult(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExportClusterManifestResult exportClusterManifest(final ExportClusterManifestOptions options) {
|
||||
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
|
||||
|
||||
final long now = time.milliseconds();
|
||||
final Call call = new Call("exportClusterManifest", calcDeadlineMs(now, options.timeoutMs()),
|
||||
new ControllerNodeProvider(true)) {
|
||||
|
||||
@Override
|
||||
ExportClusterManifestRequest.Builder createRequest(int timeoutMs) {
|
||||
return new ExportClusterManifestRequest.Builder(new ExportClusterManifestRequestData());
|
||||
}
|
||||
|
||||
@Override
|
||||
void handleResponse(AbstractResponse abstractResponse) {
|
||||
final ExportClusterManifestResponse response =
|
||||
(ExportClusterManifestResponse) abstractResponse;
|
||||
Errors topLevelError = Errors.forCode(response.data().errorCode());
|
||||
switch (topLevelError) {
|
||||
case NONE:
|
||||
future.complete(response.data().manifest());
|
||||
break;
|
||||
case NOT_CONTROLLER:
|
||||
handleNotControllerError(topLevelError);
|
||||
break;
|
||||
default:
|
||||
future.completeExceptionally(topLevelError.exception());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void handleFailure(Throwable throwable) {
|
||||
future.completeExceptionally(throwable);
|
||||
}
|
||||
};
|
||||
|
||||
runnable.call(call, now);
|
||||
return new ExportClusterManifestResult(future);
|
||||
}
|
||||
// AutoMQ for Kafka inject end
|
||||
|
||||
private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, String topicName, Uuid topicId,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Options for {@link Admin#updateLicense(String, UpdateLicenseOptions)}.
|
||||
*
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class UpdateLicenseOptions extends AbstractOptions<UpdateLicenseOptions> {
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
/**
|
||||
* The result of the {@link Admin#updateLicense(String, UpdateLicenseOptions)} call.
|
||||
*
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class UpdateLicenseResult {
|
||||
private final KafkaFuture<Void> future;
|
||||
|
||||
UpdateLicenseResult(KafkaFuture<Void> future) {
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a future which succeeds if the license update succeeds.
|
||||
*/
|
||||
public KafkaFuture<Void> all() {
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
|
@ -145,6 +145,9 @@ public enum ApiKeys {
|
|||
GET_KVS(ApiMessageType.GET_KVS, false, true),
|
||||
PUT_KVS(ApiMessageType.PUT_KVS, false, true),
|
||||
DELETE_KVS(ApiMessageType.DELETE_KVS, false, true),
|
||||
UPDATE_LICENSE(ApiMessageType.UPDATE_LICENSE, false, true),
|
||||
DESCRIBE_LICENSE(ApiMessageType.DESCRIBE_LICENSE, false, true),
|
||||
EXPORT_CLUSTER_MANIFEST(ApiMessageType.EXPORT_CLUSTER_MANIFEST, false, true),
|
||||
AUTOMQ_REGISTER_NODE(ApiMessageType.AUTOMQ_REGISTER_NODE, false, false),
|
||||
AUTOMQ_GET_NODES(ApiMessageType.AUTOMQ_GET_NODES, false, true),
|
||||
AUTOMQ_ZONE_ROUTER(ApiMessageType.AUTOMQ_ZONE_ROUTER, false, false),
|
||||
|
|
|
|||
|
|
@ -34,7 +34,9 @@ import org.apache.kafka.common.requests.s3.CommitStreamSetObjectRequest;
|
|||
import org.apache.kafka.common.requests.s3.CreateStreamsRequest;
|
||||
import org.apache.kafka.common.requests.s3.DeleteKVsRequest;
|
||||
import org.apache.kafka.common.requests.s3.DeleteStreamsRequest;
|
||||
import org.apache.kafka.common.requests.s3.DescribeLicenseRequest;
|
||||
import org.apache.kafka.common.requests.s3.DescribeStreamsRequest;
|
||||
import org.apache.kafka.common.requests.s3.ExportClusterManifestRequest;
|
||||
import org.apache.kafka.common.requests.s3.GetKVsRequest;
|
||||
import org.apache.kafka.common.requests.s3.GetNextNodeIdRequest;
|
||||
import org.apache.kafka.common.requests.s3.GetOpeningStreamsRequest;
|
||||
|
|
@ -42,6 +44,7 @@ import org.apache.kafka.common.requests.s3.OpenStreamsRequest;
|
|||
import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest;
|
||||
import org.apache.kafka.common.requests.s3.PutKVsRequest;
|
||||
import org.apache.kafka.common.requests.s3.TrimStreamsRequest;
|
||||
import org.apache.kafka.common.requests.s3.UpdateLicenseRequest;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
|
|
@ -385,6 +388,12 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
|
|||
return DescribeStreamsRequest.parse(buffer, apiVersion);
|
||||
case AUTOMQ_UPDATE_GROUP:
|
||||
return AutomqUpdateGroupRequest.parse(buffer, apiVersion);
|
||||
case UPDATE_LICENSE:
|
||||
return UpdateLicenseRequest.parse(buffer, apiVersion);
|
||||
case DESCRIBE_LICENSE:
|
||||
return DescribeLicenseRequest.parse(buffer, apiVersion);
|
||||
case EXPORT_CLUSTER_MANIFEST:
|
||||
return ExportClusterManifestRequest.parse(buffer, apiVersion);
|
||||
// AutoMQ for Kafka inject end
|
||||
|
||||
case SHARE_GROUP_HEARTBEAT:
|
||||
|
|
|
|||
|
|
@ -32,7 +32,9 @@ import org.apache.kafka.common.requests.s3.CommitStreamSetObjectResponse;
|
|||
import org.apache.kafka.common.requests.s3.CreateStreamsResponse;
|
||||
import org.apache.kafka.common.requests.s3.DeleteKVsResponse;
|
||||
import org.apache.kafka.common.requests.s3.DeleteStreamsResponse;
|
||||
import org.apache.kafka.common.requests.s3.DescribeLicenseResponse;
|
||||
import org.apache.kafka.common.requests.s3.DescribeStreamsResponse;
|
||||
import org.apache.kafka.common.requests.s3.ExportClusterManifestResponse;
|
||||
import org.apache.kafka.common.requests.s3.GetKVsResponse;
|
||||
import org.apache.kafka.common.requests.s3.GetNextNodeIdResponse;
|
||||
import org.apache.kafka.common.requests.s3.GetOpeningStreamsResponse;
|
||||
|
|
@ -40,6 +42,7 @@ import org.apache.kafka.common.requests.s3.OpenStreamsResponse;
|
|||
import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse;
|
||||
import org.apache.kafka.common.requests.s3.PutKVsResponse;
|
||||
import org.apache.kafka.common.requests.s3.TrimStreamsResponse;
|
||||
import org.apache.kafka.common.requests.s3.UpdateLicenseResponse;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collection;
|
||||
|
|
@ -322,6 +325,12 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
|
|||
return DescribeStreamsResponse.parse(responseBuffer, version);
|
||||
case AUTOMQ_UPDATE_GROUP:
|
||||
return AutomqUpdateGroupResponse.parse(responseBuffer, version);
|
||||
case UPDATE_LICENSE:
|
||||
return UpdateLicenseResponse.parse(responseBuffer, version);
|
||||
case DESCRIBE_LICENSE:
|
||||
return DescribeLicenseResponse.parse(responseBuffer, version);
|
||||
case EXPORT_CLUSTER_MANIFEST:
|
||||
return ExportClusterManifestResponse.parse(responseBuffer, version);
|
||||
|
||||
// AutoMQ for Kafka inject end
|
||||
case SHARE_GROUP_HEARTBEAT:
|
||||
|
|
|
|||
|
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.common.requests.s3;
|
||||
|
||||
import org.apache.kafka.common.message.DescribeLicenseRequestData;
|
||||
import org.apache.kafka.common.message.DescribeLicenseResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
||||
import org.apache.kafka.common.requests.AbstractRequest;
|
||||
import org.apache.kafka.common.requests.ApiError;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class DescribeLicenseRequest extends AbstractRequest {
|
||||
public static class Builder extends AbstractRequest.Builder<DescribeLicenseRequest> {
|
||||
|
||||
private final DescribeLicenseRequestData data;
|
||||
|
||||
public Builder(DescribeLicenseRequestData data) {
|
||||
super(ApiKeys.DESCRIBE_LICENSE);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DescribeLicenseRequest build(short version) {
|
||||
return new DescribeLicenseRequest(data, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return data.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private final DescribeLicenseRequestData data;
|
||||
|
||||
public DescribeLicenseRequest(DescribeLicenseRequestData data, short version) {
|
||||
super(ApiKeys.DESCRIBE_LICENSE, version);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DescribeLicenseResponse getErrorResponse(int throttleTimeMs, Throwable e) {
|
||||
ApiError apiError = ApiError.fromThrowable(e);
|
||||
DescribeLicenseResponseData response = new DescribeLicenseResponseData()
|
||||
.setErrorCode(apiError.error().code())
|
||||
.setThrottleTimeMs(throttleTimeMs);
|
||||
return new DescribeLicenseResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DescribeLicenseRequestData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public static DescribeLicenseRequest parse(ByteBuffer buffer, short version) {
|
||||
return new DescribeLicenseRequest(new DescribeLicenseRequestData(
|
||||
new ByteBufferAccessor(buffer), version), version);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.common.requests.s3;
|
||||
|
||||
import org.apache.kafka.common.message.DescribeLicenseResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.requests.AbstractResponse;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class DescribeLicenseResponse extends AbstractResponse {
|
||||
private final DescribeLicenseResponseData data;
|
||||
|
||||
public DescribeLicenseResponse(DescribeLicenseResponseData data) {
|
||||
super(ApiKeys.DESCRIBE_LICENSE);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DescribeLicenseResponseData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Errors, Integer> errorCounts() {
|
||||
Map<Errors, Integer> errorCounts = new HashMap<>();
|
||||
updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
|
||||
return errorCounts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int throttleTimeMs() {
|
||||
return data.throttleTimeMs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
|
||||
data.setThrottleTimeMs(throttleTimeMs);
|
||||
}
|
||||
|
||||
public static DescribeLicenseResponse parse(ByteBuffer buffer, short version) {
|
||||
return new DescribeLicenseResponse(new DescribeLicenseResponseData(
|
||||
new ByteBufferAccessor(buffer), version));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.common.requests.s3;
|
||||
|
||||
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
|
||||
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
||||
import org.apache.kafka.common.requests.AbstractRequest;
|
||||
import org.apache.kafka.common.requests.ApiError;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class ExportClusterManifestRequest extends AbstractRequest {
|
||||
public static class Builder extends AbstractRequest.Builder<ExportClusterManifestRequest> {
|
||||
|
||||
private final ExportClusterManifestRequestData data;
|
||||
|
||||
public Builder(ExportClusterManifestRequestData data) {
|
||||
super(ApiKeys.EXPORT_CLUSTER_MANIFEST);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExportClusterManifestRequest build(short version) {
|
||||
return new ExportClusterManifestRequest(data, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return data.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private final ExportClusterManifestRequestData data;
|
||||
|
||||
public ExportClusterManifestRequest(ExportClusterManifestRequestData data, short version) {
|
||||
super(ApiKeys.EXPORT_CLUSTER_MANIFEST, version);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExportClusterManifestResponse getErrorResponse(int throttleTimeMs, Throwable e) {
|
||||
ApiError apiError = ApiError.fromThrowable(e);
|
||||
ExportClusterManifestResponseData response = new ExportClusterManifestResponseData()
|
||||
.setErrorCode(apiError.error().code())
|
||||
.setThrottleTimeMs(throttleTimeMs);
|
||||
return new ExportClusterManifestResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExportClusterManifestRequestData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public static ExportClusterManifestRequest parse(ByteBuffer buffer, short version) {
|
||||
return new ExportClusterManifestRequest(new ExportClusterManifestRequestData(
|
||||
new ByteBufferAccessor(buffer), version), version);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.common.requests.s3;
|
||||
|
||||
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.requests.AbstractResponse;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class ExportClusterManifestResponse extends AbstractResponse {
|
||||
private final ExportClusterManifestResponseData data;
|
||||
|
||||
public ExportClusterManifestResponse(ExportClusterManifestResponseData data) {
|
||||
super(ApiKeys.EXPORT_CLUSTER_MANIFEST);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExportClusterManifestResponseData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Errors, Integer> errorCounts() {
|
||||
Map<Errors, Integer> errorCounts = new HashMap<>();
|
||||
updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
|
||||
return errorCounts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int throttleTimeMs() {
|
||||
return data.throttleTimeMs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
|
||||
data.setThrottleTimeMs(throttleTimeMs);
|
||||
}
|
||||
|
||||
public static ExportClusterManifestResponse parse(ByteBuffer buffer, short version) {
|
||||
return new ExportClusterManifestResponse(new ExportClusterManifestResponseData(
|
||||
new ByteBufferAccessor(buffer), version));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.common.requests.s3;
|
||||
|
||||
import org.apache.kafka.common.message.UpdateLicenseRequestData;
|
||||
import org.apache.kafka.common.message.UpdateLicenseResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
||||
import org.apache.kafka.common.requests.AbstractRequest;
|
||||
import org.apache.kafka.common.requests.ApiError;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class UpdateLicenseRequest extends AbstractRequest {
|
||||
public static class Builder extends AbstractRequest.Builder<UpdateLicenseRequest> {
|
||||
|
||||
private final UpdateLicenseRequestData data;
|
||||
|
||||
public Builder(UpdateLicenseRequestData data) {
|
||||
super(ApiKeys.UPDATE_LICENSE);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateLicenseRequest build(short version) {
|
||||
return new UpdateLicenseRequest(data, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return data.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private final UpdateLicenseRequestData data;
|
||||
|
||||
public UpdateLicenseRequest(UpdateLicenseRequestData data, short version) {
|
||||
super(ApiKeys.UPDATE_LICENSE, version);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateLicenseResponse getErrorResponse(int throttleTimeMs, Throwable e) {
|
||||
ApiError apiError = ApiError.fromThrowable(e);
|
||||
UpdateLicenseResponseData response = new UpdateLicenseResponseData()
|
||||
.setErrorCode(apiError.error().code())
|
||||
.setErrorMessage(apiError.message())
|
||||
.setThrottleTimeMs(throttleTimeMs);
|
||||
return new UpdateLicenseResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateLicenseRequestData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public static UpdateLicenseRequest parse(ByteBuffer buffer, short version) {
|
||||
return new UpdateLicenseRequest(new UpdateLicenseRequestData(
|
||||
new ByteBufferAccessor(buffer), version), version);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.common.requests.s3;
|
||||
|
||||
import org.apache.kafka.common.message.UpdateLicenseResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.requests.AbstractResponse;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class UpdateLicenseResponse extends AbstractResponse {
|
||||
private final UpdateLicenseResponseData data;
|
||||
|
||||
public UpdateLicenseResponse(UpdateLicenseResponseData data) {
|
||||
super(ApiKeys.UPDATE_LICENSE);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateLicenseResponseData data() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Errors, Integer> errorCounts() {
|
||||
Map<Errors, Integer> errorCounts = new HashMap<>();
|
||||
updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
|
||||
return errorCounts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int throttleTimeMs() {
|
||||
return data.throttleTimeMs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
|
||||
data.setThrottleTimeMs(throttleTimeMs);
|
||||
}
|
||||
|
||||
public static UpdateLicenseResponse parse(ByteBuffer buffer, short version) {
|
||||
return new UpdateLicenseResponse(new UpdateLicenseResponseData(
|
||||
new ByteBufferAccessor(buffer), version));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
{
|
||||
"apiKey": 520,
|
||||
"type": "request",
|
||||
"listeners": ["controller", "broker"],
|
||||
"name": "DescribeLicenseRequest",
|
||||
"validVersions": "0",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": []
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
{
|
||||
"apiKey": 520,
|
||||
"type": "response",
|
||||
"name": "DescribeLicenseResponse",
|
||||
"validVersions": "0",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
|
||||
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
|
||||
{ "name": "License", "type": "string", "versions": "0+", "default": "" }
|
||||
]
|
||||
}
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
{
|
||||
"apiKey": 521,
|
||||
"type": "request",
|
||||
"listeners": ["controller", "broker"],
|
||||
"name": "ExportClusterManifestRequest",
|
||||
"validVersions": "0",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": []
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
{
|
||||
"apiKey": 521,
|
||||
"type": "response",
|
||||
"name": "ExportClusterManifestResponse",
|
||||
"validVersions": "0",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
|
||||
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
|
||||
{ "name": "Manifest", "type": "string", "versions": "0+", "default": "", "about": "The exported cluster identity manifest" }
|
||||
]
|
||||
}
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
{
|
||||
"apiKey": 519,
|
||||
"type": "request",
|
||||
"listeners": ["controller", "broker"],
|
||||
"name": "UpdateLicenseRequest",
|
||||
"validVersions": "0",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{ "name": "License", "type": "string", "versions": "0+" }
|
||||
]
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
{
|
||||
"apiKey": 519,
|
||||
"type": "response",
|
||||
"name": "UpdateLicenseResponse",
|
||||
"validVersions": "0",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
|
||||
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
|
||||
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "default": "" }
|
||||
]
|
||||
}
|
||||
|
|
@ -1450,6 +1450,21 @@ public class MockAdminClient extends AdminClient {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateLicenseResult updateLicense(String license, UpdateLicenseOptions options) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DescribeLicenseResult describeLicense(DescribeLicenseOptions options) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExportClusterManifestResult exportClusterManifest(ExportClusterManifestOptions options) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
// AutoMQ inject end
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,6 @@
|
|||
package kafka.automq.license;
|
||||
|
||||
import org.apache.kafka.image.publisher.MetadataPublisher;
|
||||
|
||||
public interface LicenseListener extends MetadataPublisher {
|
||||
}
|
||||
|
|
@ -17,8 +17,10 @@
|
|||
package kafka.server;
|
||||
|
||||
import kafka.automq.table.metric.TableTopicMetricsManager;
|
||||
import kafka.server.streamaspect.LicenseManagerProvider;
|
||||
|
||||
import org.apache.kafka.common.config.types.Password;
|
||||
import org.apache.kafka.controller.LicenseManager;
|
||||
import org.apache.kafka.server.ProcessRole;
|
||||
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
|
||||
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;
|
||||
|
|
@ -94,6 +96,19 @@ public final class TelemetrySupport {
|
|||
}
|
||||
});
|
||||
|
||||
S3StreamKafkaMetricsManager.setLicenseExpireDateSupplier(() -> {
|
||||
try {
|
||||
LicenseManager licenseManager = LicenseManagerProvider.get();
|
||||
if (licenseManager != null) {
|
||||
return licenseManager.getExpireDate();
|
||||
}
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to obtain license expiry date", e);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
Meter meter = manager.getMeter();
|
||||
MetricsLevel metricsLevel = parseMetricsLevel(config.s3MetricsLevel());
|
||||
long metricsIntervalMs = (long) config.s3ExporterReportIntervalMs();
|
||||
|
|
|
|||
|
|
@ -133,6 +133,9 @@ object RequestConvertToJson {
|
|||
case req: AutomqGetPartitionSnapshotRequest => AutomqGetPartitionSnapshotRequestDataJsonConverter.write(req.data, request.version)
|
||||
case req: GetNextNodeIdRequest => GetNextNodeIdRequestDataJsonConverter.write(req.data, request.version)
|
||||
case req: DescribeStreamsRequest => DescribeStreamsRequestDataJsonConverter.write(req.data, request.version)
|
||||
case req: UpdateLicenseRequest => UpdateLicenseRequestDataJsonConverter.write(req.data, request.version)
|
||||
case req: DescribeLicenseRequest => DescribeLicenseRequestDataJsonConverter.write(req.data, request.version)
|
||||
case req: ExportClusterManifestRequest => ExportClusterManifestRequestDataJsonConverter.write(req.data, request.version)
|
||||
// AutoMQ for Kafka inject end
|
||||
|
||||
case req: AddRaftVoterRequest => AddRaftVoterRequestDataJsonConverter.write(req.data, request.version)
|
||||
|
|
@ -249,6 +252,9 @@ object RequestConvertToJson {
|
|||
case res: GetNextNodeIdResponse => GetNextNodeIdResponseDataJsonConverter.write(res.data, version)
|
||||
case res: AutomqZoneRouterResponse => AutomqZoneRouterResponseDataJsonConverter.write(res.data, version)
|
||||
case res: DescribeStreamsResponse => DescribeStreamsResponseDataJsonConverter.write(res.data, version)
|
||||
case res: UpdateLicenseResponse => UpdateLicenseResponseDataJsonConverter.write(res.data, version)
|
||||
case res: DescribeLicenseResponse => DescribeLicenseResponseDataJsonConverter.write(res.data, version)
|
||||
case res: ExportClusterManifestResponse => ExportClusterManifestResponseDataJsonConverter.write(res.data, version)
|
||||
// AutoMQ for Kafka inject end
|
||||
|
||||
case res: AddRaftVoterResponse => AddRaftVoterResponseDataJsonConverter.write(res.data, version)
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import kafka.automq.backpressure.{BackPressureConfig, BackPressureManager, Defau
|
|||
import kafka.automq.failover.FailoverListener
|
||||
import kafka.automq.kafkalinking.KafkaLinkingManager
|
||||
import kafka.automq.interceptor.{NoopTrafficInterceptor, TrafficInterceptor}
|
||||
import kafka.automq.license.LicenseListener
|
||||
import kafka.automq.table.TableManager
|
||||
import kafka.automq.zerozone.{ConfirmWALProvider, DefaultClientRackProvider, DefaultConfirmWALProvider, DefaultRouterChannelProvider, DefaultLinkRecordDecoder, RouterChannelProvider, ZeroZoneTrafficInterceptor}
|
||||
import kafka.cluster.EndPoint
|
||||
|
|
@ -587,6 +588,8 @@ class BrokerServer(
|
|||
})
|
||||
|
||||
newFailoverListener(ElasticLogManager.INSTANCE.get.client)
|
||||
|
||||
newLicenseListener()
|
||||
// AutoMQ inject end
|
||||
|
||||
// We're now ready to unfence the broker. This also allows this broker to transition
|
||||
|
|
@ -896,6 +899,10 @@ class BrokerServer(
|
|||
metadataLoader.installPublishers(util.List.of(failoverListener));
|
||||
failoverListener
|
||||
}
|
||||
|
||||
protected def newLicenseListener(): LicenseListener = {
|
||||
null
|
||||
}
|
||||
// AutoMQ inject end
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -292,6 +292,7 @@ class ControllerServer(
|
|||
setStreamClient(streamClient).
|
||||
setExtension(c => quorumControllerExtension(c)).
|
||||
setQuorumVoters(config.quorumVoters).
|
||||
setLicenseManager(sharedServer.licenseManager).
|
||||
setReplicaPlacer(replicaPlacer())
|
||||
}
|
||||
controller = controllerBuilder.build()
|
||||
|
|
|
|||
|
|
@ -21,11 +21,13 @@ import com.automq.opentelemetry.AutoMQTelemetryManager
|
|||
import kafka.raft.KafkaRaftManager
|
||||
import kafka.server.Server.MetricsPrefix
|
||||
import kafka.server.metadata.BrokerServerMetrics
|
||||
import kafka.server.streamaspect.LicenseManagerProvider
|
||||
import kafka.utils.{CoreUtils, Logging}
|
||||
import org.apache.kafka.common.es.ElasticStreamSwitch
|
||||
import org.apache.kafka.common.metrics.Metrics
|
||||
import org.apache.kafka.common.network.ListenerName
|
||||
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
|
||||
import org.apache.kafka.controller.LicenseManager
|
||||
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
|
||||
import org.apache.kafka.image.MetadataProvenance
|
||||
import org.apache.kafka.image.loader.MetadataLoader
|
||||
|
|
@ -110,6 +112,7 @@ class SharedServer(
|
|||
// AutoMQ for Kafka injection start
|
||||
ElasticStreamSwitch.setSwitch(sharedServerConfig.elasticStreamEnabled)
|
||||
@volatile var telemetryManager: AutoMQTelemetryManager = _
|
||||
@volatile var licenseManager: LicenseManager = _
|
||||
// AutoMQ for Kafka injection end
|
||||
|
||||
@volatile var metrics: Metrics = _metrics
|
||||
|
|
@ -283,6 +286,7 @@ class SharedServer(
|
|||
|
||||
// AutoMQ inject start
|
||||
telemetryManager = buildTelemetryManager(sharedServerConfig, clusterId)
|
||||
licenseManager = LicenseManagerProvider.get()
|
||||
// AutoMQ inject end
|
||||
|
||||
val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import kafka.server.{ApiVersionManager, ControllerApis, KafkaConfig, RequestLoca
|
|||
import org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION
|
||||
import org.apache.kafka.common.errors.ApiException
|
||||
import org.apache.kafka.common.internals.FatalExitError
|
||||
import org.apache.kafka.common.message.{DeleteKVsResponseData, GetKVsResponseData, GetNextNodeIdResponseData, PutKVsResponseData}
|
||||
import org.apache.kafka.common.message.{DeleteKVsResponseData, DescribeLicenseResponseData, ExportClusterManifestResponseData, GetKVsResponseData, GetNextNodeIdResponseData, PutKVsResponseData, UpdateLicenseResponseData}
|
||||
import org.apache.kafka.common.protocol.Errors.{NONE, UNKNOWN_SERVER_ERROR}
|
||||
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
||||
import org.apache.kafka.common.requests.s3._
|
||||
|
|
@ -51,6 +51,9 @@ class ElasticControllerApis(
|
|||
case ApiKeys.GET_KVS => handleGetKV(request)
|
||||
case ApiKeys.PUT_KVS => handlePutKV(request)
|
||||
case ApiKeys.DELETE_KVS => handleDeleteKV(request)
|
||||
case ApiKeys.UPDATE_LICENSE => handleUpdateLicense(request)
|
||||
case ApiKeys.DESCRIBE_LICENSE => handleDescribeLicense(request)
|
||||
case ApiKeys.EXPORT_CLUSTER_MANIFEST => handleExportClusterManifest(request)
|
||||
case ApiKeys.AUTOMQ_REGISTER_NODE => handleRegisterNode(request)
|
||||
case ApiKeys.AUTOMQ_GET_NODES => handleGetNodes(request)
|
||||
case ApiKeys.GET_NEXT_NODE_ID => handleGetNextNodeId(request)
|
||||
|
|
@ -99,6 +102,9 @@ class ElasticControllerApis(
|
|||
| ApiKeys.GET_KVS
|
||||
| ApiKeys.PUT_KVS
|
||||
| ApiKeys.DELETE_KVS
|
||||
| ApiKeys.UPDATE_LICENSE
|
||||
| ApiKeys.DESCRIBE_LICENSE
|
||||
| ApiKeys.EXPORT_CLUSTER_MANIFEST
|
||||
| ApiKeys.AUTOMQ_REGISTER_NODE
|
||||
| ApiKeys.AUTOMQ_GET_NODES
|
||||
| ApiKeys.GET_NEXT_NODE_ID
|
||||
|
|
@ -361,6 +367,72 @@ class ElasticControllerApis(
|
|||
}
|
||||
}
|
||||
|
||||
def handleUpdateLicense(request: RequestChannel.Request): CompletableFuture[Unit] = {
|
||||
val updateLicenseRequest = request.body[UpdateLicenseRequest]
|
||||
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
|
||||
OptionalLong.empty())
|
||||
controller.updateLicense(context, updateLicenseRequest.data)
|
||||
.handle[Unit] { (result, exception) =>
|
||||
if (exception != null) {
|
||||
requestHelper.handleError(request, exception)
|
||||
} else if (result == null) {
|
||||
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
|
||||
new UpdateLicenseResponse(new UpdateLicenseResponseData().
|
||||
setThrottleTimeMs(requestThrottleMs).
|
||||
setErrorCode(UNKNOWN_SERVER_ERROR.code))
|
||||
})
|
||||
} else {
|
||||
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
|
||||
new UpdateLicenseResponse(result.setThrottleTimeMs(requestThrottleMs))
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def handleDescribeLicense(request: RequestChannel.Request): CompletableFuture[Unit] = {
|
||||
val describeLicenseRequest = request.body[DescribeLicenseRequest]
|
||||
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
|
||||
OptionalLong.empty())
|
||||
controller.describeLicense(context, describeLicenseRequest.data)
|
||||
.handle[Unit] { (result, exception) =>
|
||||
if (exception != null) {
|
||||
requestHelper.handleError(request, exception)
|
||||
} else if (result == null) {
|
||||
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
|
||||
new DescribeLicenseResponse(new DescribeLicenseResponseData().
|
||||
setThrottleTimeMs(requestThrottleMs).
|
||||
setErrorCode(UNKNOWN_SERVER_ERROR.code))
|
||||
})
|
||||
} else {
|
||||
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
|
||||
new DescribeLicenseResponse(result.setThrottleTimeMs(requestThrottleMs))
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def handleExportClusterManifest(request: RequestChannel.Request): CompletableFuture[Unit] = {
|
||||
val exportClusterManifestRequest = request.body[ExportClusterManifestRequest]
|
||||
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
|
||||
OptionalLong.empty())
|
||||
controller.exportClusterManifest(context, exportClusterManifestRequest.data)
|
||||
.handle[Unit] { (result, exception) =>
|
||||
if (exception != null) {
|
||||
requestHelper.handleError(request, exception)
|
||||
} else if (result == null) {
|
||||
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
|
||||
new ExportClusterManifestResponse(new ExportClusterManifestResponseData().
|
||||
setThrottleTimeMs(requestThrottleMs).
|
||||
setErrorCode(UNKNOWN_SERVER_ERROR.code))
|
||||
})
|
||||
} else {
|
||||
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
|
||||
new ExportClusterManifestResponse(result.setThrottleTimeMs(requestThrottleMs))
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def handleRegisterNode(request: RequestChannel.Request): CompletableFuture[Unit] = {
|
||||
val req = request.body[AutomqRegisterNodeRequest]
|
||||
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
|
|||
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC, TRANSACTIONAL_ID}
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
|
||||
import org.apache.kafka.controller.LicenseManager
|
||||
import org.apache.kafka.coordinator.group.GroupCoordinator
|
||||
import org.apache.kafka.server.ClientMetricsManager
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
|
|
@ -91,6 +92,7 @@ class ElasticKafkaApis(
|
|||
|
||||
private var trafficInterceptor: TrafficInterceptor = new NoopTrafficInterceptor(this, metadataCache)
|
||||
private var snapshotAwaitReadySupplier: Supplier[CompletableFuture[Void]] = () => CompletableFuture.completedFuture(null)
|
||||
private val licenseManager: LicenseManager = LicenseManagerProvider.get();
|
||||
|
||||
/**
|
||||
* Generate a map of topic -> [(partitionId, epochId)] based on provided topicsRequestData.
|
||||
|
|
@ -179,6 +181,9 @@ class ElasticKafkaApis(
|
|||
case ApiKeys.DELETE_TOPICS => maybeForwardTopicDeletionToController(request, handleDeleteTopicsRequest)
|
||||
case ApiKeys.GET_NEXT_NODE_ID => forwardToControllerOrFail(request)
|
||||
case ApiKeys.AUTOMQ_UPDATE_GROUP => handleUpdateGroupRequest(request, requestLocal)
|
||||
case ApiKeys.UPDATE_LICENSE => forwardToControllerOrFail(request)
|
||||
case ApiKeys.DESCRIBE_LICENSE => forwardToControllerOrFail(request)
|
||||
case ApiKeys.EXPORT_CLUSTER_MANIFEST => forwardToControllerOrFail(request)
|
||||
|
||||
case _ =>
|
||||
throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
|
||||
|
|
@ -208,7 +213,10 @@ class ElasticKafkaApis(
|
|||
| ApiKeys.GET_NEXT_NODE_ID
|
||||
| ApiKeys.AUTOMQ_ZONE_ROUTER
|
||||
| ApiKeys.AUTOMQ_UPDATE_GROUP
|
||||
| ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT => handleExtensionRequest(request, requestLocal)
|
||||
| ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT
|
||||
| ApiKeys.UPDATE_LICENSE
|
||||
| ApiKeys.DESCRIBE_LICENSE
|
||||
| ApiKeys.EXPORT_CLUSTER_MANIFEST => handleExtensionRequest(request, requestLocal)
|
||||
case _ => super.handle(request, requestLocal)
|
||||
}
|
||||
}
|
||||
|
|
@ -469,6 +477,19 @@ class ElasticKafkaApis(
|
|||
val versionId = request.header.apiVersion
|
||||
val clientId = request.header.clientId
|
||||
val fetchRequest = request.body[FetchRequest]
|
||||
if (!fetchRequest.isFromFollower && licenseManager != null && !licenseManager.checkLicense("", false)) {
|
||||
val topicNames = if (fetchRequest.version() >= 13) metadataCache.topicIdsToNames() else Collections.emptyMap[Uuid, String]()
|
||||
val fetchData = fetchRequest.fetchData(topicNames)
|
||||
val responseData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]()
|
||||
|
||||
fetchData.forEach { (topicIdPartition, _) =>
|
||||
responseData.put(topicIdPartition, FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED))
|
||||
}
|
||||
|
||||
val response = FetchResponse.of(Errors.NONE, 0, fetchRequest.metadata.sessionId(), responseData)
|
||||
requestChannel.sendResponse(request, response, None)
|
||||
return
|
||||
}
|
||||
val topicNames =
|
||||
if (fetchRequest.version() >= 13)
|
||||
metadataCache.topicIdsToNames()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright 2025, AutoMQ HK Limited.
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.server.streamaspect;
|
||||
|
||||
|
||||
|
||||
import org.apache.kafka.controller.LicenseManager;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ServiceLoader;
|
||||
|
||||
public final class LicenseManagerProvider {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LicenseManagerProvider.class);
|
||||
private static final Object INIT_LOCK = new Object();
|
||||
|
||||
private static volatile LicenseManager cachedInstance;
|
||||
|
||||
private LicenseManagerProvider() {
|
||||
}
|
||||
|
||||
public static LicenseManager get() {
|
||||
LicenseManager current = cachedInstance;
|
||||
if (current == null) {
|
||||
synchronized (INIT_LOCK) {
|
||||
current = cachedInstance;
|
||||
if (current == null) {
|
||||
cachedInstance = current = loadService();
|
||||
}
|
||||
}
|
||||
}
|
||||
return current;
|
||||
}
|
||||
|
||||
private static LicenseManager loadService() {
|
||||
try {
|
||||
ServiceLoader<LicenseManager> loader =
|
||||
ServiceLoader.load(LicenseManager.class, LicenseManager.class.getClassLoader());
|
||||
LicenseManager first = null;
|
||||
for (LicenseManager impl : loader) {
|
||||
if (first != null) {
|
||||
break;
|
||||
}
|
||||
first = impl;
|
||||
}
|
||||
return first;
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Failed to load LicenseManager implementation", t);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -59,12 +59,16 @@ import org.apache.kafka.common.message.DeleteKVsRequestData;
|
|||
import org.apache.kafka.common.message.DeleteKVsResponseData;
|
||||
import org.apache.kafka.common.message.DeleteStreamsRequestData;
|
||||
import org.apache.kafka.common.message.DeleteStreamsResponseData;
|
||||
import org.apache.kafka.common.message.DescribeLicenseRequestData;
|
||||
import org.apache.kafka.common.message.DescribeLicenseResponseData;
|
||||
import org.apache.kafka.common.message.DescribeStreamsRequestData;
|
||||
import org.apache.kafka.common.message.DescribeStreamsResponseData;
|
||||
import org.apache.kafka.common.message.ElectLeadersRequestData;
|
||||
import org.apache.kafka.common.message.ElectLeadersResponseData;
|
||||
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
|
||||
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
|
||||
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
|
||||
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
|
||||
import org.apache.kafka.common.message.GetKVsRequestData;
|
||||
import org.apache.kafka.common.message.GetKVsResponseData;
|
||||
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
|
||||
|
|
@ -84,6 +88,8 @@ import org.apache.kafka.common.message.TrimStreamsRequestData;
|
|||
import org.apache.kafka.common.message.TrimStreamsResponseData;
|
||||
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
|
||||
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
|
||||
import org.apache.kafka.common.message.UpdateLicenseRequestData;
|
||||
import org.apache.kafka.common.message.UpdateLicenseResponseData;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.quota.ClientQuotaAlteration;
|
||||
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
||||
|
|
@ -663,6 +669,21 @@ public class MockController implements Controller {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<DescribeLicenseResponseData> describeLicense(ControllerRequestContext context, DescribeLicenseRequestData request) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<UpdateLicenseResponseData> updateLicense(ControllerRequestContext context, UpdateLicenseRequestData request) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ExportClusterManifestResponseData> exportClusterManifest(ControllerRequestContext context, ExportClusterManifestRequestData request) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<AutomqRegisterNodeResponseData> registerNode(ControllerRequestContext context,
|
||||
AutomqRegisterNodeRequest req) {
|
||||
|
|
|
|||
|
|
@ -300,7 +300,6 @@ libs += [
|
|||
opentelemetryExporterLogging: "io.opentelemetry:opentelemetry-exporter-logging:$versions.opentelemetrySDK",
|
||||
opentelemetryExporterProm: "io.opentelemetry:opentelemetry-exporter-prometheus:$versions.opentelemetrySDKAlpha",
|
||||
opentelemetryExporterOTLP: "io.opentelemetry:opentelemetry-exporter-otlp:$versions.opentelemetrySDK",
|
||||
opentelemetryExporterSenderJdk: "io.opentelemetry:opentelemetry-exporter-sender-jdk:$versions.opentelemetrySDK",
|
||||
opentelemetryJmx: "io.opentelemetry.instrumentation:opentelemetry-jmx-metrics:$versions.opentelemetryInstrument",
|
||||
oshi: "com.github.oshi:oshi-core-java11:$versions.oshi",
|
||||
bucket4j: "com.bucket4j:bucket4j-core:$versions.bucket4j",
|
||||
|
|
|
|||
|
|
@ -53,12 +53,16 @@ import org.apache.kafka.common.message.DeleteKVsRequestData;
|
|||
import org.apache.kafka.common.message.DeleteKVsResponseData;
|
||||
import org.apache.kafka.common.message.DeleteStreamsRequestData;
|
||||
import org.apache.kafka.common.message.DeleteStreamsResponseData;
|
||||
import org.apache.kafka.common.message.DescribeLicenseRequestData;
|
||||
import org.apache.kafka.common.message.DescribeLicenseResponseData;
|
||||
import org.apache.kafka.common.message.DescribeStreamsRequestData;
|
||||
import org.apache.kafka.common.message.DescribeStreamsResponseData;
|
||||
import org.apache.kafka.common.message.ElectLeadersRequestData;
|
||||
import org.apache.kafka.common.message.ElectLeadersResponseData;
|
||||
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
|
||||
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
|
||||
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
|
||||
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
|
||||
import org.apache.kafka.common.message.GetKVsRequestData;
|
||||
import org.apache.kafka.common.message.GetKVsResponseData;
|
||||
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
|
||||
|
|
@ -78,6 +82,8 @@ import org.apache.kafka.common.message.TrimStreamsRequestData;
|
|||
import org.apache.kafka.common.message.TrimStreamsResponseData;
|
||||
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
|
||||
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
|
||||
import org.apache.kafka.common.message.UpdateLicenseRequestData;
|
||||
import org.apache.kafka.common.message.UpdateLicenseResponseData;
|
||||
import org.apache.kafka.common.quota.ClientQuotaAlteration;
|
||||
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
||||
import org.apache.kafka.common.requests.ApiError;
|
||||
|
|
@ -609,6 +615,30 @@ public interface Controller extends AclMutator, AutoCloseable {
|
|||
DeleteKVsRequestData request
|
||||
);
|
||||
|
||||
/**
|
||||
* Describe license.
|
||||
*/
|
||||
CompletableFuture<DescribeLicenseResponseData> describeLicense(
|
||||
ControllerRequestContext context,
|
||||
DescribeLicenseRequestData request
|
||||
);
|
||||
|
||||
/**
|
||||
* Update license.
|
||||
*/
|
||||
CompletableFuture<UpdateLicenseResponseData> updateLicense(
|
||||
ControllerRequestContext context,
|
||||
UpdateLicenseRequestData request
|
||||
);
|
||||
|
||||
/**
|
||||
* Export cluster manifest.
|
||||
*/
|
||||
CompletableFuture<ExportClusterManifestResponseData> exportClusterManifest(
|
||||
ControllerRequestContext context,
|
||||
ExportClusterManifestRequestData request
|
||||
);
|
||||
|
||||
/**
|
||||
* Register node metadata
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -0,0 +1,25 @@
|
|||
package org.apache.kafka.controller;
|
||||
|
||||
import org.apache.kafka.common.metadata.KVRecord;
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
public interface LicenseManager {
|
||||
|
||||
String describeLicense();
|
||||
|
||||
String exportClusterManifest();
|
||||
|
||||
boolean checkLicense(String license, boolean isUpdate);
|
||||
|
||||
boolean replay(KVRecord record);
|
||||
|
||||
boolean initialized();
|
||||
|
||||
Date getExpireDate();
|
||||
|
||||
List<ApiMessageAndVersion> getRecordsToAppend(String license);
|
||||
}
|
||||
|
|
@ -62,12 +62,16 @@ import org.apache.kafka.common.message.DeleteKVsRequestData;
|
|||
import org.apache.kafka.common.message.DeleteKVsResponseData;
|
||||
import org.apache.kafka.common.message.DeleteStreamsRequestData;
|
||||
import org.apache.kafka.common.message.DeleteStreamsResponseData;
|
||||
import org.apache.kafka.common.message.DescribeLicenseRequestData;
|
||||
import org.apache.kafka.common.message.DescribeLicenseResponseData;
|
||||
import org.apache.kafka.common.message.DescribeStreamsRequestData;
|
||||
import org.apache.kafka.common.message.DescribeStreamsResponseData;
|
||||
import org.apache.kafka.common.message.ElectLeadersRequestData;
|
||||
import org.apache.kafka.common.message.ElectLeadersResponseData;
|
||||
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
|
||||
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
|
||||
import org.apache.kafka.common.message.ExportClusterManifestRequestData;
|
||||
import org.apache.kafka.common.message.ExportClusterManifestResponseData;
|
||||
import org.apache.kafka.common.message.GetKVsRequestData;
|
||||
import org.apache.kafka.common.message.GetKVsResponseData;
|
||||
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
|
||||
|
|
@ -87,6 +91,8 @@ import org.apache.kafka.common.message.TrimStreamsRequestData;
|
|||
import org.apache.kafka.common.message.TrimStreamsResponseData;
|
||||
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
|
||||
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
|
||||
import org.apache.kafka.common.message.UpdateLicenseRequestData;
|
||||
import org.apache.kafka.common.message.UpdateLicenseResponseData;
|
||||
import org.apache.kafka.common.metadata.AbortTransactionRecord;
|
||||
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
|
||||
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
|
||||
|
|
@ -289,6 +295,7 @@ public final class QuorumController implements Controller {
|
|||
private StreamClient streamClient;
|
||||
private List<String> quorumVoters = Collections.emptyList();
|
||||
private Function<QuorumController, QuorumControllerExtension> extension = c -> QuorumControllerExtension.NOOP;
|
||||
private LicenseManager licenseManager = null;
|
||||
// AutoMQ for Kafka inject end
|
||||
|
||||
public Builder(int nodeId, String clusterId) {
|
||||
|
|
@ -460,6 +467,11 @@ public final class QuorumController implements Controller {
|
|||
this.extension = extension;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setLicenseManager(LicenseManager licenseManager) {
|
||||
this.licenseManager = licenseManager;
|
||||
return this;
|
||||
}
|
||||
// AutoMQ for Kafka inject end
|
||||
|
||||
public QuorumController build() throws Exception {
|
||||
|
|
@ -522,6 +534,7 @@ public final class QuorumController implements Controller {
|
|||
eligibleLeaderReplicasEnabled,
|
||||
streamClient,
|
||||
quorumVoters,
|
||||
licenseManager,
|
||||
extension
|
||||
);
|
||||
} catch (Exception e) {
|
||||
|
|
@ -1359,13 +1372,21 @@ public final class QuorumController implements Controller {
|
|||
@Override
|
||||
public ControllerResult<Void> generateRecordsAndResult() {
|
||||
try {
|
||||
return ActivationRecordsGenerator.generate(
|
||||
ControllerResult<Void> base = ActivationRecordsGenerator.generate(
|
||||
log::warn,
|
||||
logReplayTracker.empty(),
|
||||
offsetControl.transactionStartOffset(),
|
||||
zkMigrationEnabled,
|
||||
bootstrapMetadata,
|
||||
featureControl);
|
||||
// AutoMQ for Kafka inject start
|
||||
List<ApiMessageAndVersion> all = new ArrayList<>(base.records());
|
||||
if (licenseManager != null && !licenseManager.initialized()) {
|
||||
List<ApiMessageAndVersion> recordsToAppend = licenseManager.getRecordsToAppend("");
|
||||
all.addAll(recordsToAppend);
|
||||
}
|
||||
// AutoMQ for Kafka inject end
|
||||
return ControllerResult.atomicOf(all, null);
|
||||
} catch (Throwable t) {
|
||||
throw fatalFaultHandler.handleFault("exception while completing controller " +
|
||||
"activation", t);
|
||||
|
|
@ -1599,7 +1620,7 @@ public final class QuorumController implements Controller {
|
|||
* @param snapshotId The snapshotId if this record is from a snapshot
|
||||
* @param offset The offset of the record
|
||||
*/
|
||||
@SuppressWarnings("checkstyle:javaNCSS")
|
||||
@SuppressWarnings({"checkstyle:javaNCSS", "checkstyle:MethodLength"})
|
||||
private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
|
||||
if (log.isTraceEnabled()) {
|
||||
if (snapshotId.isPresent()) {
|
||||
|
|
@ -1743,6 +1764,9 @@ public final class QuorumController implements Controller {
|
|||
topicDeletionManager.replay(record);
|
||||
nodeControlManager.replay(record);
|
||||
routerChannelEpochControlManager.replay(record);
|
||||
if (licenseManager != null) {
|
||||
licenseManager.replay(record);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case REMOVE_KVRECORD: {
|
||||
|
|
@ -2007,6 +2031,8 @@ public final class QuorumController implements Controller {
|
|||
private final RouterChannelEpochControlManager routerChannelEpochControlManager;
|
||||
|
||||
private final QuorumControllerExtension extension;
|
||||
|
||||
private final LicenseManager licenseManager;
|
||||
// AutoMQ for Kafka inject end
|
||||
|
||||
|
||||
|
|
@ -2046,6 +2072,7 @@ public final class QuorumController implements Controller {
|
|||
// AutoMQ inject start
|
||||
StreamClient streamClient,
|
||||
List<String> quorumVoters,
|
||||
LicenseManager licenseManager,
|
||||
Function<QuorumController, QuorumControllerExtension> extension
|
||||
// AutoMQ inject end
|
||||
|
||||
|
|
@ -2176,6 +2203,8 @@ public final class QuorumController implements Controller {
|
|||
this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoManager(clusterControl, streamControlManager));
|
||||
this.routerChannelEpochControlManager = new RouterChannelEpochControlManager(snapshotRegistry, this, nodeControlManager, time);
|
||||
this.extension = extension.apply(this);
|
||||
this.licenseManager = licenseManager;
|
||||
|
||||
|
||||
// set the nodeControlManager here to avoid circular dependency
|
||||
this.replicationControl.setNodeControlManager(nodeControlManager);
|
||||
|
|
@ -2797,6 +2826,91 @@ public final class QuorumController implements Controller {
|
|||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<DescribeLicenseResponseData> describeLicense(
|
||||
ControllerRequestContext context,
|
||||
DescribeLicenseRequestData request
|
||||
) {
|
||||
if (licenseManager == null) {
|
||||
return CompletableFuture.completedFuture(
|
||||
new DescribeLicenseResponseData()
|
||||
.setErrorCode(Errors.UNSUPPORTED_VERSION.code())
|
||||
.setLicense("")
|
||||
.setThrottleTimeMs(0)
|
||||
);
|
||||
}
|
||||
return appendReadEvent("describeLicense", context.deadlineNs(), () -> {
|
||||
String license = licenseManager.describeLicense();
|
||||
return new DescribeLicenseResponseData()
|
||||
.setErrorCode(Errors.NONE.code())
|
||||
.setLicense(license)
|
||||
.setThrottleTimeMs(0);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<UpdateLicenseResponseData> updateLicense(
|
||||
ControllerRequestContext context,
|
||||
UpdateLicenseRequestData request
|
||||
) {
|
||||
if (licenseManager == null) {
|
||||
return CompletableFuture.completedFuture(
|
||||
new UpdateLicenseResponseData()
|
||||
.setErrorCode(Errors.UNSUPPORTED_VERSION.code())
|
||||
.setErrorMessage("License management is not supported")
|
||||
.setThrottleTimeMs(0)
|
||||
);
|
||||
}
|
||||
String license = request.license();
|
||||
if (!licenseManager.checkLicense(license, true)) {
|
||||
return CompletableFuture.completedFuture(
|
||||
new UpdateLicenseResponseData()
|
||||
.setErrorCode(Errors.POLICY_VIOLATION.code())
|
||||
.setErrorMessage("license check failed")
|
||||
.setThrottleTimeMs(0)
|
||||
);
|
||||
}
|
||||
return appendWriteEvent("updateLicense", context.deadlineNs(), () -> {
|
||||
try {
|
||||
List<ApiMessageAndVersion> recordsToAppend = licenseManager.getRecordsToAppend(license);
|
||||
UpdateLicenseResponseData response = new UpdateLicenseResponseData()
|
||||
.setErrorCode(Errors.NONE.code())
|
||||
.setErrorMessage("")
|
||||
.setThrottleTimeMs(0);
|
||||
return ControllerResult.of(recordsToAppend, response);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to process license update", e);
|
||||
UpdateLicenseResponseData response = new UpdateLicenseResponseData()
|
||||
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
|
||||
.setErrorMessage("Failed to process update: " + e.getMessage())
|
||||
.setThrottleTimeMs(0);
|
||||
return ControllerResult.of(Collections.emptyList(), response);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ExportClusterManifestResponseData> exportClusterManifest(
|
||||
ControllerRequestContext context,
|
||||
ExportClusterManifestRequestData request
|
||||
) {
|
||||
if (licenseManager == null) {
|
||||
return CompletableFuture.completedFuture(
|
||||
new ExportClusterManifestResponseData()
|
||||
.setErrorCode(Errors.UNSUPPORTED_VERSION.code())
|
||||
.setManifest("")
|
||||
.setThrottleTimeMs(0)
|
||||
);
|
||||
}
|
||||
return appendReadEvent("exportClusterManifest", context.deadlineNs(), () -> {
|
||||
String manifest = licenseManager.exportClusterManifest();
|
||||
return new ExportClusterManifestResponseData()
|
||||
.setErrorCode(Errors.NONE.code())
|
||||
.setManifest(manifest)
|
||||
.setThrottleTimeMs(0);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<AutomqRegisterNodeResponseData> registerNode(ControllerRequestContext context, AutomqRegisterNodeRequest req) {
|
||||
return appendWriteEvent("registerNode", context.deadlineNs(), () -> nodeControlManager.register(req));
|
||||
|
|
|
|||
|
|
@ -60,4 +60,9 @@ public class S3StreamKafkaMetricsConstants {
|
|||
// Back Pressure
|
||||
public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state";
|
||||
public static final AttributeKey<String> LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state");
|
||||
|
||||
// License
|
||||
public static final String LICENSE_EXPIRY_TIMESTAMP_METRIC_NAME = "license_expiry_timestamp";
|
||||
public static final String LICENSE_SECONDS_REMAINING_METRIC_NAME = "automq_enterprise_license_expiry_seconds";
|
||||
public static final String NODE_VCPU_COUNT_METRIC_NAME = "node_vcpu_count";
|
||||
}
|
||||
|
|
|
|||
|
|
@ -131,6 +131,17 @@ public class S3StreamKafkaMetricsManager {
|
|||
*/
|
||||
private static Supplier<String> certChainSupplier = () -> null;
|
||||
|
||||
/**
|
||||
* Supplier for license expiry date.
|
||||
* Returns the expiry date of the license, or null if no license is configured.
|
||||
*/
|
||||
private static Supplier<Date> licenseExpireDateSupplier = () -> null;
|
||||
|
||||
// License metrics
|
||||
private static ObservableLongGauge licenseExpiryTimestampMetrics = new NoopObservableLongGauge();
|
||||
private static ObservableLongGauge licenseSecondsRemainingMetrics = new NoopObservableLongGauge();
|
||||
private static ObservableLongGauge nodeVcpuCountMetrics = new NoopObservableLongGauge();
|
||||
|
||||
public static void configure(MetricsConfig metricsConfig) {
|
||||
synchronized (BASE_ATTRIBUTES_LISTENERS) {
|
||||
S3StreamKafkaMetricsManager.metricsConfig = metricsConfig;
|
||||
|
|
@ -147,6 +158,7 @@ public class S3StreamKafkaMetricsManager {
|
|||
initLogAppendMetrics(meter, prefix);
|
||||
initPartitionStatusStatisticsMetrics(meter, prefix);
|
||||
initBackPressureMetrics(meter, prefix);
|
||||
initLicenseMetrics(meter, prefix);
|
||||
try {
|
||||
initCertMetrics(meter, prefix);
|
||||
} catch (Exception e) {
|
||||
|
|
@ -324,6 +336,44 @@ public class S3StreamKafkaMetricsManager {
|
|||
});
|
||||
}
|
||||
|
||||
private static void initLicenseMetrics(Meter meter, String prefix) {
|
||||
licenseExpiryTimestampMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.LICENSE_EXPIRY_TIMESTAMP_METRIC_NAME)
|
||||
.setDescription("The expiry timestamp of the license")
|
||||
.setUnit("milliseconds")
|
||||
.ofLongs()
|
||||
.buildWithCallback(result -> {
|
||||
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
|
||||
Date expireDate = licenseExpireDateSupplier.get();
|
||||
if (expireDate != null) {
|
||||
result.record(expireDate.getTime(), metricsConfig.getBaseAttributes());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
licenseSecondsRemainingMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.LICENSE_SECONDS_REMAINING_METRIC_NAME)
|
||||
.setDescription("The remaining seconds until the license expires")
|
||||
.setUnit("seconds")
|
||||
.ofLongs()
|
||||
.buildWithCallback(result -> {
|
||||
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
|
||||
Date expireDate = licenseExpireDateSupplier.get();
|
||||
if (expireDate != null) {
|
||||
long secondsRemaining = (expireDate.getTime() - System.currentTimeMillis()) / 1000;
|
||||
result.record(secondsRemaining, metricsConfig.getBaseAttributes());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
nodeVcpuCountMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.NODE_VCPU_COUNT_METRIC_NAME)
|
||||
.setDescription("The number of vCPUs available on this node")
|
||||
.ofLongs()
|
||||
.buildWithCallback(result -> {
|
||||
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
|
||||
result.record(Runtime.getRuntime().availableProcessors(), metricsConfig.getBaseAttributes());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the certificate metrics.
|
||||
*
|
||||
|
|
@ -516,4 +566,8 @@ public class S3StreamKafkaMetricsManager {
|
|||
public static void setCertChainSupplier(Supplier<String> certChainSupplier) {
|
||||
S3StreamKafkaMetricsManager.certChainSupplier = certChainSupplier;
|
||||
}
|
||||
|
||||
public static void setLicenseExpireDateSupplier(Supplier<Date> licenseExpireDateSupplier) {
|
||||
S3StreamKafkaMetricsManager.licenseExpireDateSupplier = licenseExpireDateSupplier;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue