Compare commits

...

8 Commits

38 changed files with 1483 additions and 34 deletions

View File

@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.Time;
import com.automq.shell.metrics.S3MetricsExporter;
import com.automq.stream.api.KeyValue;
import com.automq.stream.api.KeyValue.ValueAndEpoch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -95,6 +96,32 @@ public class ClientKVClient {
throw code.exception();
}
public ValueAndEpoch getKV(String key, String namespace) throws IOException {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ClientKVClient]: Get KV: {} Namespace: {}", key, namespace);
}
GetKVsRequestData data = new GetKVsRequestData()
.setGetKeyRequests(List.of(new GetKVsRequestData.GetKVRequest().setKey(key).setNamespace(namespace)));
long now = Time.SYSTEM.milliseconds();
ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()),
new GetKVsRequest.Builder(data), now, true, 3000, null);
ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM);
GetKVsResponseData responseData = (GetKVsResponseData) response.responseBody().data();
Errors code = Errors.forCode(responseData.errorCode());
if (Objects.requireNonNull(code) == Errors.NONE) {
return ValueAndEpoch.of(
responseData.getKVResponses().get(0).value(),
responseData.getKVResponses().get(0).epoch());
}
throw code.exception();
}
public KeyValue.Value putKV(String key, byte[] value) throws IOException {
long now = Time.SYSTEM.milliseconds();
@ -119,6 +146,32 @@ public class ClientKVClient {
throw code.exception();
}
public ValueAndEpoch putKV(String key, byte[] value, String namespace, long epoch) throws IOException {
long now = Time.SYSTEM.milliseconds();
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ClientKVClient]: put KV: {}", key);
}
PutKVsRequestData data = new PutKVsRequestData()
.setPutKVRequests(List.of(new PutKVsRequestData.PutKVRequest().setKey(key).setValue(value).setNamespace(namespace).setEpoch(epoch)));
ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()),
new PutKVsRequest.Builder(data), now, true, 3000, null);
ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM);
PutKVsResponseData responseData = (PutKVsResponseData) response.responseBody().data();
Errors code = Errors.forCode(responseData.errorCode());
if (Objects.requireNonNull(code) == Errors.NONE) {
return ValueAndEpoch.of(
responseData.putKVResponses().get(0).value(),
responseData.putKVResponses().get(0).epoch());
}
throw code.exception();
}
public KeyValue.Value deleteKV(String key) throws IOException {
long now = Time.SYSTEM.milliseconds();
@ -142,4 +195,30 @@ public class ClientKVClient {
throw code.exception();
}
public ValueAndEpoch deleteKV(String key, String namespace, long epoch) throws IOException {
long now = Time.SYSTEM.milliseconds();
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ClientKVClient]: Delete KV: {}", key);
}
DeleteKVsRequestData data = new DeleteKVsRequestData()
.setDeleteKVRequests(List.of(new DeleteKVsRequestData.DeleteKVRequest().setKey(key).setNamespace(namespace).setEpoch(epoch)));
ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()),
new DeleteKVsRequest.Builder(data), now, true, 3000, null);
ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM);
DeleteKVsResponseData responseData = (DeleteKVsResponseData) response.responseBody().data();
Errors code = Errors.forCode(responseData.errorCode());
if (Objects.requireNonNull(code) == Errors.NONE) {
return ValueAndEpoch.of(
responseData.deleteKVResponses().get(0).value(),
responseData.deleteKVResponses().get(0).epoch());
}
throw code.exception();
}
}

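A usage sketch of the new epoch-aware overloads (illustrative names; assumes the sketch lives in the same package as ClientKVClient and that ValueAndEpoch exposes an epoch() accessor mirroring KeyValue.epoch()):

import java.io.IOException;

import com.automq.stream.api.KeyValue.ValueAndEpoch;

public class ClientKVClientUsageSketch {
    // Put under a namespace (epoch 0 = no precondition), read the value and its
    // current epoch back, then delete only if the epoch is still the one we read.
    static void roundTrip(ClientKVClient client) throws IOException {
        client.putKV("example-key", "v1".getBytes(), "__example_namespace", 0L);
        ValueAndEpoch current = client.getKV("example-key", "__example_namespace");
        client.deleteKV("example-key", "__example_namespace", current.epoch());
    }
}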
View File

@ -1739,6 +1739,48 @@ public interface Admin extends AutoCloseable {
* @return {@link UpdateGroupResult}
*/
UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, UpdateGroupOptions options);
/**
* Get a key-value pair from the namespaced KV store.
* @param partitions the partitions to route the request to; must be present
* @param namespace the namespace of the key
* @param key the key to get
* @param options the options to use when getting the key-value pair
* @return {@link GetNamespacedKVResult}
*/
GetNamespacedKVResult getNamespacedKV(
Optional<Set<TopicPartition>> partitions,
String namespace,
String key,
GetNamespacedKVOptions options
);
/**
* Put a key-value pair in the namespaced KV store.
* @param partitions the partitions to route the request to; must be present
* @param namespace the namespace of the key
* @param key the key to put
* @param value the value to associate with the key
* @param options the options to use, e.g. overwrite and ifMatchEpoch
* @return {@link PutNamespacedKVResult}
*/
PutNamespacedKVResult putNamespacedKV(
Optional<Set<TopicPartition>> partitions,
String namespace,
String key,
String value,
PutNamespacedKVOptions options
);
/**
* Delete a key-value pair in the namespaced KV store.
* @param partitions the partitions to route the request to; must be present
* @param namespace the namespace of the key
* @param key the key to delete
* @param options the options to use, e.g. ifMatchEpoch
* @return {@link DeleteNamespacedKVResult}
*/
DeleteNamespacedKVResult deleteNamespacedKV(
Optional<Set<TopicPartition>> partitions,
String namespace,
String key,
DeleteNamespacedKVOptions options
);
// AutoMQ inject end
/**

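A hedged end-to-end sketch of the new Admin surface (topic, namespace, and key are illustrative; error handling elided):

import java.util.Optional;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.GetNamespacedKVOptions;
import org.apache.kafka.clients.admin.PutNamespacedKVOptions;
import org.apache.kafka.common.TopicPartition;

public class NamespacedKVAdminSketch {
    // Write a namespaced key against an explicit partition set, then read it back.
    static void putThenGet(Admin admin) throws Exception {
        Optional<Set<TopicPartition>> partitions =
            Optional.of(Set.of(new TopicPartition("example-topic", 0)));
        admin.putNamespacedKV(partitions, "__example_ns", "example-key", "example-value",
            new PutNamespacedKVOptions().overwrite(true)).all().get();
        admin.getNamespacedKV(partitions, "__example_ns", "example-key",
            new GetNamespacedKVOptions()).all().get();
    }
}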
View File

@ -0,0 +1,15 @@
package org.apache.kafka.clients.admin;
public class DeleteNamespacedKVOptions extends AbstractOptions<DeleteNamespacedKVOptions> {
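// An epoch of 0 (the default) skips the server-side epoch check; a positive value
// must match the record's current epoch or the delete is rejected with
// INVALID_KV_RECORD_EPOCH (see KVControlManager.deleteKV).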
private long ifMatchEpoch = 0L;
public DeleteNamespacedKVOptions ifMatchEpoch(long epoch) {
this.ifMatchEpoch = epoch;
return this;
}
public long ifMatchEpoch() {
return ifMatchEpoch;
}
}

View File

@ -0,0 +1,19 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public class DeleteNamespacedKVResult {
private final Map<TopicPartition, KafkaFuture<Void>> futures;
public DeleteNamespacedKVResult(Map<TopicPartition, KafkaFuture<Void>> futures) {
this.futures = futures;
}
public KafkaFuture<Map<TopicPartition, KafkaFuture<Void>>> all() {
return KafkaFuture.completedFuture(futures);
}
}

View File

@ -320,5 +320,20 @@ public class ForwardingAdmin implements Admin {
return delegate.updateGroup(groupId, groupSpec, options);
}
@Override
public GetNamespacedKVResult getNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, GetNamespacedKVOptions options) {
return delegate.getNamespacedKV(partitions, namespace, key, options);
}
@Override
public PutNamespacedKVResult putNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) {
return delegate.putNamespacedKV(partitions, namespace, key, value, options);
}
@Override
public DeleteNamespacedKVResult deleteNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, DeleteNamespacedKVOptions options) {
return delegate.deleteNamespacedKV(partitions, namespace, key, options);
}
// AutoMQ inject end
}

View File

@ -0,0 +1,4 @@
package org.apache.kafka.clients.admin;
public class GetNamespacedKVOptions extends AbstractOptions<GetNamespacedKVOptions> {
}

View File

@ -0,0 +1,21 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse;
import java.util.Map;
public class GetNamespacedKVResult {
private final Map<TopicPartition, KafkaFuture<GetKVResponse>> futures;
public GetNamespacedKVResult(Map<TopicPartition, KafkaFuture<GetKVResponse>> futures) {
this.futures = futures;
}
public KafkaFuture<Map<TopicPartition, KafkaFuture<GetKVResponse>>> all() throws ExecutionException, InterruptedException {
return KafkaFuture.completedFuture(futures);
}
}

View File

@ -47,14 +47,19 @@ import org.apache.kafka.clients.admin.internals.AlterConsumerGroupOffsetsHandler
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteNamespacedKVHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
import org.apache.kafka.clients.admin.internals.GetNamespacedKVHandler;
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToGet;
import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToPut;
import org.apache.kafka.clients.admin.internals.PutNamespacedKVHandler;
import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler;
import org.apache.kafka.clients.admin.internals.UpdateGroupHandler;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@ -132,6 +137,7 @@ import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
import org.apache.kafka.common.message.DeleteAclsResponseData;
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
import org.apache.kafka.common.message.DeleteKVsRequestData;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState;
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
@ -152,6 +158,8 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.GetKVsRequestData;
import org.apache.kafka.common.message.GetKVsResponseData;
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
@ -160,6 +168,8 @@ import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.PutKVsRequestData;
import org.apache.kafka.common.message.PutKVsResponseData;
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
@ -266,6 +276,7 @@ import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
@ -4866,6 +4877,86 @@ public class KafkaAdminClient extends AdminClient {
return new UpdateGroupResult(future.get(CoordinatorKey.byGroupId(groupId)));
}
@Override
public GetNamespacedKVResult getNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, GetNamespacedKVOptions options) {
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() ->
new IllegalArgumentException("Partitions cannot be empty")
);
NamespacedKVRecordsToGet.Builder recordsToGetBuilder = NamespacedKVRecordsToGet.newBuilder();
for (TopicPartition tp : targetPartitions) {
GetKVsRequestData.GetKVRequest kvRequest = new GetKVsRequestData.GetKVRequest()
.setKey(key)
.setNamespace(namespace);
recordsToGetBuilder.addRecord(tp, kvRequest);
}
NamespacedKVRecordsToGet recordsToGet = recordsToGetBuilder.build();
GetNamespacedKVHandler handler = new GetNamespacedKVHandler(logContext, recordsToGet);
SimpleAdminApiFuture<TopicPartition, GetKVsResponseData.GetKVResponse> future = GetNamespacedKVHandler.newFuture(targetPartitions);
invokeDriver(handler, future, options.timeoutMs);
return new GetNamespacedKVResult(future.all());
}
@Override
public PutNamespacedKVResult putNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) {
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() ->
new IllegalArgumentException("Partitions cannot be empty")
);
NamespacedKVRecordsToPut.Builder recordsToPutBuilder = NamespacedKVRecordsToPut.newBuilder();
for (TopicPartition tp : targetPartitions) {
PutKVsRequestData.PutKVRequest kvRequest = new PutKVsRequestData.PutKVRequest()
.setKey(key)
.setValue(value.getBytes(StandardCharsets.UTF_8))
.setNamespace(namespace)
.setOverwrite(options.overwrite())
.setEpoch(options.ifMatchEpoch());
recordsToPutBuilder.addRecord(tp, kvRequest);
}
NamespacedKVRecordsToPut recordsToPut = recordsToPutBuilder.build();
PutNamespacedKVHandler handler = new PutNamespacedKVHandler(logContext, recordsToPut);
SimpleAdminApiFuture<TopicPartition, PutKVsResponseData.PutKVResponse> future = PutNamespacedKVHandler.newFuture(targetPartitions);
invokeDriver(handler, future, options.timeoutMs);
return new PutNamespacedKVResult(future.all());
}
@Override
public DeleteNamespacedKVResult deleteNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, DeleteNamespacedKVOptions options) {
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() ->
new IllegalArgumentException("Partitions cannot be empty")
);
NamespacedKVRecordsToDelete.Builder recordsToDeleteBuilder = NamespacedKVRecordsToDelete.newBuilder();
for (TopicPartition tp : targetPartitions) {
DeleteKVsRequestData.DeleteKVRequest kvRequest = new DeleteKVsRequestData.DeleteKVRequest()
.setKey(key)
.setNamespace(namespace)
.setEpoch(options.ifMatchEpoch());
recordsToDeleteBuilder.addRecord(tp, kvRequest);
}
NamespacedKVRecordsToDelete recordsToDelete = recordsToDeleteBuilder.build();
DeleteNamespacedKVHandler handler = new DeleteNamespacedKVHandler(logContext, recordsToDelete);
SimpleAdminApiFuture<TopicPartition, Void> future = DeleteNamespacedKVHandler.newFuture(targetPartitions);
invokeDriver(handler, future, options.timeoutMs);
return new DeleteNamespacedKVResult(future.all());
}
private <K, V> void invokeDriver(
AdminApiHandler<K, V> handler,
AdminApiFuture<K, V> future,

View File

@ -0,0 +1,40 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DeleteKVsRequestData.DeleteKVRequest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class NamespacedKVRecordsToDelete {
private final Map<TopicPartition, List<DeleteKVRequest>> recordsByPartition;
public NamespacedKVRecordsToDelete(Map<TopicPartition, List<DeleteKVRequest>> recordsByPartition) {
this.recordsByPartition = recordsByPartition;
}
public static NamespacedKVRecordsToDelete.Builder newBuilder() {
return new NamespacedKVRecordsToDelete.Builder();
}
public Map<TopicPartition, List<DeleteKVRequest>> recordsByPartition() {
return recordsByPartition;
}
public static class Builder {
private final Map<TopicPartition, List<DeleteKVRequest>> records = new HashMap<>();
public NamespacedKVRecordsToDelete.Builder addRecord(TopicPartition partition, DeleteKVRequest request) {
records.computeIfAbsent(partition, k -> new ArrayList<>()).add(request);
return this;
}
public NamespacedKVRecordsToDelete build() {
return new NamespacedKVRecordsToDelete(records);
}
}
}

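A short builder sketch (partition, key, and namespace are illustrative):

import org.apache.kafka.clients.admin.NamespacedKVRecordsToDelete;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DeleteKVsRequestData.DeleteKVRequest;

public class RecordsToDeleteSketch {
    // Group one delete request per target partition, as KafkaAdminClient does.
    static NamespacedKVRecordsToDelete forPartition(TopicPartition tp, long epoch) {
        return NamespacedKVRecordsToDelete.newBuilder()
            .addRecord(tp, new DeleteKVRequest()
                .setKey("example-key")
                .setNamespace("__example_ns")
                .setEpoch(epoch))
            .build();
    }
}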
View File

@ -0,0 +1,24 @@
package org.apache.kafka.clients.admin;
public class PutNamespacedKVOptions extends AbstractOptions<PutNamespacedKVOptions> {
private boolean overwrite = false;
private long ifMatchEpoch = 0L;
public PutNamespacedKVOptions overwrite(boolean overwrite) {
this.overwrite = overwrite;
return this;
}
public PutNamespacedKVOptions ifMatchEpoch(long epoch) {
this.ifMatchEpoch = epoch;
return this;
}
public boolean overwrite() {
return overwrite;
}
public long ifMatchEpoch() {
return ifMatchEpoch;
}
}

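A sketch of how the two options combine for optimistic concurrency; an ifMatchEpoch of 0 (the default) skips the check, while a positive epoch must match the record's current epoch or the broker answers INVALID_KV_RECORD_EPOCH:

import org.apache.kafka.clients.admin.PutNamespacedKVOptions;

public class PutOptionsSketch {
    // Compare-and-set style update: replace the value only if the record's
    // epoch is still the one we last read.
    static PutNamespacedKVOptions casUpdate(long lastSeenEpoch) {
        return new PutNamespacedKVOptions()
            .overwrite(true)
            .ifMatchEpoch(lastSeenEpoch);
    }
}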
View File

@ -0,0 +1,23 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.PutKVsResponseData.PutKVResponse;
import java.util.Map;
public class PutNamespacedKVResult {
private final Map<TopicPartition, KafkaFuture<PutKVResponse>> futures;
public PutNamespacedKVResult(Map<TopicPartition, KafkaFuture<PutKVResponse>> futures) {
this.futures = futures;
}
public KafkaFuture<Map<TopicPartition, KafkaFuture<PutKVResponse>>> all() {
return KafkaFuture.completedFuture(futures);
}
}

View File

@ -0,0 +1,89 @@
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.NamespacedKVRecordsToDelete;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DeleteKVsRequestData;
import org.apache.kafka.common.message.DeleteKVsRequestData.DeleteKVRequest;
import org.apache.kafka.common.message.DeleteKVsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.s3.DeleteKVsRequest;
import org.apache.kafka.common.requests.s3.DeleteKVsResponse;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class DeleteNamespacedKVHandler extends AdminApiHandler.Batched<TopicPartition, Void> {
private final Logger logger;
private final NamespacedKVRecordsToDelete recordsToDelete;
private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
public DeleteNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToDelete recordsToDelete) {
this.logger = logContext.logger(DeleteNamespacedKVHandler.class);
this.recordsToDelete = recordsToDelete;
this.lookupStrategy = new PartitionLeaderStrategy(logContext);
}
@Override
AbstractRequest.Builder<?> buildBatchedRequest(int brokerId, Set<TopicPartition> partitions) {
Map<TopicPartition, List<DeleteKVRequest>> filteredRecords = new HashMap<>();
for (TopicPartition partition : partitions) {
if (recordsToDelete.recordsByPartition().containsKey(partition)) {
filteredRecords.put(partition, recordsToDelete.recordsByPartition().get(partition));
}
}
DeleteKVsRequestData requestData = new DeleteKVsRequestData();
List<DeleteKVRequest> allRequests = new ArrayList<>();
filteredRecords.values().forEach(allRequests::addAll);
requestData.setDeleteKVRequests(allRequests);
return new DeleteKVsRequest.Builder(requestData);
}
@Override
public String apiName() {
return "DeleteKVs";
}
@Override
public ApiResult<TopicPartition, Void> handleResponse(Node broker, Set<TopicPartition> partitions, AbstractResponse response) {
DeleteKVsResponse deleteResponse = (DeleteKVsResponse) response;
DeleteKVsResponseData responseData = deleteResponse.data();
final Map<TopicPartition, Void> completed = new HashMap<>();
final Map<TopicPartition, Throwable> failed = new HashMap<>();
partitions.forEach(partition -> {
Errors error = Errors.forCode(responseData.errorCode());
if (error != Errors.NONE) {
failed.put(partition, error.exception());
} else {
completed.put(partition, null);
}
});
return new ApiResult<>(completed, failed, Collections.emptyList());
}
@Override
public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
return this.lookupStrategy;
}
public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition, Void> newFuture(
Set<TopicPartition> partitions
) {
return AdminApiFuture.forKeys(new HashSet<>(partitions));
}
}

View File

@ -0,0 +1,95 @@
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.GetKVsRequestData;
import org.apache.kafka.common.message.GetKVsResponseData;
import org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.s3.GetKVsRequest;
import org.apache.kafka.common.requests.s3.GetKVsResponse;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class GetNamespacedKVHandler extends AdminApiHandler.Batched<TopicPartition, GetKVResponse> {
private final Logger logger;
private final NamespacedKVRecordsToGet recordsToGet;
private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
private final List<TopicPartition> orderedPartitions;
public GetNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToGet recordsToGet) {
this.logger = logContext.logger(GetNamespacedKVHandler.class);
this.recordsToGet = recordsToGet;
this.lookupStrategy = new PartitionLeaderStrategy(logContext);
this.orderedPartitions = new ArrayList<>(recordsToGet.recordsByPartition().keySet());
}
@Override
AbstractRequest.Builder<?> buildBatchedRequest(int brokerId, Set<TopicPartition> partitions) {
GetKVsRequestData requestData = new GetKVsRequestData();
for (TopicPartition tp : orderedPartitions) {
if (partitions.contains(tp)) {
requestData.getKeyRequests().addAll(
recordsToGet.recordsByPartition().get(tp)
);
}
}
return new GetKVsRequest.Builder(requestData);
}
@Override
public String apiName() {
return "GetKVs";
}
@Override
public ApiResult<TopicPartition, GetKVResponse> handleResponse(Node broker, Set<TopicPartition> partitions, AbstractResponse response) {
GetKVsResponseData data = ((GetKVsResponse) response).data();
final Map<TopicPartition, GetKVResponse> completed = new LinkedHashMap<>();
final Map<TopicPartition, Throwable> failed = new HashMap<>();
List<GetKVResponse> responses = data.getKVResponses();
int responseIndex = 0;
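// Responses are matched to partitions positionally: this assumes the broker returns
// exactly one response per request, in request order, and that the admin client
// added exactly one request per partition.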
for (TopicPartition tp : orderedPartitions) {
if (!partitions.contains(tp)) {
continue;
}
if (responseIndex >= responses.size()) {
failed.put(tp, new IllegalStateException("Missing response for partition"));
continue;
}
GetKVResponse resp = responses.get(responseIndex++);
if (resp.errorCode() == Errors.NONE.code()) {
completed.put(tp, resp);
} else {
failed.put(tp, Errors.forCode(resp.errorCode()).exception());
}
}
return new ApiResult<>(completed, failed, Collections.emptyList());
}
@Override
public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
return this.lookupStrategy;
}
public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition, GetKVResponse> newFuture(
Set<TopicPartition> partitions
) {
return AdminApiFuture.forKeys(new HashSet<>(partitions));
}
}

View File

@ -0,0 +1,38 @@
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.GetKVsRequestData.GetKVRequest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class NamespacedKVRecordsToGet {
private final Map<TopicPartition, List<GetKVRequest>> recordsByPartition;
public NamespacedKVRecordsToGet(Map<TopicPartition, List<GetKVRequest>> recordsByPartition) {
this.recordsByPartition = recordsByPartition;
}
public static NamespacedKVRecordsToGet.Builder newBuilder() {
return new NamespacedKVRecordsToGet.Builder();
}
public static class Builder {
private final Map<TopicPartition, List<GetKVRequest>> records = new HashMap<>();
public Builder addRecord(TopicPartition tp, GetKVRequest req) {
records.computeIfAbsent(tp, k -> new ArrayList<>()).add(req);
return this;
}
public NamespacedKVRecordsToGet build() {
return new NamespacedKVRecordsToGet(records);
}
}
public Map<TopicPartition, List<GetKVRequest>> recordsByPartition() {
return recordsByPartition;
}
}

View File

@ -0,0 +1,39 @@
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.PutKVsRequestData.PutKVRequest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class NamespacedKVRecordsToPut {
private final Map<TopicPartition, List<PutKVRequest>> recordsByPartition;
private NamespacedKVRecordsToPut(Map<TopicPartition, List<PutKVRequest>> recordsByPartition) {
this.recordsByPartition = recordsByPartition;
}
public static Builder newBuilder() {
return new Builder();
}
public Map<TopicPartition, List<PutKVRequest>> recordsByPartition() {
return recordsByPartition;
}
public static class Builder {
private final Map<TopicPartition, List<PutKVRequest>> records = new HashMap<>();
public Builder addRecord(TopicPartition partition, PutKVRequest request) {
records.computeIfAbsent(partition, k -> new ArrayList<>()).add(request);
return this;
}
public NamespacedKVRecordsToPut build() {
return new NamespacedKVRecordsToPut(records);
}
}
}

View File

@ -0,0 +1,96 @@
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.PutKVsRequestData;
import org.apache.kafka.common.message.PutKVsRequestData.PutKVRequest;
import org.apache.kafka.common.message.PutKVsResponseData;
import org.apache.kafka.common.message.PutKVsResponseData.PutKVResponse;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.s3.PutKVsRequest;
import org.apache.kafka.common.requests.s3.PutKVsResponse;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class PutNamespacedKVHandler extends AdminApiHandler.Batched<TopicPartition, PutKVResponse> {
private final Logger logger;
private final NamespacedKVRecordsToPut recordsToPut;
private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
private final List<TopicPartition> orderedPartitions;
public PutNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToPut recordsToPut) {
this.logger = logContext.logger(PutNamespacedKVHandler.class);
this.recordsToPut = recordsToPut;
this.lookupStrategy = new PartitionLeaderStrategy(logContext);
this.orderedPartitions = new ArrayList<>(recordsToPut.recordsByPartition().keySet());
}
@Override
protected AbstractRequest.Builder<?> buildBatchedRequest(int brokerId, Set<TopicPartition> partitions) {
PutKVsRequestData requestData = new PutKVsRequestData();
List<PutKVRequest> allPutRequests = orderedPartitions.stream()
.filter(partitions::contains)
.map(tp -> recordsToPut.recordsByPartition().get(tp))
.filter(Objects::nonNull)
.flatMap(Collection::stream).collect(Collectors.toList());
requestData.setPutKVRequests(allPutRequests);
return new PutKVsRequest.Builder(requestData);
}
@Override
public String apiName() {
return "PutKVs";
}
@Override
public ApiResult<TopicPartition, PutKVResponse> handleResponse(Node broker, Set<TopicPartition> partitions, AbstractResponse response) {
PutKVsResponseData responseData = ((PutKVsResponse) response).data();
List<PutKVResponse> responses = responseData.putKVResponses();
final Map<TopicPartition, PutKVResponse> completed = new LinkedHashMap<>();
final Map<TopicPartition, Throwable> failed = new HashMap<>();
int responseIndex = 0;
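// As in GetNamespacedKVHandler, responses are matched to partitions positionally,
// assuming one response per request in request order.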
for (TopicPartition tp : orderedPartitions) {
if (!partitions.contains(tp)) {
continue;
}
if (responseIndex >= responses.size()) {
failed.put(tp, new IllegalStateException("Missing response for partition"));
continue;
}
PutKVResponse resp = responses.get(responseIndex++);
if (resp.errorCode() == Errors.NONE.code()) {
completed.put(tp, resp);
} else {
failed.put(tp, Errors.forCode(resp.errorCode()).exception());
}
}
return new ApiResult<>(completed, failed, Collections.emptyList());
}
@Override
public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
return this.lookupStrategy;
}
public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition, PutKVResponse> newFuture(
Set<TopicPartition> partitions
) {
return AdminApiFuture.forKeys(new HashSet<>(partitions));
}
}

View File

@ -0,0 +1,8 @@
package org.apache.kafka.common.errors;
public class InvalidKVRecordEpochException extends ApiException {
public InvalidKVRecordEpochException(String message) {
super(message);
}
}

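Error code 600 maps to this exception (see the Errors change below); a minimal sketch for callers that want to detect an epoch conflict on a failed future, re-read the current epoch, and retry:

import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.errors.InvalidKVRecordEpochException;

public class EpochConflictSketch {
    // Futures surface failures wrapped in ExecutionException; unwrap before testing.
    static boolean isEpochConflict(Throwable t) {
        Throwable cause = t instanceof ExecutionException ? t.getCause() : t;
        return cause instanceof InvalidKVRecordEpochException;
    }
}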
View File

@ -61,6 +61,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidFetchSessionEpochException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidKVRecordEpochException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
@ -440,6 +441,7 @@ public enum Errors {
NODE_LOCKED(515, "The node is locked", NodeLockedException::new),
OBJECT_NOT_COMMITED(516, "The object is not commited.", ObjectNotCommittedException::new),
STREAM_INNER_ERROR(599, "The stream inner error.", StreamInnerErrorException::new),
INVALID_KV_RECORD_EPOCH(600, "The KV record epoch is invalid.", InvalidKVRecordEpochException::new),
// AutoMQ inject end
INVALID_RECORD_STATE(121, "The record state is invalid. The acknowledgement of delivery could not be completed.", InvalidRecordStateException::new),

View File

@ -21,7 +21,7 @@
"broker"
],
"name": "DeleteKVsRequest",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{
@ -35,8 +35,20 @@
"type": "string",
"versions": "0+",
"about": "Key is the key to delete"
},
{
"name": "Namespace",
"type": "string",
"versions": "1+",
"about": "Namespace"
},
{
"name": "Epoch",
"type": "int64",
"versions": "1+",
"about": "Epoch"
}
]
}
]
}

View File

@ -17,7 +17,7 @@
"apiKey": 511,
"type": "response",
"name": "DeleteKVsResponse",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{
@ -50,8 +50,15 @@
"versions": "0+",
"nullableVersions": "0+",
"about": "Value"
},
{
"name": "Epoch",
"type": "int64",
"versions": "1+",
"about": "Epoch"
}
]
}
]
}

View File

@ -21,7 +21,7 @@
"broker"
],
"name": "GetKVsRequest",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{
@ -35,8 +35,14 @@
"type": "string",
"versions": "0+",
"about": "Key is the key to get"
},
{
"name": "Namespace",
"type": "string",
"versions": "1+",
"about": "Namespace"
}
]
}
]
}

View File

@ -17,7 +17,7 @@
"apiKey": 509,
"type": "response",
"name": "GetKVsResponse",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{
@ -50,8 +50,20 @@
"versions": "0+",
"nullableVersions": "0+",
"about": "Value"
},
{
"name": "Namespace",
"type": "string",
"versions": "1+",
"about": "Namespace"
},
{
"name": "Epoch",
"type": "int64",
"versions": "1+",
"about": "Epoch"
}
]
}
]
}

View File

@ -21,7 +21,7 @@
"broker"
],
"name": "PutKVsRequest",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{
@ -47,8 +47,20 @@
"type": "bool",
"versions": "0+",
"about": "overwrite put kv"
},
{
"name": "Namespace",
"type": "string",
"versions": "1+",
"about": "Namespace"
},
{
"name": "Epoch",
"type": "int64",
"versions": "1+",
"about": "Epoch"
}
]
}
]
}

View File

@ -17,7 +17,7 @@
"apiKey": 510,
"type": "response",
"name": "PutKVsResponse",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{
@ -49,8 +49,14 @@
"type": "bytes",
"versions": "0+",
"about": "Value"
},
{
"name": "Epoch",
"type": "int64",
"versions": "1+",
"about": "Epoch"
}
]
}
]
}

View File

@ -116,6 +116,7 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.GetKVsResponseData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
@ -141,6 +142,7 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
import org.apache.kafka.common.message.PutKVsResponseData;
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@ -6948,7 +6950,7 @@ public class KafkaAdminClientTest {
assertNotNull(result.descriptions().get(1).get());
}
}
@Test
public void testDescribeReplicaLogDirsWithNonExistReplica() throws Exception {
int brokerId = 0;
@ -6967,7 +6969,7 @@ public class KafkaAdminClientTest {
DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(tpr1, tpr2));
Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values();
assertEquals(logDir, values.get(tpr1).get().getCurrentReplicaLogDir());
assertNull(values.get(tpr1).get().getFutureReplicaLogDir());
assertEquals(offsetLag, values.get(tpr1).get().getCurrentReplicaOffsetLag());

View File

@ -1450,6 +1450,21 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException();
}
@Override
public GetNamespacedKVResult getNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, GetNamespacedKVOptions options) {
throw new UnsupportedOperationException();
}
@Override
public PutNamespacedKVResult putNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) {
throw new UnsupportedOperationException();
}
@Override
public DeleteNamespacedKVResult deleteNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, DeleteNamespacedKVOptions options) {
throw new UnsupportedOperationException();
}
// AutoMQ inject end
}

View File

@ -44,7 +44,9 @@ import org.apache.kafka.common.requests.s3.PutKVsRequest;
import com.automq.stream.api.KVClient;
import com.automq.stream.api.KeyValue;
import com.automq.stream.api.KeyValue.Key;
import com.automq.stream.api.KeyValue.KeyAndNamespace;
import com.automq.stream.api.KeyValue.Value;
import com.automq.stream.api.KeyValue.ValueAndEpoch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -247,4 +249,196 @@ public class ControllerKVClient implements KVClient {
this.requestSender.send(task);
return future;
}
@Override
public CompletableFuture<ValueAndEpoch> putNamespacedKVIfAbsent(KeyValue keyValue) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ControllerKVClient]: Put Namespaced KV if absent: {}", keyValue);
}
PutKVRequest request = new PutKVRequest()
.setKey(keyValue.key().get())
.setValue(keyValue.value().get().array())
.setNamespace(keyValue.namespace())
.setEpoch(keyValue.epoch());
WrapRequest req = new BatchRequest() {
@Override
public Builder addSubRequest(Builder builder) {
PutKVsRequest.Builder realBuilder = (PutKVsRequest.Builder) builder;
return realBuilder.addSubRequest(request);
}
@Override
public ApiKeys apiKey() {
return ApiKeys.PUT_KVS;
}
@Override
public Builder toRequestBuilder() {
return new PutKVsRequest.Builder(
new PutKVsRequestData()
).addSubRequest(request);
}
};
CompletableFuture<ValueAndEpoch> future = new CompletableFuture<>();
RequestTask<PutKVResponse, ValueAndEpoch> task = new RequestTask<PutKVResponse, ValueAndEpoch>(req, future, response -> {
Errors code = Errors.forCode(response.errorCode());
switch (code) {
case NONE:
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ControllerKVClient]: Put Namespaced KV if absent: {}, result: {}", keyValue, response);
}
return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch()));
case KEY_EXIST:
LOGGER.warn("[ControllerKVClient]: Failed to Put Namespaced KV if absent: {}, code: {}, key already exist", keyValue, code);
return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch()));
default:
LOGGER.error("[ControllerKVClient]: Failed to Put Namespaced KV if absent: {}, code: {}, retry later", keyValue, code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}
@Override
public CompletableFuture<ValueAndEpoch> putNamespacedKV(KeyValue keyValue) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ControllerKVClient]: Put Namespaced KV: {}", keyValue);
}
PutKVRequest request = new PutKVRequest()
.setKey(keyValue.key().get())
.setValue(keyValue.value().get().array())
.setNamespace(keyValue.namespace())
.setEpoch(keyValue.epoch())
.setOverwrite(true);
WrapRequest req = new BatchRequest() {
@Override
public Builder addSubRequest(Builder builder) {
PutKVsRequest.Builder realBuilder = (PutKVsRequest.Builder) builder;
return realBuilder.addSubRequest(request);
}
@Override
public ApiKeys apiKey() {
return ApiKeys.PUT_KVS;
}
@Override
public Builder toRequestBuilder() {
return new PutKVsRequest.Builder(
new PutKVsRequestData()
).addSubRequest(request);
}
};
CompletableFuture<ValueAndEpoch> future = new CompletableFuture<>();
RequestTask<PutKVResponse, ValueAndEpoch> task = new RequestTask<PutKVResponse, ValueAndEpoch>(req, future, response -> {
Errors code = Errors.forCode(response.errorCode());
switch (code) {
case NONE:
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ControllerKVClient]: Put Namespaced KV: {}, result: {}", keyValue, response);
}
return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch()));
default:
LOGGER.error("[ControllerKVClient]: Failed to Put Namespaced KV: {}, code: {}, retry later", keyValue, code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}
@Override
public CompletableFuture<ValueAndEpoch> getNamespacedKV(KeyAndNamespace keyAndNamespace) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ControllerKVClient]: Get KV: {}, Namespace: {}", keyAndNamespace.key(), keyAndNamespace.namespace());
}
GetKVRequest request = new GetKVRequest()
.setKey(keyAndNamespace.key().get())
.setNamespace(keyAndNamespace.namespace());
WrapRequest req = new BatchRequest() {
@Override
public Builder addSubRequest(Builder builder) {
GetKVsRequest.Builder realBuilder = (GetKVsRequest.Builder) builder;
return realBuilder.addSubRequest(request);
}
@Override
public ApiKeys apiKey() {
return ApiKeys.GET_KVS;
}
@Override
public Builder toRequestBuilder() {
return new GetKVsRequest.Builder(
new GetKVsRequestData()
).addSubRequest(request);
}
};
CompletableFuture<ValueAndEpoch> future = new CompletableFuture<>();
RequestTask<GetKVResponse, ValueAndEpoch> task = new RequestTask<>(req, future, response -> {
Errors code = Errors.forCode(response.errorCode());
switch (code) {
case NONE:
ValueAndEpoch val = ValueAndEpoch.of(response.value(), response.epoch());
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ControllerKVClient]: Get Namespaced KV: {}, result: {}", keyAndNamespace.key(), response);
}
return ResponseHandleResult.withSuccess(val);
default:
LOGGER.error("[ControllerKVClient]: Failed to Get Namespaced KV: {}, code: {}, retry later", keyAndNamespace.key(), code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}
@Override
public CompletableFuture<ValueAndEpoch> delNamespacedKV(KeyValue keyValue) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ControllerKVClient]: Delete Namespaced KV: {}", keyValue.key());
}
DeleteKVRequest request = new DeleteKVRequest()
.setKey(keyValue.key().get())
.setNamespace(keyValue.namespace())
.setEpoch(keyValue.epoch());
WrapRequest req = new BatchRequest() {
@Override
public Builder addSubRequest(Builder builder) {
DeleteKVsRequest.Builder realBuilder = (DeleteKVsRequest.Builder) builder;
return realBuilder.addSubRequest(request);
}
@Override
public ApiKeys apiKey() {
return ApiKeys.DELETE_KVS;
}
@Override
public Builder toRequestBuilder() {
return new DeleteKVsRequest.Builder(
new DeleteKVsRequestData()
).addSubRequest(request);
}
};
CompletableFuture<ValueAndEpoch> future = new CompletableFuture<>();
RequestTask<DeleteKVResponse, ValueAndEpoch> task = new RequestTask<>(req, future, response -> {
Errors code = Errors.forCode(response.errorCode());
switch (code) {
case NONE:
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ControllerKVClient]: Delete Namespaced KV: {}, result: {}", keyValue.key(), response);
}
return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch()));
case KEY_NOT_EXIST:
LOGGER.info("[ControllerKVClient]: Delete Namespaced KV: {}, result: KEY_NOT_EXIST", keyValue.key());
return ResponseHandleResult.withSuccess(ValueAndEpoch.of((ByteBuffer) null, 0L));
default:
LOGGER.error("[ControllerKVClient]: Failed to Delete Namespaced KV: {}, code: {}, retry later", keyValue.key(), code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}
}

View File

@ -19,6 +19,7 @@
package kafka.log.streamaspect;
import com.automq.stream.DefaultRecordBatch;
import com.automq.stream.RecordBatchWithContextWrapper;
import com.automq.stream.api.AppendResult;
@ -28,7 +29,9 @@ import com.automq.stream.api.FetchResult;
import com.automq.stream.api.KVClient;
import com.automq.stream.api.KeyValue;
import com.automq.stream.api.KeyValue.Key;
import com.automq.stream.api.KeyValue.KeyAndNamespace;
import com.automq.stream.api.KeyValue.Value;
import com.automq.stream.api.KeyValue.ValueAndEpoch;
import com.automq.stream.api.OpenStreamOptions;
import com.automq.stream.api.RecordBatch;
import com.automq.stream.api.RecordBatchWithContext;
@ -51,6 +54,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import static com.automq.stream.utils.KVRecordUtils.buildCompositeKey;
import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH;
public class MemoryClient implements Client {
private final StreamClient streamClient = new StreamClientImpl();
private final KVClient kvClient = new KVClientImpl();
@ -180,6 +186,7 @@ public class MemoryClient implements Client {
public static class KVClientImpl implements KVClient {
private final Map<String, ByteBuffer> store = new ConcurrentHashMap<>();
private final Map<String, KeyMetadata> keyMetadataMap = new ConcurrentHashMap<>();
@Override
public CompletableFuture<Value> putKV(KeyValue keyValue) {
@ -202,5 +209,77 @@ public class MemoryClient implements Client {
public CompletableFuture<Value> delKV(Key key) {
return CompletableFuture.completedFuture(Value.of(store.remove(key.get())));
}
@Override
public CompletableFuture<ValueAndEpoch> putNamespacedKVIfAbsent(KeyValue keyValue) {
String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get());
KeyMetadata keyMetadata = keyMetadataMap.get(key);
long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0;
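// Mirrors the controller-side check in KVControlManager: a positive request epoch
// must equal the key's current epoch, otherwise the call fails.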
if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) {
return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception());
}
long newEpoch = System.currentTimeMillis();
ByteBuffer value = store.putIfAbsent(key, keyValue.value().get().duplicate());
if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) {
keyMetadataMap.putIfAbsent(key, new KeyMetadata(keyValue.namespace(), newEpoch));
}
return CompletableFuture.completedFuture(ValueAndEpoch.of(value, newEpoch));
}
@Override
public CompletableFuture<ValueAndEpoch> putNamespacedKV(KeyValue keyValue) {
String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get());
KeyMetadata keyMetadata = keyMetadataMap.get(key);
long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0;
if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) {
return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception());
}
long newEpoch = System.currentTimeMillis();
ByteBuffer value = store.put(key, keyValue.value().get().duplicate());
if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) {
keyMetadataMap.put(key, new KeyMetadata(keyValue.namespace(), newEpoch));
}
return CompletableFuture.completedFuture(ValueAndEpoch.of(value, newEpoch));
}
@Override
public CompletableFuture<ValueAndEpoch> getNamespacedKV(KeyAndNamespace keyAndNamespace) {
String key = buildCompositeKey(keyAndNamespace.namespace(), keyAndNamespace.key().get());
KeyMetadata keyMetadata = null;
if (keyAndNamespace.namespace() != null && !keyAndNamespace.namespace().isEmpty()) {
keyMetadata = keyMetadataMap.get(key);
}
return CompletableFuture.completedFuture(ValueAndEpoch.of(store.get(key), keyMetadata != null ? keyMetadata.getEpoch() : 0L));
}
@Override
public CompletableFuture<ValueAndEpoch> delNamespacedKV(KeyValue keyValue) {
String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get());
KeyMetadata keyMetadata = keyMetadataMap.get(key);
long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0;
if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) {
return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception());
}
return CompletableFuture.completedFuture(ValueAndEpoch.of(store.remove(key), currentEpoch));
}
private static class KeyMetadata {
private final long epoch;
private final String namespace;
public KeyMetadata(String namespace, long epoch) {
this.namespace = namespace;
this.epoch = epoch;
}
public long getEpoch() {
return epoch;
}
public String getNamespace() {
return namespace;
}
}
}
}

View File

@ -2051,9 +2051,6 @@ public final class QuorumController implements Controller {
this.time = time;
this.controllerMetrics = controllerMetrics;
this.snapshotRegistry = new SnapshotRegistry(logContext);
// AutoMQ for Kafka inject start
this.kvControlManager = new KVControlManager(snapshotRegistry, logContext);
// AutoMQ for Kafka inject end
this.deferredEventQueue = new DeferredEventQueue(logContext);
this.deferredUnstableEventQueue = new DeferredEventQueue(logContext);
this.offsetControl = new OffsetControlManager.Builder().
@ -2090,6 +2087,9 @@ public final class QuorumController implements Controller {
setMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION).
setClusterFeatureSupportDescriber(clusterSupportDescriber).
build();
// AutoMQ for Kafka inject start
this.kvControlManager = new KVControlManager(snapshotRegistry, logContext, featureControl);
// AutoMQ for Kafka inject end
this.clusterControl = new ClusterControlManager.Builder().
setLogContext(logContext).
setClusterId(clusterId).

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.KVRecord.KeyValue;
import org.apache.kafka.common.metadata.RemoveKVRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
@ -39,6 +40,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import static com.automq.stream.utils.KVRecordUtils.buildCompositeKey;
import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH;
import static org.apache.kafka.common.protocol.Errors.KEY_EXIST;
import static org.apache.kafka.common.protocol.Errors.KEY_NOT_EXIST;
@ -47,30 +50,54 @@ public class KVControlManager {
private final SnapshotRegistry registry;
private final Logger log;
private final TimelineHashMap<String, ByteBuffer> kv;
private final TimelineHashMap<String, KeyMetadata> keyMetadataMap;
private final FeatureControlManager featureControl;
public KVControlManager(SnapshotRegistry registry, LogContext logContext) {
public KVControlManager(SnapshotRegistry registry, LogContext logContext, FeatureControlManager featureControl) {
this.registry = registry;
this.log = logContext.logger(KVControlManager.class);
this.kv = new TimelineHashMap<>(registry, 0);
this.keyMetadataMap = new TimelineHashMap<>(registry, 0);
this.featureControl = featureControl;
}
public GetKVResponse getKV(GetKVRequest request) {
String key = request.key();
String key = buildCompositeKey(request.namespace(), request.key());
byte[] value = kv.containsKey(key) ? kv.get(key).array() : null;
KeyMetadata keyMetadata = null;
if (request.namespace() != null && !request.namespace().isEmpty()) {
keyMetadata = keyMetadataMap.get(key);
}
return new GetKVResponse()
.setValue(value);
.setValue(value)
.setNamespace(request.namespace())
.setEpoch(keyMetadata != null ? keyMetadata.getEpoch() : 0L);
}
public ControllerResult<PutKVResponse> putKV(PutKVRequest request) {
String key = request.key();
String key = buildCompositeKey(request.namespace(), request.key());
KeyMetadata keyMetadata = keyMetadataMap.get(key);
long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0;
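// Epoch semantics: a non-positive request epoch skips validation; a positive epoch
// must equal the record's current epoch (0 when the key has no metadata).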
if (request.epoch() > 0 && request.epoch() != currentEpoch) {
return ControllerResult.of(Collections.emptyList(),
new PutKVResponse()
.setErrorCode(INVALID_KV_RECORD_EPOCH.code())
.setEpoch(currentEpoch));
}
long newEpoch = System.currentTimeMillis();
ByteBuffer value = kv.get(key);
if (value == null || request.overwrite()) {
// generate kv record
ApiMessageAndVersion record = new ApiMessageAndVersion(new KVRecord()
.setKeyValues(Collections.singletonList(new KeyValue()
.setKey(key)
.setValue(request.value()))), (short) 0);
return ControllerResult.of(Collections.singletonList(record), new PutKVResponse().setValue(request.value()));
.setValue(request.value())
.setNamespace(request.namespace())
.setEpoch(newEpoch))),
featureControl.autoMQVersion().namespacedKVRecordVersion());
return ControllerResult.of(Collections.singletonList(record), new PutKVResponse().setValue(request.value()).setEpoch(newEpoch));
}
// exist and not allow overwriting
return ControllerResult.of(Collections.emptyList(), new PutKVResponse()
@ -79,14 +106,25 @@ public class KVControlManager {
}
public ControllerResult<DeleteKVResponse> deleteKV(DeleteKVRequest request) {
String key = buildCompositeKey(request.namespace(), request.key());
KeyMetadata keyMetadata = keyMetadataMap.get(key);
long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0;
if (request.epoch() > 0 && request.epoch() != currentEpoch) {
return ControllerResult.of(Collections.emptyList(),
new DeleteKVResponse()
.setErrorCode(INVALID_KV_RECORD_EPOCH.code())
.setEpoch(currentEpoch));
}
log.trace("DeleteKVRequestData: {}", request);
DeleteKVResponse resp = new DeleteKVResponse();
ByteBuffer value = kv.get(request.key());
ByteBuffer value = kv.get(key);
if (value != null) {
// generate remove-kv record
ApiMessageAndVersion record = new ApiMessageAndVersion(new RemoveKVRecord()
.setKeys(Collections.singletonList(request.key())), (short) 0);
return ControllerResult.of(Collections.singletonList(record), resp.setValue(value.array()));
.setKeys(Collections.singletonList(key))
.setNamespace(request.namespace()),
featureControl.autoMQVersion().namespacedKVRecordVersion());
return ControllerResult.of(Collections.singletonList(record), resp.setValue(value.array()).setEpoch(currentEpoch));
}
return ControllerResult.of(Collections.emptyList(), resp.setErrorCode(KEY_NOT_EXIST.code()));
}
@ -95,6 +133,9 @@ public class KVControlManager {
List<KeyValue> keyValues = record.keyValues();
for (KeyValue keyValue : keyValues) {
kv.put(keyValue.key(), ByteBuffer.wrap(keyValue.value()));
if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) {
keyMetadataMap.put(keyValue.key(), new KeyMetadata(keyValue.namespace(), keyValue.epoch()));
}
}
}
@ -102,10 +143,30 @@ public class KVControlManager {
List<String> keys = record.keys();
for (String key : keys) {
kv.remove(key);
if (record.namespace() != null && !record.namespace().isEmpty()) {
keyMetadataMap.remove(key);
}
}
}
public Map<String, ByteBuffer> kv() {
return kv;
}
private static class KeyMetadata {
private final long epoch;
private final String namespace;
public KeyMetadata(String namespace, long epoch) {
this.namespace = namespace;
this.epoch = epoch;
}
public long getEpoch() {
return epoch;
}
public String getNamespace() {
return namespace;
}
}
}

View File

@ -17,7 +17,7 @@
"apiKey": 516,
"type": "metadata",
"name": "KVRecord",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{
@ -37,8 +37,20 @@
"type": "bytes",
"versions": "0+",
"about": "Value"
},
{
"name": "Namespace",
"type": "string",
"versions": "1+",
"about": "Namespace"
},
{
"name": "Epoch",
"type": "int64",
"versions": "1+",
"about": "Epoch"
}
]
}
]
}

View File

@ -17,7 +17,7 @@
"apiKey": 517,
"type": "metadata",
"name": "RemoveKVRecord",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{
@ -25,6 +25,12 @@
"type": "[]string",
"versions": "0+",
"about": "Keys"
},
{
"name": "Namespace",
"type": "string",
"versions": "1+",
"about": "Namespace"
}
]
}

View File

@ -74,6 +74,7 @@ import java.util.Set;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static org.apache.kafka.controller.FeatureControlManagerTest.features;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -728,7 +729,12 @@ public class ClusterControlManagerTest {
public void testReusableNodeIds() {
MockTime time = new MockTime(0, 0, 0);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
KVControlManager kvControl = new KVControlManager(snapshotRegistry, new LogContext());
FeatureControlManager featureControlManager = new FeatureControlManager.Builder().
setQuorumFeatures(features("foo", 1, 2)).
setSnapshotRegistry(snapshotRegistry).
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
build();
KVControlManager kvControl = new KVControlManager(snapshotRegistry, new LogContext(), featureControlManager);
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.stream.KVControlManager;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.BeforeEach;
@ -41,8 +42,11 @@ import org.junit.jupiter.api.Timeout;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH;
import static org.apache.kafka.controller.FeatureControlManagerTest.features;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(40)
@Tag("S3Unit")
@ -54,7 +58,12 @@ public class KVControlManagerTest {
public void setUp() {
LogContext logContext = new LogContext();
SnapshotRegistry registry = new SnapshotRegistry(logContext);
this.manager = new KVControlManager(registry, logContext);
FeatureControlManager featureControlManager = new FeatureControlManager.Builder().
setQuorumFeatures(features("foo", 1, 2)).
setSnapshotRegistry(registry).
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
build();
this.manager = new KVControlManager(registry, logContext, featureControlManager);
}
@Test
@ -113,6 +122,149 @@ public class KVControlManagerTest {
assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode());
}
@Test
public void testNamespacedReadWrite() {
ControllerResult<PutKVResponse> result = manager.putKV(new PutKVRequest()
.setKey("key1")
.setValue("value1".getBytes())
.setNamespace("__automq_test")
);
assertEquals(1, result.records().size());
assertEquals(Errors.NONE.code(), result.response().errorCode());
assertEquals("value1", new String(result.response().value()));
replay(manager, result.records());
result = manager.putKV(new PutKVRequest()
.setKey("key1")
.setValue("value1-1".getBytes())
.setNamespace("__automq_test")
);
assertEquals(0, result.records().size());
assertEquals(Errors.KEY_EXIST.code(), result.response().errorCode());
assertEquals("value1", new String(result.response().value()));
result = manager.putKV(new PutKVRequest()
.setKey("key1")
.setValue("value1-2".getBytes())
.setNamespace("__automq_test")
.setOverwrite(true));
assertEquals(1, result.records().size());
assertEquals(Errors.NONE.code(), result.response().errorCode());
assertEquals("value1-2", new String(result.response().value()));
replay(manager, result.records());
GetKVResponse result2 = manager.getKV(new GetKVRequest()
.setKey("key1")
.setNamespace("__automq_test"));
assertEquals("value1-2", new String(result2.value()));
result2 = manager.getKV(new GetKVRequest()
.setKey("key2")
.setNamespace("__automq_test"));
assertNull(result2.value());
ControllerResult<DeleteKVResponse> result3 = manager.deleteKV(new DeleteKVRequest()
.setKey("key2")
.setNamespace("__automq_test"));
assertEquals(0, result3.records().size());
assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode());
result3 = manager.deleteKV(new DeleteKVRequest()
.setKey("key1")
.setNamespace("__automq_test")
);
assertEquals(1, result3.records().size());
assertEquals(Errors.NONE.code(), result3.response().errorCode());
assertEquals("value1-2", new String(result3.response().value()));
replay(manager, result3.records());
// key1 is deleted
result2 = manager.getKV(new GetKVRequest()
.setKey("key1")
.setNamespace("__automq_test"));
assertNull(result2.value());
result3 = manager.deleteKV(new DeleteKVRequest()
.setKey("key1")
.setNamespace("__automq_test"));
assertEquals(0, result3.records().size());
assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode());
}
@Test
public void testPutWithEpochValidation() {
ControllerResult<PutKVResponse> result = manager.putKV(new PutKVRequest()
.setKey("key1")
.setValue("value1".getBytes())
.setNamespace("__epoch_test")
.setEpoch(0));
assertEquals(1, result.records().size());
assertEquals(Errors.NONE.code(), result.response().errorCode());
long initialEpoch = result.response().epoch();
assertTrue(initialEpoch > 0);
replay(manager, result.records());
// a stale epoch is rejected even with overwrite
result = manager.putKV(new PutKVRequest()
.setKey("key1")
.setValue("value2".getBytes())
.setNamespace("__epoch_test")
.setEpoch(initialEpoch - 1)
.setOverwrite(true));
assertEquals(0, result.records().size());
assertEquals(INVALID_KV_RECORD_EPOCH.code(), result.response().errorCode());
assertEquals(initialEpoch, result.response().epoch());
// without overwrite, should fail
result = manager.putKV(new PutKVRequest()
.setKey("key1")
.setValue("value2".getBytes())
.setNamespace("__epoch_test")
.setEpoch(initialEpoch));
assertEquals(0, result.records().size());
assertEquals(Errors.KEY_EXIST.code(), result.response().errorCode());
// with overwrite, should succeed
result = manager.putKV(new PutKVRequest()
.setKey("key1")
.setValue("value2".getBytes())
.setNamespace("__epoch_test")
.setEpoch(initialEpoch)
.setOverwrite(true));
assertEquals(1, result.records().size());
assertEquals(Errors.NONE.code(), result.response().errorCode());
long newEpoch = result.response().epoch();
assertTrue(newEpoch > initialEpoch);
replay(manager, result.records());
GetKVResponse readResp = manager.getKV(new GetKVRequest()
.setKey("key1")
.setNamespace("__epoch_test"));
assertEquals(newEpoch, readResp.epoch());
}
@Test
public void testDeleteWithEpochValidation() {
ControllerResult<PutKVResponse> putResult = manager.putKV(new PutKVRequest()
.setKey("key1")
.setValue("value1".getBytes())
.setNamespace("__epoch_test"));
long initialEpoch = putResult.response().epoch();
replay(manager, putResult.records());
// delete with a stale epoch is rejected and echoes the current epoch
ControllerResult<DeleteKVResponse> delResult = manager.deleteKV(new DeleteKVRequest()
.setKey("key1")
.setNamespace("__epoch_test")
.setEpoch(initialEpoch - 1));
assertEquals(0, delResult.records().size());
assertEquals(INVALID_KV_RECORD_EPOCH.code(), delResult.response().errorCode());
assertEquals(initialEpoch, delResult.response().epoch());
delResult = manager.deleteKV(new DeleteKVRequest()
.setKey("key1")
.setNamespace("__epoch_test")
.setEpoch(initialEpoch));
assertEquals(1, delResult.records().size());
assertEquals(Errors.NONE.code(), delResult.response().errorCode());
replay(manager, delResult.records());
GetKVResponse readResp = manager.getKV(new GetKVRequest()
.setKey("key1")
.setNamespace("__epoch_test"));
assertNull(readResp.value());
}
private void replay(KVControlManager manager, List<ApiMessageAndVersion> records) {
List<ApiMessage> messages = records.stream().map(x -> x.message())
.collect(Collectors.toList());
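The two epoch tests above pin down a compare-and-set contract: a put or delete carrying a stale epoch fails with INVALID_KV_RECORD_EPOCH and echoes the current epoch back. A hypothetical retry loop built on exactly the request/response types the tests use (replay(...) stands in for the controller applying the records):

boolean casPut(KVControlManager manager, String namespace, String key, byte[] newValue) {
    for (int attempt = 0; attempt < 3; attempt++) {
        // read the current epoch for the key
        GetKVResponse current = manager.getKV(new GetKVRequest()
            .setKey(key)
            .setNamespace(namespace));
        ControllerResult<PutKVResponse> result = manager.putKV(new PutKVRequest()
            .setKey(key)
            .setValue(newValue)
            .setNamespace(namespace)
            .setEpoch(current.epoch())   // expected epoch from the read above
            .setOverwrite(true));        // required once the key exists
        if (result.response().errorCode() == Errors.NONE.code()) {
            replay(manager, result.records()); // apply the generated records
            return true;
        }
        // INVALID_KV_RECORD_EPOCH: a concurrent writer won; re-read and retry
    }
    return false;
}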

View File

@ -20,7 +20,9 @@
package com.automq.stream.api;
import com.automq.stream.api.KeyValue.Key;
import com.automq.stream.api.KeyValue.KeyAndNamespace;
import com.automq.stream.api.KeyValue.Value;
import com.automq.stream.api.KeyValue.ValueAndEpoch;
import java.util.concurrent.CompletableFuture;
@ -36,6 +38,14 @@ public interface KVClient {
*/
CompletableFuture<Value> putKVIfAbsent(KeyValue keyValue);
/**
* Put a namespaced key-value pair if the key does not exist; return the current value and epoch after putting.
*
* @param keyValue {@link KeyValue} k-v pair with namespace and epoch
* @return async put result: {@link ValueAndEpoch} current value and epoch after putting.
*/
CompletableFuture<ValueAndEpoch> putNamespacedKVIfAbsent(KeyValue keyValue);
/**
* Put key value, overwrite if key exist, return current key value after putting.
*
@ -44,6 +54,14 @@ public interface KVClient {
*/
CompletableFuture<Value> putKV(KeyValue keyValue);
/**
* Put a namespaced key-value pair, overwriting if the key exists; return the current value and epoch after putting.
*
* @param keyValue {@link KeyValue} k-v pair with namespace and epoch
* @return async put result: {@link ValueAndEpoch} current value and epoch after putting.
*/
CompletableFuture<ValueAndEpoch> putNamespacedKV(KeyValue keyValue);
/**
* Get value by key.
*
@ -52,6 +70,14 @@ public interface KVClient {
*/
CompletableFuture<Value> getKV(Key key);
/**
* Get value and epoch by key and namespace.
*
* @param keyAndNamespace key and namespace.
* @return async get result: {@link ValueAndEpoch} value and epoch, null if the key does not exist.
*/
CompletableFuture<ValueAndEpoch> getNamespacedKV(KeyAndNamespace keyAndNamespace);
/**
* Delete key value by key. If key not exist, return null.
*
@ -59,4 +85,12 @@ public interface KVClient {
* @return async delete result. {@link Value} deleted value, null if key not exist.
*/
CompletableFuture<Value> delKV(Key key);
/**
* Delete a namespaced key-value pair by key. If the key does not exist, return null.
*
* @param keyValue {@link KeyValue} k-v pair with namespace and epoch
* @return async delete result: {@link ValueAndEpoch} deleted value and epoch, null if the key does not exist.
*/
CompletableFuture<ValueAndEpoch> delNamespacedKV(KeyValue keyValue);
}
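End to end, the new namespaced surface composes like the sketch below (kvClient is an assumed KVClient implementation; error handling elided):

kvClient.putNamespacedKVIfAbsent(
        KeyValue.of("node-1", ByteBuffer.wrap("meta".getBytes()), "__automq_nodes", 0L))
    .thenCompose(v -> kvClient.getNamespacedKV(KeyAndNamespace.of("node-1", "__automq_nodes")))
    .thenCompose(cur -> {
        // re-put at the observed epoch; a stale epoch is rejected server-side
        KeyValue update = KeyValue.of("node-1", ByteBuffer.wrap("meta-2".getBytes()),
            "__automq_nodes", cur.epoch());
        return kvClient.putNamespacedKV(update);
    })
    .thenCompose(latest -> kvClient.delNamespacedKV(
        // value is unused on delete; only key, namespace and epoch matter
        KeyValue.of("node-1", ByteBuffer.allocate(0), "__automq_nodes", latest.epoch())));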

View File

@ -25,16 +25,31 @@ import java.util.Objects;
public class KeyValue {
private final Key key;
private final Value value;
private final String namespace;
private final long epoch;
private KeyValue(Key key, Value value) {
this.key = key;
this.value = value;
this.namespace = null;
this.epoch = 0L;
}
public KeyValue(Key key, Value value, String namespace, long epoch) {
this.key = key;
this.value = value;
this.namespace = namespace;
this.epoch = epoch;
}
public static KeyValue of(String key, ByteBuffer value) {
return new KeyValue(Key.of(key), Value.of(value));
}
public static KeyValue of(String key, ByteBuffer value, String namespace, long epoch) {
return new KeyValue(Key.of(key), Value.of(value), namespace, epoch);
}
public Key key() {
return key;
}
@ -43,6 +58,14 @@ public class KeyValue {
return value;
}
public String namespace() {
return namespace;
}
public long epoch() {
return epoch;
}
@Override
public boolean equals(Object o) {
if (this == o)
@ -154,4 +177,52 @@ public class KeyValue {
'}';
}
}
public static class KeyAndNamespace {
private final Key key;
private final String namespace;
public KeyAndNamespace(Key key, String namespace) {
this.key = key;
this.namespace = namespace;
}
public Key key() {
return key;
}
public String namespace() {
return namespace;
}
public static KeyAndNamespace of(String key, String namespace) {
return new KeyAndNamespace(Key.of(key), namespace);
}
}
public static class ValueAndEpoch {
private final Value value;
private final long epoch;
public ValueAndEpoch(Value value, long epoch) {
this.value = value;
this.epoch = epoch;
}
public Value value() {
return value;
}
public long epoch() {
return epoch;
}
public static ValueAndEpoch of(byte[] value, long epoch) {
return new ValueAndEpoch(Value.of(value), epoch);
}
public static ValueAndEpoch of(ByteBuffer value, long epoch) {
return new ValueAndEpoch(Value.of(value), epoch);
}
}
}
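Worth noting: the existing two-arg factory keeps namespace null and epoch 0 (see the private constructor above), so legacy callers are untouched; namespaced callers opt in through the four-arg factory:

KeyValue legacy = KeyValue.of("key1", ByteBuffer.wrap("v1".getBytes())); // namespace null, epoch 0
KeyValue namespaced = KeyValue.of("key1", ByteBuffer.wrap("v1".getBytes()), "__automq_test", 0L);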

View File

@ -0,0 +1,11 @@
package com.automq.stream.utils;
/**
 * Helpers for namespaced KV records. Namespaced entries are stored under a
 * composite key "namespace/key"; a null or empty namespace maps to the plain key.
 */
public class KVRecordUtils {
public static String buildCompositeKey(String namespace, String key) {
if (namespace == null || namespace.isEmpty()) {
return key;
}
return namespace + "/" + key;
}
}
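The composite key keeps v0 (un-namespaced) entries addressable under their original keys. One caveat worth flagging: since "/" is the separator, a namespace containing "/" can collide with another namespace/key pair.

KVRecordUtils.buildCompositeKey("__automq_test", "key1"); // "__automq_test/key1"
KVRecordUtils.buildCompositeKey(null, "key1");            // "key1" (legacy path)
KVRecordUtils.buildCompositeKey("", "key1");              // "key1"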

View File

@ -31,10 +31,12 @@ public enum AutoMQVersion {
// Support object bucket index
// Support huge cluster
// Support node registration
V2((short) 3);
V2((short) 3),
// Support kv record namespace
V3((short) 4);
public static final String FEATURE_NAME = "automq.version";
public static final AutoMQVersion LATEST = V2;
public static final AutoMQVersion LATEST = V3;
private final short level;
private final Version s3streamVersion;
@ -125,6 +127,14 @@ public enum AutoMQVersion {
}
}
public short namespacedKVRecordVersion() {
if (isAtLeast(V3)) {
return 1;
} else {
return 0;
}
}
public Version s3streamVersion() {
return s3streamVersion;
}
@ -139,6 +149,7 @@ public enum AutoMQVersion {
case 2:
return Version.V0;
case 3:
case 4:
return Version.V1;
default:
throw new IllegalArgumentException("Unknown AutoMQVersion level: " + automqVersion);