Compare commits
8 Commits
main
...
kvrecord_e
| Author | SHA1 | Date |
|---|---|---|
|
|
546cf76696 | |
|
|
9013068388 | |
|
|
36a804bb1d | |
|
|
28045f41a7 | |
|
|
e709fb9cbd | |
|
|
ab077eca85 | |
|
|
9a3b426edd | |
|
|
0ef0bed34a |
|
|
@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.Time;
|
|||
|
||||
import com.automq.shell.metrics.S3MetricsExporter;
|
||||
import com.automq.stream.api.KeyValue;
|
||||
import com.automq.stream.api.KeyValue.ValueAndEpoch;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -95,6 +96,32 @@ public class ClientKVClient {
|
|||
throw code.exception();
|
||||
}
|
||||
|
||||
public ValueAndEpoch getKV(String key, String namespace) throws IOException {
|
||||
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ClientKVClient]: Get KV: {} Namespace: {}", key, namespace);
|
||||
}
|
||||
|
||||
GetKVsRequestData data = new GetKVsRequestData()
|
||||
.setGetKeyRequests(List.of(new GetKVsRequestData.GetKVRequest().setKey(key).setNamespace(namespace)));
|
||||
|
||||
long now = Time.SYSTEM.milliseconds();
|
||||
ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()),
|
||||
new GetKVsRequest.Builder(data), now, true, 3000, null);
|
||||
|
||||
ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM);
|
||||
GetKVsResponseData responseData = (GetKVsResponseData) response.responseBody().data();
|
||||
|
||||
Errors code = Errors.forCode(responseData.errorCode());
|
||||
if (Objects.requireNonNull(code) == Errors.NONE) {
|
||||
return ValueAndEpoch.of(
|
||||
responseData.getKVResponses().get(0).value(),
|
||||
responseData.getKVResponses().get(0).epoch());
|
||||
}
|
||||
|
||||
throw code.exception();
|
||||
}
|
||||
|
||||
public KeyValue.Value putKV(String key, byte[] value) throws IOException {
|
||||
long now = Time.SYSTEM.milliseconds();
|
||||
|
||||
|
|
@ -119,6 +146,32 @@ public class ClientKVClient {
|
|||
throw code.exception();
|
||||
}
|
||||
|
||||
public ValueAndEpoch putKV(String key, byte[] value, String namespace, long epoch) throws IOException {
|
||||
long now = Time.SYSTEM.milliseconds();
|
||||
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ClientKVClient]: put KV: {}", key);
|
||||
}
|
||||
|
||||
PutKVsRequestData data = new PutKVsRequestData()
|
||||
.setPutKVRequests(List.of(new PutKVsRequestData.PutKVRequest().setKey(key).setValue(value).setNamespace(namespace).setEpoch(epoch)));
|
||||
|
||||
ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()),
|
||||
new PutKVsRequest.Builder(data), now, true, 3000, null);
|
||||
|
||||
ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM);
|
||||
PutKVsResponseData responseData = (PutKVsResponseData) response.responseBody().data();
|
||||
|
||||
Errors code = Errors.forCode(responseData.errorCode());
|
||||
if (Objects.requireNonNull(code) == Errors.NONE) {
|
||||
return ValueAndEpoch.of(
|
||||
responseData.putKVResponses().get(0).value(),
|
||||
responseData.putKVResponses().get(0).epoch());
|
||||
}
|
||||
|
||||
throw code.exception();
|
||||
}
|
||||
|
||||
public KeyValue.Value deleteKV(String key) throws IOException {
|
||||
long now = Time.SYSTEM.milliseconds();
|
||||
|
||||
|
|
@ -142,4 +195,30 @@ public class ClientKVClient {
|
|||
|
||||
throw code.exception();
|
||||
}
|
||||
|
||||
public ValueAndEpoch deleteKV(String key, String namespace, long epoch) throws IOException {
|
||||
long now = Time.SYSTEM.milliseconds();
|
||||
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ClientKVClient]: Delete KV: {}", key);
|
||||
}
|
||||
|
||||
DeleteKVsRequestData data = new DeleteKVsRequestData()
|
||||
.setDeleteKVRequests(List.of(new DeleteKVsRequestData.DeleteKVRequest().setKey(key).setNamespace(namespace).setEpoch(epoch)));
|
||||
|
||||
ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()),
|
||||
new DeleteKVsRequest.Builder(data), now, true, 3000, null);
|
||||
|
||||
ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM);
|
||||
DeleteKVsResponseData responseData = (DeleteKVsResponseData) response.responseBody().data();
|
||||
|
||||
Errors code = Errors.forCode(responseData.errorCode());
|
||||
if (Objects.requireNonNull(code) == Errors.NONE) {
|
||||
return ValueAndEpoch.of(
|
||||
responseData.deleteKVResponses().get(0).value(),
|
||||
responseData.deleteKVResponses().get(0).epoch());
|
||||
}
|
||||
|
||||
throw code.exception();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1739,6 +1739,48 @@ public interface Admin extends AutoCloseable {
|
|||
* @return {@link UpdateGroupResult}
|
||||
*/
|
||||
UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, UpdateGroupOptions options);
|
||||
|
||||
GetNamespacedKVResult getNamespacedKV(
|
||||
Optional<Set<TopicPartition>> partitions,
|
||||
String namespace,
|
||||
String key,
|
||||
GetNamespacedKVOptions options
|
||||
);
|
||||
|
||||
/**
|
||||
* Put a key-value pair in the namespaced KV store.
|
||||
* @param partitions
|
||||
* @param namespace
|
||||
* @param key
|
||||
* @param value
|
||||
* @param options
|
||||
* @return
|
||||
*/
|
||||
PutNamespacedKVResult putNamespacedKV(
|
||||
Optional<Set<TopicPartition>> partitions,
|
||||
String namespace,
|
||||
String key,
|
||||
String value,
|
||||
PutNamespacedKVOptions options
|
||||
);
|
||||
|
||||
/**
|
||||
* Delete a key-value pair in the namespaced KV store.
|
||||
* @param partitions
|
||||
* @param namespace
|
||||
* @param key
|
||||
* @param options
|
||||
* @return
|
||||
*/
|
||||
DeleteNamespacedKVResult deleteNamespacedKV(
|
||||
Optional<Set<TopicPartition>> partitions,
|
||||
String namespace,
|
||||
String key,
|
||||
DeleteNamespacedKVOptions options
|
||||
);
|
||||
|
||||
|
||||
|
||||
// AutoMQ inject end
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,15 @@
|
|||
package org.apache.kafka.clients.admin;
|
||||
|
||||
public class DeleteNamespacedKVOptions extends AbstractOptions<DeleteNamespacedKVOptions> {
|
||||
|
||||
private long ifMatchEpoch = 0L;
|
||||
|
||||
public DeleteNamespacedKVOptions ifMatchEpoch(long epoch) {
|
||||
this.ifMatchEpoch = epoch;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long ifMatchEpoch() {
|
||||
return ifMatchEpoch;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class DeleteNamespacedKVResult extends AbstractOptions<DeleteNamespacedKVResult> {
|
||||
|
||||
private final Map<TopicPartition, KafkaFuture<Void>> futures;
|
||||
|
||||
public DeleteNamespacedKVResult(Map<TopicPartition, KafkaFuture<Void>> futures) {
|
||||
this.futures = futures;
|
||||
}
|
||||
|
||||
public KafkaFuture<Map<TopicPartition, KafkaFuture<Void>>> all() {
|
||||
return KafkaFuture.completedFuture(futures);
|
||||
}
|
||||
}
|
||||
|
|
@ -320,5 +320,20 @@ public class ForwardingAdmin implements Admin {
|
|||
return delegate.updateGroup(groupId, groupSpec, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNamespacedKVResult getNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, GetNamespacedKVOptions options) {
|
||||
return delegate.getNamespacedKV(partitions, namespace, key, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PutNamespacedKVResult putNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) {
|
||||
return delegate.putNamespacedKV(partitions, namespace, key, value, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteNamespacedKVResult deleteNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, DeleteNamespacedKVOptions options) {
|
||||
return delegate.deleteNamespacedKV(partitions, namespace, key, options);
|
||||
}
|
||||
|
||||
// AutoMQ inject end
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,4 @@
|
|||
package org.apache.kafka.clients.admin;
|
||||
|
||||
public class GetNamespacedKVOptions extends AbstractOptions<GetNamespacedKVOptions> {
|
||||
}
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
public class GetNamespacedKVResult {
|
||||
|
||||
private final Map<TopicPartition, KafkaFuture<GetKVResponse>> futures;
|
||||
|
||||
public GetNamespacedKVResult(Map<TopicPartition, KafkaFuture<GetKVResponse>> futures) {
|
||||
this.futures = futures;
|
||||
}
|
||||
|
||||
public KafkaFuture<Map<TopicPartition, KafkaFuture<GetKVResponse>>> all() throws ExecutionException, InterruptedException {
|
||||
return KafkaFuture.completedFuture(futures);
|
||||
}
|
||||
}
|
||||
|
|
@ -47,14 +47,19 @@ import org.apache.kafka.clients.admin.internals.AlterConsumerGroupOffsetsHandler
|
|||
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
|
||||
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
|
||||
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
|
||||
import org.apache.kafka.clients.admin.internals.DeleteNamespacedKVHandler;
|
||||
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
|
||||
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
|
||||
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
|
||||
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
|
||||
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
|
||||
import org.apache.kafka.clients.admin.internals.GetNamespacedKVHandler;
|
||||
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
|
||||
import org.apache.kafka.clients.admin.internals.ListOffsetsHandler;
|
||||
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
|
||||
import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToGet;
|
||||
import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToPut;
|
||||
import org.apache.kafka.clients.admin.internals.PutNamespacedKVHandler;
|
||||
import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler;
|
||||
import org.apache.kafka.clients.admin.internals.UpdateGroupHandler;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
|
|
@ -132,6 +137,7 @@ import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
|
|||
import org.apache.kafka.common.message.DeleteAclsResponseData;
|
||||
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
|
||||
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
|
||||
import org.apache.kafka.common.message.DeleteKVsRequestData;
|
||||
import org.apache.kafka.common.message.DeleteTopicsRequestData;
|
||||
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState;
|
||||
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
|
||||
|
|
@ -152,6 +158,8 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
|
|||
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
|
||||
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
|
||||
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
|
||||
import org.apache.kafka.common.message.GetKVsRequestData;
|
||||
import org.apache.kafka.common.message.GetKVsResponseData;
|
||||
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
|
||||
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
|
||||
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
|
||||
|
|
@ -160,6 +168,8 @@ import org.apache.kafka.common.message.ListGroupsRequestData;
|
|||
import org.apache.kafka.common.message.ListGroupsResponseData;
|
||||
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
|
||||
import org.apache.kafka.common.message.MetadataRequestData;
|
||||
import org.apache.kafka.common.message.PutKVsRequestData;
|
||||
import org.apache.kafka.common.message.PutKVsResponseData;
|
||||
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
|
||||
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
|
||||
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
|
||||
|
|
@ -266,6 +276,7 @@ import org.apache.kafka.common.utils.Utils;
|
|||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.time.Duration;
|
||||
|
|
@ -4866,6 +4877,86 @@ public class KafkaAdminClient extends AdminClient {
|
|||
return new UpdateGroupResult(future.get(CoordinatorKey.byGroupId(groupId)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNamespacedKVResult getNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, GetNamespacedKVOptions options) {
|
||||
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() ->
|
||||
new IllegalArgumentException("Partitions cannot be empty")
|
||||
);
|
||||
|
||||
NamespacedKVRecordsToGet.Builder recordsToGetBuilder = NamespacedKVRecordsToGet.newBuilder();
|
||||
for (TopicPartition tp : targetPartitions) {
|
||||
GetKVsRequestData.GetKVRequest kvRequest = new GetKVsRequestData.GetKVRequest()
|
||||
.setKey(key)
|
||||
.setNamespace(namespace);
|
||||
|
||||
recordsToGetBuilder.addRecord(tp, kvRequest);
|
||||
}
|
||||
|
||||
NamespacedKVRecordsToGet recordsToGet = recordsToGetBuilder.build();
|
||||
GetNamespacedKVHandler handler = new GetNamespacedKVHandler(logContext, recordsToGet);
|
||||
SimpleAdminApiFuture<TopicPartition, GetKVsResponseData.GetKVResponse> future = GetNamespacedKVHandler.newFuture(targetPartitions);
|
||||
|
||||
invokeDriver(handler, future, options.timeoutMs);
|
||||
|
||||
return new GetNamespacedKVResult(future.all());
|
||||
}
|
||||
|
||||
@Override
|
||||
public PutNamespacedKVResult putNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) {
|
||||
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() ->
|
||||
new IllegalArgumentException("Partitions cannot be empty")
|
||||
);
|
||||
|
||||
NamespacedKVRecordsToPut.Builder recordsToPutBuilder = NamespacedKVRecordsToPut.newBuilder();
|
||||
for (TopicPartition tp : targetPartitions) {
|
||||
PutKVsRequestData.PutKVRequest kvRequest = new PutKVsRequestData.PutKVRequest()
|
||||
.setKey(key)
|
||||
.setValue(value.getBytes(StandardCharsets.UTF_8))
|
||||
.setNamespace(namespace)
|
||||
.setOverwrite(options.overwrite())
|
||||
.setEpoch(options.ifMatchEpoch());
|
||||
|
||||
recordsToPutBuilder.addRecord(tp, kvRequest);
|
||||
}
|
||||
|
||||
NamespacedKVRecordsToPut recordsToPut = recordsToPutBuilder.build();
|
||||
|
||||
PutNamespacedKVHandler handler = new PutNamespacedKVHandler(logContext, recordsToPut);
|
||||
SimpleAdminApiFuture<TopicPartition, PutKVsResponseData.PutKVResponse> future = PutNamespacedKVHandler.newFuture(targetPartitions);
|
||||
|
||||
invokeDriver(handler, future, options.timeoutMs);
|
||||
|
||||
return new PutNamespacedKVResult(future.all());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public DeleteNamespacedKVResult deleteNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, DeleteNamespacedKVOptions options) {
|
||||
|
||||
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() ->
|
||||
new IllegalArgumentException("Partitions cannot be empty")
|
||||
);
|
||||
|
||||
NamespacedKVRecordsToDelete.Builder recordsToDeleteBuilder = NamespacedKVRecordsToDelete.newBuilder();
|
||||
for (TopicPartition tp : targetPartitions) {
|
||||
DeleteKVsRequestData.DeleteKVRequest kvRequest = new DeleteKVsRequestData.DeleteKVRequest()
|
||||
.setKey(key)
|
||||
.setNamespace(namespace)
|
||||
.setEpoch(options.ifMatchEpoch());
|
||||
|
||||
recordsToDeleteBuilder.addRecord(tp, kvRequest);
|
||||
}
|
||||
|
||||
NamespacedKVRecordsToDelete recordsToDelete = recordsToDeleteBuilder.build();
|
||||
|
||||
DeleteNamespacedKVHandler handler = new DeleteNamespacedKVHandler(logContext, recordsToDelete);
|
||||
SimpleAdminApiFuture<TopicPartition, Void> future = DeleteNamespacedKVHandler.newFuture(targetPartitions);
|
||||
|
||||
invokeDriver(handler, future, options.timeoutMs);
|
||||
|
||||
return new DeleteNamespacedKVResult(future.all());
|
||||
}
|
||||
|
||||
private <K, V> void invokeDriver(
|
||||
AdminApiHandler<K, V> handler,
|
||||
AdminApiFuture<K, V> future,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.message.DeleteKVsRequestData.DeleteKVRequest;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class NamespacedKVRecordsToDelete {
|
||||
|
||||
private final Map<TopicPartition, List<DeleteKVRequest>> recordsByPartition;
|
||||
|
||||
public NamespacedKVRecordsToDelete(Map<TopicPartition, List<DeleteKVRequest>> recordsByPartition) {
|
||||
this.recordsByPartition = recordsByPartition;
|
||||
}
|
||||
|
||||
public static NamespacedKVRecordsToDelete.Builder newBuilder() {
|
||||
return new NamespacedKVRecordsToDelete.Builder();
|
||||
}
|
||||
|
||||
public Map<TopicPartition, List<DeleteKVRequest>> recordsByPartition() {
|
||||
return recordsByPartition;
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private final Map<TopicPartition, List<DeleteKVRequest>> records = new HashMap<>();
|
||||
|
||||
public NamespacedKVRecordsToDelete.Builder addRecord(TopicPartition partition, DeleteKVRequest request) {
|
||||
records.computeIfAbsent(partition, k -> new ArrayList<>()).add(request);
|
||||
return this;
|
||||
}
|
||||
|
||||
public NamespacedKVRecordsToDelete build() {
|
||||
return new NamespacedKVRecordsToDelete(records);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
package org.apache.kafka.clients.admin;
|
||||
|
||||
public class PutNamespacedKVOptions extends AbstractOptions<PutNamespacedKVOptions> {
|
||||
|
||||
private boolean overwrite = false;
|
||||
private long ifMatchEpoch = 0L;
|
||||
|
||||
public PutNamespacedKVOptions overwrite(boolean overwrite) {
|
||||
this.overwrite = overwrite;
|
||||
return this;
|
||||
}
|
||||
|
||||
public PutNamespacedKVOptions ifMatchEpoch(long epoch) {
|
||||
this.ifMatchEpoch = epoch;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean overwrite() {
|
||||
return overwrite;
|
||||
}
|
||||
public long ifMatchEpoch() {
|
||||
return ifMatchEpoch;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.message.PutKVsResponseData.PutKVResponse;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
public class PutNamespacedKVResult {
|
||||
|
||||
private final Map<TopicPartition, KafkaFuture<PutKVResponse>> futures;
|
||||
|
||||
public PutNamespacedKVResult(Map<TopicPartition, KafkaFuture<PutKVResponse>> futures) {
|
||||
this.futures = futures;
|
||||
}
|
||||
|
||||
|
||||
public KafkaFuture<Map<TopicPartition, KafkaFuture<PutKVResponse>>> all() {
|
||||
return KafkaFuture.completedFuture(futures);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,89 @@
|
|||
package org.apache.kafka.clients.admin.internals;
|
||||
|
||||
import org.apache.kafka.clients.admin.NamespacedKVRecordsToDelete;
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.message.DeleteKVsRequestData;
|
||||
import org.apache.kafka.common.message.DeleteKVsRequestData.DeleteKVRequest;
|
||||
import org.apache.kafka.common.message.DeleteKVsResponseData;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.requests.AbstractRequest;
|
||||
import org.apache.kafka.common.requests.AbstractResponse;
|
||||
import org.apache.kafka.common.requests.s3.DeleteKVsRequest;
|
||||
import org.apache.kafka.common.requests.s3.DeleteKVsResponse;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class DeleteNamespacedKVHandler extends AdminApiHandler.Batched<TopicPartition, Void> {
|
||||
|
||||
private final Logger logger;
|
||||
private final NamespacedKVRecordsToDelete recordsToDelete;
|
||||
private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
|
||||
|
||||
public DeleteNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToDelete recordsToDelete) {
|
||||
this.logger = logContext.logger(PutNamespacedKVHandler.class);
|
||||
this.recordsToDelete = recordsToDelete;
|
||||
this.lookupStrategy = new PartitionLeaderStrategy(logContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
AbstractRequest.Builder<?> buildBatchedRequest(int brokerId, Set<TopicPartition> partitions) {
|
||||
Map<TopicPartition, List<DeleteKVRequest>> filteredRecords = new HashMap<>();
|
||||
for (TopicPartition partition : partitions) {
|
||||
if (recordsToDelete.recordsByPartition().containsKey(partition)) {
|
||||
filteredRecords.put(partition, recordsToDelete.recordsByPartition().get(partition));
|
||||
}
|
||||
}
|
||||
|
||||
DeleteKVsRequestData requestData = new DeleteKVsRequestData();
|
||||
List<DeleteKVRequest> allRequests = new ArrayList<>();
|
||||
filteredRecords.values().forEach(allRequests::addAll);
|
||||
requestData.setDeleteKVRequests(allRequests);
|
||||
|
||||
return new DeleteKVsRequest.Builder(requestData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String apiName() {
|
||||
return "DeleteKVs";
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApiResult<TopicPartition, Void> handleResponse(Node broker, Set<TopicPartition> partitions, AbstractResponse response) {
|
||||
DeleteKVsResponse deleteResponse = (DeleteKVsResponse) response;
|
||||
DeleteKVsResponseData responseData = deleteResponse.data();
|
||||
final Map<TopicPartition, Void> completed = new HashMap<>();
|
||||
final Map<TopicPartition, Throwable> failed = new HashMap<>();
|
||||
|
||||
partitions.forEach(partition -> {
|
||||
Errors error = Errors.forCode(responseData.errorCode());
|
||||
if (error != Errors.NONE) {
|
||||
failed.put(partition, error.exception());
|
||||
} else {
|
||||
completed.put(partition, null);
|
||||
}
|
||||
});
|
||||
|
||||
return new ApiResult<>(completed, failed, Collections.emptyList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
|
||||
return this.lookupStrategy;
|
||||
}
|
||||
|
||||
public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition, Void> newFuture(
|
||||
Set<TopicPartition> partitions
|
||||
) {
|
||||
return AdminApiFuture.forKeys(new HashSet<>(partitions));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,95 @@
|
|||
package org.apache.kafka.clients.admin.internals;
|
||||
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.message.GetKVsRequestData;
|
||||
import org.apache.kafka.common.message.GetKVsResponseData;
|
||||
import org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.requests.AbstractRequest;
|
||||
import org.apache.kafka.common.requests.AbstractResponse;
|
||||
import org.apache.kafka.common.requests.s3.GetKVsRequest;
|
||||
import org.apache.kafka.common.requests.s3.GetKVsResponse;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class GetNamespacedKVHandler extends AdminApiHandler.Batched<TopicPartition, GetKVResponse> {
|
||||
private final Logger logger;
|
||||
private final NamespacedKVRecordsToGet recordsToGet;
|
||||
private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
|
||||
private final List<TopicPartition> orderedPartitions;
|
||||
|
||||
public GetNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToGet recordsToGet) {
|
||||
this.logger = logContext.logger(PutNamespacedKVHandler.class);
|
||||
this.recordsToGet = recordsToGet;
|
||||
this.lookupStrategy = new PartitionLeaderStrategy(logContext);
|
||||
this.orderedPartitions = new ArrayList<>(recordsToGet.recordsByPartition().keySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
AbstractRequest.Builder<?> buildBatchedRequest(int brokerId, Set<TopicPartition> partitions) {
|
||||
|
||||
GetKVsRequestData requestData = new GetKVsRequestData();
|
||||
for (TopicPartition tp : orderedPartitions) {
|
||||
if (partitions.contains(tp)) {
|
||||
requestData.getKeyRequests().addAll(
|
||||
recordsToGet.recordsByPartition().get(tp)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return new GetKVsRequest.Builder(requestData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String apiName() {
|
||||
return "GetKVs";
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApiResult<TopicPartition, GetKVResponse> handleResponse(Node broker, Set<TopicPartition> partitions, AbstractResponse response) {
|
||||
|
||||
GetKVsResponseData data = ((GetKVsResponse) response).data();
|
||||
final Map<TopicPartition, GetKVResponse> completed = new LinkedHashMap<>();
|
||||
final Map<TopicPartition, Throwable> failed = new HashMap<>();
|
||||
List<GetKVResponse> responses = data.getKVResponses();
|
||||
int responseIndex = 0;
|
||||
for (TopicPartition tp : orderedPartitions) {
|
||||
if (!partitions.contains(tp)) {
|
||||
continue;
|
||||
}
|
||||
if (responseIndex >= responses.size()) {
|
||||
failed.put(tp, new IllegalStateException("Missing response for partition"));
|
||||
continue;
|
||||
}
|
||||
GetKVResponse resp = responses.get(responseIndex++);
|
||||
if (resp.errorCode() == Errors.NONE.code()) {
|
||||
completed.put(tp, resp);
|
||||
} else {
|
||||
failed.put(tp, Errors.forCode(resp.errorCode()).exception());
|
||||
}
|
||||
}
|
||||
return new ApiResult<>(completed, failed, Collections.emptyList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
|
||||
return this.lookupStrategy;
|
||||
}
|
||||
|
||||
public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition, GetKVResponse> newFuture(
|
||||
Set<TopicPartition> partitions
|
||||
) {
|
||||
return AdminApiFuture.forKeys(new HashSet<>(partitions));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
package org.apache.kafka.clients.admin.internals;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.message.GetKVsRequestData.GetKVRequest;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class NamespacedKVRecordsToGet {
|
||||
|
||||
private final Map<TopicPartition, List<GetKVRequest>> recordsByPartition;
|
||||
|
||||
public NamespacedKVRecordsToGet(Map<TopicPartition, List<GetKVRequest>> recordsByPartition) {
|
||||
this.recordsByPartition = recordsByPartition;
|
||||
}
|
||||
|
||||
public static NamespacedKVRecordsToGet.Builder newBuilder() {
|
||||
return new NamespacedKVRecordsToGet.Builder();
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private final Map<TopicPartition, List<GetKVRequest>> records = new HashMap<>();
|
||||
public Builder addRecord(TopicPartition tp, GetKVRequest req) {
|
||||
records.computeIfAbsent(tp, k -> new ArrayList<>()).add(req);
|
||||
return this;
|
||||
}
|
||||
|
||||
public NamespacedKVRecordsToGet build() {
|
||||
return new NamespacedKVRecordsToGet(records);
|
||||
}
|
||||
}
|
||||
|
||||
public Map<TopicPartition, List<GetKVRequest>> recordsByPartition() {
|
||||
return recordsByPartition;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
package org.apache.kafka.clients.admin.internals;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.message.PutKVsRequestData.PutKVRequest;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class NamespacedKVRecordsToPut {
|
||||
|
||||
private final Map<TopicPartition, List<PutKVRequest>> recordsByPartition;
|
||||
|
||||
private NamespacedKVRecordsToPut(Map<TopicPartition, List<PutKVRequest>> recordsByPartition) {
|
||||
this.recordsByPartition = recordsByPartition;
|
||||
}
|
||||
|
||||
public static Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public Map<TopicPartition, List<PutKVRequest>> recordsByPartition() {
|
||||
return recordsByPartition;
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private final Map<TopicPartition, List<PutKVRequest>> records = new HashMap<>();
|
||||
|
||||
public Builder addRecord(TopicPartition partition, PutKVRequest request) {
|
||||
records.computeIfAbsent(partition, k -> new ArrayList<>()).add(request);
|
||||
return this;
|
||||
}
|
||||
|
||||
public NamespacedKVRecordsToPut build() {
|
||||
return new NamespacedKVRecordsToPut(records);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,96 @@
|
|||
package org.apache.kafka.clients.admin.internals;
|
||||
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.message.PutKVsRequestData;
|
||||
import org.apache.kafka.common.message.PutKVsRequestData.PutKVRequest;
|
||||
import org.apache.kafka.common.message.PutKVsResponseData;
|
||||
import org.apache.kafka.common.message.PutKVsResponseData.PutKVResponse;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.requests.AbstractRequest;
|
||||
import org.apache.kafka.common.requests.AbstractResponse;
|
||||
import org.apache.kafka.common.requests.s3.PutKVsRequest;
|
||||
import org.apache.kafka.common.requests.s3.PutKVsResponse;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class PutNamespacedKVHandler extends AdminApiHandler.Batched<TopicPartition, PutKVResponse> {
|
||||
private final Logger logger;
|
||||
private final NamespacedKVRecordsToPut recordsToPut;
|
||||
private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
|
||||
private final List<TopicPartition> orderedPartitions;
|
||||
|
||||
public PutNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToPut recordsToPut) {
|
||||
this.logger = logContext.logger(PutNamespacedKVHandler.class);
|
||||
this.recordsToPut = recordsToPut;
|
||||
this.lookupStrategy = new PartitionLeaderStrategy(logContext);
|
||||
this.orderedPartitions = new ArrayList<>(recordsToPut.recordsByPartition().keySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractRequest.Builder<?> buildBatchedRequest(int brokerId, Set<TopicPartition> partitions) {
|
||||
PutKVsRequestData requestData = new PutKVsRequestData();
|
||||
List<PutKVRequest> allPutRequests = orderedPartitions.stream()
|
||||
.filter(partitions::contains)
|
||||
.map(tp -> recordsToPut.recordsByPartition().get(tp))
|
||||
.filter(Objects::nonNull)
|
||||
.flatMap(Collection::stream).collect(Collectors.toList());
|
||||
|
||||
requestData.setPutKVRequests(allPutRequests);
|
||||
return new PutKVsRequest.Builder(requestData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String apiName() {
|
||||
return "PutKVs";
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApiResult<TopicPartition, PutKVResponse> handleResponse(Node broker, Set<TopicPartition> partitions, AbstractResponse response) {
|
||||
PutKVsResponseData responseData = ((PutKVsResponse) response).data();
|
||||
List<PutKVResponse> responses = responseData.putKVResponses();
|
||||
final Map<TopicPartition, PutKVResponse> completed = new LinkedHashMap<>();
|
||||
final Map<TopicPartition, Throwable> failed = new HashMap<>();
|
||||
int responseIndex = 0;
|
||||
for (TopicPartition tp : orderedPartitions) {
|
||||
if (!partitions.contains(tp)) {
|
||||
continue;
|
||||
}
|
||||
if (responseIndex >= responses.size()) {
|
||||
failed.put(tp, new IllegalStateException("Missing response for partition"));
|
||||
continue;
|
||||
}
|
||||
PutKVResponse resp = responses.get(responseIndex++);
|
||||
if (resp.errorCode() == Errors.NONE.code()) {
|
||||
completed.put(tp, resp);
|
||||
} else {
|
||||
failed.put(tp, Errors.forCode(resp.errorCode()).exception());
|
||||
}
|
||||
}
|
||||
return new ApiResult<>(completed, failed, Collections.emptyList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
|
||||
return this.lookupStrategy;
|
||||
}
|
||||
|
||||
public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition, PutKVResponse> newFuture(
|
||||
Set<TopicPartition> partitions
|
||||
) {
|
||||
return AdminApiFuture.forKeys(new HashSet<>(partitions));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
package org.apache.kafka.common.errors;
|
||||
|
||||
public class InvalidKVRecordEpochException extends ApiException {
|
||||
|
||||
public InvalidKVRecordEpochException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
|
@ -61,6 +61,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
|
|||
import org.apache.kafka.common.errors.InvalidFetchSessionEpochException;
|
||||
import org.apache.kafka.common.errors.InvalidFetchSizeException;
|
||||
import org.apache.kafka.common.errors.InvalidGroupIdException;
|
||||
import org.apache.kafka.common.errors.InvalidKVRecordEpochException;
|
||||
import org.apache.kafka.common.errors.InvalidPartitionsException;
|
||||
import org.apache.kafka.common.errors.InvalidPidMappingException;
|
||||
import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
|
||||
|
|
@ -440,6 +441,7 @@ public enum Errors {
|
|||
NODE_LOCKED(515, "The node is locked", NodeLockedException::new),
|
||||
OBJECT_NOT_COMMITED(516, "The object is not commited.", ObjectNotCommittedException::new),
|
||||
STREAM_INNER_ERROR(599, "The stream inner error.", StreamInnerErrorException::new),
|
||||
INVALID_KV_RECORD_EPOCH(600, "The KV record epoch is invalid.", InvalidKVRecordEpochException::new),
|
||||
// AutoMQ inject end
|
||||
|
||||
INVALID_RECORD_STATE(121, "The record state is invalid. The acknowledgement of delivery could not be completed.", InvalidRecordStateException::new),
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@
|
|||
"broker"
|
||||
],
|
||||
"name": "DeleteKVsRequest",
|
||||
"validVersions": "0",
|
||||
"validVersions": "0-1",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{
|
||||
|
|
@ -35,8 +35,20 @@
|
|||
"type": "string",
|
||||
"versions": "0+",
|
||||
"about": "Key is the key to delete"
|
||||
},
|
||||
{
|
||||
"name": "Namespace",
|
||||
"type": "string",
|
||||
"versions": "1+",
|
||||
"about": "Namespace"
|
||||
},
|
||||
{
|
||||
"name": "Epoch",
|
||||
"type": "int64",
|
||||
"versions": "1+",
|
||||
"about": "Epoch"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
"apiKey": 511,
|
||||
"type": "response",
|
||||
"name": "DeleteKVsResponse",
|
||||
"validVersions": "0",
|
||||
"validVersions": "0-1",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{
|
||||
|
|
@ -50,8 +50,15 @@
|
|||
"versions": "0+",
|
||||
"nullableVersions": "0+",
|
||||
"about": "Value"
|
||||
},
|
||||
{
|
||||
"name": "Epoch",
|
||||
"type": "int64",
|
||||
"versions": "1+",
|
||||
"about": "Epoch"
|
||||
}
|
||||
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@
|
|||
"broker"
|
||||
],
|
||||
"name": "GetKVsRequest",
|
||||
"validVersions": "0",
|
||||
"validVersions": "0-1",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{
|
||||
|
|
@ -35,8 +35,14 @@
|
|||
"type": "string",
|
||||
"versions": "0+",
|
||||
"about": "Key is the key to get"
|
||||
},
|
||||
{
|
||||
"name": "Namespace",
|
||||
"type": "string",
|
||||
"versions": "1+",
|
||||
"about": "Namespace"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
"apiKey": 509,
|
||||
"type": "response",
|
||||
"name": "GetKVsResponse",
|
||||
"validVersions": "0",
|
||||
"validVersions": "0-1",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{
|
||||
|
|
@ -50,8 +50,20 @@
|
|||
"versions": "0+",
|
||||
"nullableVersions": "0+",
|
||||
"about": "Value"
|
||||
},
|
||||
{
|
||||
"name": "Namespace",
|
||||
"type": "string",
|
||||
"versions": "1+",
|
||||
"about": "Namespace"
|
||||
},
|
||||
{
|
||||
"name": "Epoch",
|
||||
"type": "int64",
|
||||
"versions": "1+",
|
||||
"about": "Epoch"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@
|
|||
"broker"
|
||||
],
|
||||
"name": "PutKVsRequest",
|
||||
"validVersions": "0",
|
||||
"validVersions": "0-1",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{
|
||||
|
|
@ -47,8 +47,20 @@
|
|||
"type": "bool",
|
||||
"versions": "0+",
|
||||
"about": "overwrite put kv"
|
||||
},
|
||||
{
|
||||
"name": "Namespace",
|
||||
"type": "string",
|
||||
"versions": "1+",
|
||||
"about": "Namespace"
|
||||
},
|
||||
{
|
||||
"name": "Epoch",
|
||||
"type": "int64",
|
||||
"versions": "1+",
|
||||
"about": "Epoch"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
"apiKey": 510,
|
||||
"type": "response",
|
||||
"name": "PutKVsResponse",
|
||||
"validVersions": "0",
|
||||
"validVersions": "0-1",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{
|
||||
|
|
@ -49,8 +49,14 @@
|
|||
"type": "bytes",
|
||||
"versions": "0+",
|
||||
"about": "Value"
|
||||
},
|
||||
{
|
||||
"name": "Epoch",
|
||||
"type": "int64",
|
||||
"versions": "1+",
|
||||
"about": "Epoch"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -116,6 +116,7 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
|
|||
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
|
||||
import org.apache.kafka.common.message.FindCoordinatorRequestData;
|
||||
import org.apache.kafka.common.message.FindCoordinatorResponseData;
|
||||
import org.apache.kafka.common.message.GetKVsResponseData;
|
||||
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
|
||||
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
|
||||
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
|
||||
|
|
@ -141,6 +142,7 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
|
|||
import org.apache.kafka.common.message.OffsetFetchRequestData;
|
||||
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
|
||||
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
|
||||
import org.apache.kafka.common.message.PutKVsResponseData;
|
||||
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
|
||||
import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
|
|
@ -6948,7 +6950,7 @@ public class KafkaAdminClientTest {
|
|||
assertNotNull(result.descriptions().get(1).get());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testDescribeReplicaLogDirsWithNonExistReplica() throws Exception {
|
||||
int brokerId = 0;
|
||||
|
|
@ -6967,7 +6969,7 @@ public class KafkaAdminClientTest {
|
|||
|
||||
DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(tpr1, tpr2));
|
||||
Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values();
|
||||
|
||||
|
||||
assertEquals(logDir, values.get(tpr1).get().getCurrentReplicaLogDir());
|
||||
assertNull(values.get(tpr1).get().getFutureReplicaLogDir());
|
||||
assertEquals(offsetLag, values.get(tpr1).get().getCurrentReplicaOffsetLag());
|
||||
|
|
|
|||
|
|
@ -1450,6 +1450,21 @@ public class MockAdminClient extends AdminClient {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNamespacedKVResult getNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, GetNamespacedKVOptions options) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PutNamespacedKVResult putNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteNamespacedKVResult deleteNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, DeleteNamespacedKVOptions options) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
// AutoMQ inject end
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,7 +44,9 @@ import org.apache.kafka.common.requests.s3.PutKVsRequest;
|
|||
import com.automq.stream.api.KVClient;
|
||||
import com.automq.stream.api.KeyValue;
|
||||
import com.automq.stream.api.KeyValue.Key;
|
||||
import com.automq.stream.api.KeyValue.KeyAndNamespace;
|
||||
import com.automq.stream.api.KeyValue.Value;
|
||||
import com.automq.stream.api.KeyValue.ValueAndEpoch;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -247,4 +249,196 @@ public class ControllerKVClient implements KVClient {
|
|||
this.requestSender.send(task);
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ValueAndEpoch> putNamespacedKVIfAbsent(KeyValue keyValue) {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ControllerKVClient]: Put Namespaced KV if absent: {}", keyValue);
|
||||
}
|
||||
PutKVRequest request = new PutKVRequest()
|
||||
.setKey(keyValue.key().get())
|
||||
.setValue(keyValue.value().get().array())
|
||||
.setNamespace(keyValue.namespace())
|
||||
.setEpoch(keyValue.epoch());
|
||||
WrapRequest req = new BatchRequest() {
|
||||
@Override
|
||||
public Builder addSubRequest(Builder builder) {
|
||||
PutKVsRequest.Builder realBuilder = (PutKVsRequest.Builder) builder;
|
||||
return realBuilder.addSubRequest(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApiKeys apiKey() {
|
||||
return ApiKeys.PUT_KVS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder toRequestBuilder() {
|
||||
return new PutKVsRequest.Builder(
|
||||
new PutKVsRequestData()
|
||||
).addSubRequest(request);
|
||||
}
|
||||
};
|
||||
CompletableFuture<ValueAndEpoch> future = new CompletableFuture<>();
|
||||
RequestTask<PutKVResponse, ValueAndEpoch> task = new RequestTask<PutKVResponse, ValueAndEpoch>(req, future, response -> {
|
||||
Errors code = Errors.forCode(response.errorCode());
|
||||
switch (code) {
|
||||
case NONE:
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ControllerKVClient]: Put Namespaced KV if absent: {}, result: {}", keyValue, response);
|
||||
}
|
||||
return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch()));
|
||||
case KEY_EXIST:
|
||||
LOGGER.warn("[ControllerKVClient]: Failed to Put Namespaced KV if absent: {}, code: {}, key already exist", keyValue, code);
|
||||
return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch()));
|
||||
default:
|
||||
LOGGER.error("[ControllerKVClient]: Failed to Put Namespaced KV if absent: {}, code: {}, retry later", keyValue, code);
|
||||
return ResponseHandleResult.withRetry();
|
||||
}
|
||||
});
|
||||
this.requestSender.send(task);
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ValueAndEpoch> putNamespacedKV(KeyValue keyValue) {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ControllerKVClient]: Put Namespaced KV: {}", keyValue);
|
||||
}
|
||||
PutKVRequest request = new PutKVRequest()
|
||||
.setKey(keyValue.key().get())
|
||||
.setValue(keyValue.value().get().array())
|
||||
.setNamespace(keyValue.namespace())
|
||||
.setEpoch(keyValue.epoch())
|
||||
.setOverwrite(true);
|
||||
WrapRequest req = new BatchRequest() {
|
||||
@Override
|
||||
public Builder addSubRequest(Builder builder) {
|
||||
PutKVsRequest.Builder realBuilder = (PutKVsRequest.Builder) builder;
|
||||
return realBuilder.addSubRequest(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApiKeys apiKey() {
|
||||
return ApiKeys.PUT_KVS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder toRequestBuilder() {
|
||||
return new PutKVsRequest.Builder(
|
||||
new PutKVsRequestData()
|
||||
).addSubRequest(request);
|
||||
}
|
||||
};
|
||||
CompletableFuture<ValueAndEpoch> future = new CompletableFuture<>();
|
||||
RequestTask<PutKVResponse, ValueAndEpoch> task = new RequestTask<PutKVResponse, ValueAndEpoch>(req, future, response -> {
|
||||
Errors code = Errors.forCode(response.errorCode());
|
||||
switch (code) {
|
||||
case NONE:
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ControllerKVClient]: Put Namespaced KV: {}, result: {}", keyValue, response);
|
||||
}
|
||||
return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch()));
|
||||
default:
|
||||
LOGGER.error("[ControllerKVClient]: Failed to Put Namespaced KV: {}, code: {}, retry later", keyValue, code);
|
||||
return ResponseHandleResult.withRetry();
|
||||
}
|
||||
});
|
||||
this.requestSender.send(task);
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ValueAndEpoch> getNamespacedKV(KeyAndNamespace keyAndNamespace) {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ControllerKVClient]: Get KV: {}, Namespace: {}", keyAndNamespace.key(), keyAndNamespace.namespace());
|
||||
}
|
||||
GetKVRequest request = new GetKVRequest()
|
||||
.setKey(keyAndNamespace.key().get())
|
||||
.setNamespace(keyAndNamespace.namespace());
|
||||
WrapRequest req = new BatchRequest() {
|
||||
@Override
|
||||
public Builder addSubRequest(Builder builder) {
|
||||
GetKVsRequest.Builder realBuilder = (GetKVsRequest.Builder) builder;
|
||||
return realBuilder.addSubRequest(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApiKeys apiKey() {
|
||||
return ApiKeys.GET_KVS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder toRequestBuilder() {
|
||||
return new GetKVsRequest.Builder(
|
||||
new GetKVsRequestData()
|
||||
).addSubRequest(request);
|
||||
}
|
||||
};
|
||||
CompletableFuture<ValueAndEpoch> future = new CompletableFuture<>();
|
||||
RequestTask<GetKVResponse, ValueAndEpoch> task = new RequestTask<>(req, future, response -> {
|
||||
Errors code = Errors.forCode(response.errorCode());
|
||||
switch (code) {
|
||||
case NONE:
|
||||
ValueAndEpoch val = ValueAndEpoch.of(response.value(), response.epoch());
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ControllerKVClient]: Get Namespaced KV: {}, result: {}", keyAndNamespace.key(), response);
|
||||
}
|
||||
return ResponseHandleResult.withSuccess(val);
|
||||
default:
|
||||
LOGGER.error("[ControllerKVClient]: Failed to Get Namespaced KV: {}, code: {}, retry later", keyAndNamespace.key(), code);
|
||||
return ResponseHandleResult.withRetry();
|
||||
}
|
||||
});
|
||||
this.requestSender.send(task);
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ValueAndEpoch> delNamespacedKV(KeyValue keyValue) {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ControllerKVClient]: Delete Namespaced KV: {}", keyValue.key());
|
||||
}
|
||||
DeleteKVRequest request = new DeleteKVRequest()
|
||||
.setKey(keyValue.key().get());
|
||||
WrapRequest req = new BatchRequest() {
|
||||
@Override
|
||||
public Builder addSubRequest(Builder builder) {
|
||||
DeleteKVsRequest.Builder realBuilder = (DeleteKVsRequest.Builder) builder;
|
||||
return realBuilder.addSubRequest(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApiKeys apiKey() {
|
||||
return ApiKeys.DELETE_KVS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder toRequestBuilder() {
|
||||
return new DeleteKVsRequest.Builder(
|
||||
new DeleteKVsRequestData()
|
||||
).addSubRequest(request);
|
||||
}
|
||||
};
|
||||
|
||||
CompletableFuture<ValueAndEpoch> future = new CompletableFuture<>();
|
||||
RequestTask<DeleteKVResponse, ValueAndEpoch> task = new RequestTask<>(req, future, response -> {
|
||||
Errors code = Errors.forCode(response.errorCode());
|
||||
switch (code) {
|
||||
case NONE:
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("[ControllerKVClient]: Delete Namespaced KV: {}, result: {}", keyValue.key(), response);
|
||||
}
|
||||
return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch()));
|
||||
case KEY_NOT_EXIST:
|
||||
LOGGER.info("[ControllerKVClient]: Delete Namespaced KV: {}, result: KEY_NOT_EXIST", keyValue.key());
|
||||
return ResponseHandleResult.withSuccess(ValueAndEpoch.of((ByteBuffer) null, 0L));
|
||||
default:
|
||||
LOGGER.error("[ControllerKVClient]: Failed to Delete Namespaced KV: {}, code: {}, retry later", keyValue.key(), code);
|
||||
return ResponseHandleResult.withRetry();
|
||||
}
|
||||
});
|
||||
this.requestSender.send(task);
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package kafka.log.streamaspect;
|
||||
|
||||
|
||||
import com.automq.stream.DefaultRecordBatch;
|
||||
import com.automq.stream.RecordBatchWithContextWrapper;
|
||||
import com.automq.stream.api.AppendResult;
|
||||
|
|
@ -28,7 +29,9 @@ import com.automq.stream.api.FetchResult;
|
|||
import com.automq.stream.api.KVClient;
|
||||
import com.automq.stream.api.KeyValue;
|
||||
import com.automq.stream.api.KeyValue.Key;
|
||||
import com.automq.stream.api.KeyValue.KeyAndNamespace;
|
||||
import com.automq.stream.api.KeyValue.Value;
|
||||
import com.automq.stream.api.KeyValue.ValueAndEpoch;
|
||||
import com.automq.stream.api.OpenStreamOptions;
|
||||
import com.automq.stream.api.RecordBatch;
|
||||
import com.automq.stream.api.RecordBatchWithContext;
|
||||
|
|
@ -51,6 +54,9 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static com.automq.stream.utils.KVRecordUtils.buildCompositeKey;
|
||||
import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH;
|
||||
|
||||
public class MemoryClient implements Client {
|
||||
private final StreamClient streamClient = new StreamClientImpl();
|
||||
private final KVClient kvClient = new KVClientImpl();
|
||||
|
|
@ -180,6 +186,7 @@ public class MemoryClient implements Client {
|
|||
|
||||
public static class KVClientImpl implements KVClient {
|
||||
private final Map<String, ByteBuffer> store = new ConcurrentHashMap<>();
|
||||
private final Map<String, KeyMetadata> keyMetadataMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Value> putKV(KeyValue keyValue) {
|
||||
|
|
@ -202,5 +209,77 @@ public class MemoryClient implements Client {
|
|||
public CompletableFuture<Value> delKV(Key key) {
|
||||
return CompletableFuture.completedFuture(Value.of(store.remove(key.get())));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ValueAndEpoch> putNamespacedKVIfAbsent(KeyValue keyValue) {
|
||||
String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get());
|
||||
KeyMetadata keyMetadata = keyMetadataMap.get(key);
|
||||
long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0;
|
||||
if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) {
|
||||
return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception());
|
||||
}
|
||||
long newEpoch = System.currentTimeMillis();
|
||||
|
||||
ByteBuffer value = store.putIfAbsent(key, keyValue.value().get().duplicate());
|
||||
if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) {
|
||||
keyMetadataMap.putIfAbsent(keyValue.key().get(), new KeyMetadata(keyValue.namespace(), newEpoch));
|
||||
}
|
||||
return CompletableFuture.completedFuture(ValueAndEpoch.of(value, newEpoch));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ValueAndEpoch> putNamespacedKV(KeyValue keyValue) {
|
||||
String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get());
|
||||
KeyMetadata keyMetadata = keyMetadataMap.get(key);
|
||||
long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0;
|
||||
if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) {
|
||||
return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception());
|
||||
}
|
||||
long newEpoch = System.currentTimeMillis();
|
||||
|
||||
ByteBuffer value = store.put(key, keyValue.value().get().duplicate());
|
||||
if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) {
|
||||
keyMetadataMap.put(keyValue.key().get(), new KeyMetadata(keyValue.namespace(), newEpoch));
|
||||
}
|
||||
return CompletableFuture.completedFuture(ValueAndEpoch.of(value, newEpoch));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ValueAndEpoch> getNamespacedKV(KeyAndNamespace keyAndNamespace) {
|
||||
String key = buildCompositeKey(keyAndNamespace.namespace(), keyAndNamespace.key().get());
|
||||
KeyMetadata keyMetadata = null;
|
||||
if (keyAndNamespace.namespace() != null && !keyAndNamespace.namespace().isEmpty()) {
|
||||
keyMetadata = keyMetadataMap.get(key);
|
||||
}
|
||||
return CompletableFuture.completedFuture(ValueAndEpoch.of(store.get(key), keyMetadata != null ? keyMetadata.getEpoch() : 0L));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ValueAndEpoch> delNamespacedKV(KeyValue keyValue) {
|
||||
String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get());
|
||||
KeyMetadata keyMetadata = keyMetadataMap.get(key);
|
||||
long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0;
|
||||
if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) {
|
||||
return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception());
|
||||
}
|
||||
return CompletableFuture.completedFuture(ValueAndEpoch.of(store.remove(key), currentEpoch));
|
||||
}
|
||||
|
||||
private static class KeyMetadata {
|
||||
private final long epoch;
|
||||
private final String namespace;
|
||||
public KeyMetadata(String namespace, long epoch) {
|
||||
this.namespace = namespace;
|
||||
this.epoch = epoch;
|
||||
}
|
||||
|
||||
public long getEpoch() {
|
||||
return epoch;
|
||||
}
|
||||
|
||||
public String getNamespace() {
|
||||
return namespace;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2051,9 +2051,6 @@ public final class QuorumController implements Controller {
|
|||
this.time = time;
|
||||
this.controllerMetrics = controllerMetrics;
|
||||
this.snapshotRegistry = new SnapshotRegistry(logContext);
|
||||
// AutoMQ for Kafka inject start
|
||||
this.kvControlManager = new KVControlManager(snapshotRegistry, logContext);
|
||||
// AutoMQ for Kafka inject end
|
||||
this.deferredEventQueue = new DeferredEventQueue(logContext);
|
||||
this.deferredUnstableEventQueue = new DeferredEventQueue(logContext);
|
||||
this.offsetControl = new OffsetControlManager.Builder().
|
||||
|
|
@ -2090,6 +2087,9 @@ public final class QuorumController implements Controller {
|
|||
setMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION).
|
||||
setClusterFeatureSupportDescriber(clusterSupportDescriber).
|
||||
build();
|
||||
// AutoMQ for Kafka inject start
|
||||
this.kvControlManager = new KVControlManager(snapshotRegistry, logContext, featureControl);
|
||||
// AutoMQ for Kafka inject end
|
||||
this.clusterControl = new ClusterControlManager.Builder().
|
||||
setLogContext(logContext).
|
||||
setClusterId(clusterId).
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.KVRecord.KeyValue;
|
|||
import org.apache.kafka.common.metadata.RemoveKVRecord;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.controller.ControllerResult;
|
||||
import org.apache.kafka.controller.FeatureControlManager;
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||
import org.apache.kafka.timeline.TimelineHashMap;
|
||||
|
|
@ -39,6 +40,8 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.automq.stream.utils.KVRecordUtils.buildCompositeKey;
|
||||
import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH;
|
||||
import static org.apache.kafka.common.protocol.Errors.KEY_EXIST;
|
||||
import static org.apache.kafka.common.protocol.Errors.KEY_NOT_EXIST;
|
||||
|
||||
|
|
@ -47,30 +50,54 @@ public class KVControlManager {
|
|||
private final SnapshotRegistry registry;
|
||||
private final Logger log;
|
||||
private final TimelineHashMap<String, ByteBuffer> kv;
|
||||
private final TimelineHashMap<String, KeyMetadata> keyMetadataMap;
|
||||
private final FeatureControlManager featureControl;
|
||||
|
||||
public KVControlManager(SnapshotRegistry registry, LogContext logContext) {
|
||||
public KVControlManager(SnapshotRegistry registry, LogContext logContext, FeatureControlManager featureControl) {
|
||||
this.registry = registry;
|
||||
this.log = logContext.logger(KVControlManager.class);
|
||||
this.kv = new TimelineHashMap<>(registry, 0);
|
||||
this.keyMetadataMap = new TimelineHashMap<>(registry, 0);
|
||||
this.featureControl = featureControl;
|
||||
}
|
||||
|
||||
public GetKVResponse getKV(GetKVRequest request) {
|
||||
String key = request.key();
|
||||
String key = buildCompositeKey(request.namespace(), request.key());
|
||||
byte[] value = kv.containsKey(key) ? kv.get(key).array() : null;
|
||||
KeyMetadata keyMetadata = null;
|
||||
if (request.namespace() != null && !request.namespace().isEmpty()) {
|
||||
keyMetadata = keyMetadataMap.get(key);
|
||||
}
|
||||
|
||||
return new GetKVResponse()
|
||||
.setValue(value);
|
||||
.setValue(value)
|
||||
.setNamespace(request.namespace())
|
||||
.setEpoch(keyMetadata != null ? keyMetadata.getEpoch() : 0L);
|
||||
}
|
||||
|
||||
public ControllerResult<PutKVResponse> putKV(PutKVRequest request) {
|
||||
String key = request.key();
|
||||
String key = buildCompositeKey(request.namespace(), request.key());
|
||||
KeyMetadata keyMetadata = keyMetadataMap.get(key);
|
||||
long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0;
|
||||
if (request.epoch() > 0 && request.epoch() != currentEpoch) {
|
||||
return ControllerResult.of(Collections.emptyList(),
|
||||
new PutKVResponse()
|
||||
.setErrorCode(INVALID_KV_RECORD_EPOCH.code())
|
||||
.setEpoch(currentEpoch));
|
||||
}
|
||||
|
||||
long newEpoch = System.currentTimeMillis();
|
||||
ByteBuffer value = kv.get(key);
|
||||
if (value == null || request.overwrite()) {
|
||||
// generate kv record
|
||||
ApiMessageAndVersion record = new ApiMessageAndVersion(new KVRecord()
|
||||
.setKeyValues(Collections.singletonList(new KeyValue()
|
||||
.setKey(key)
|
||||
.setValue(request.value()))), (short) 0);
|
||||
return ControllerResult.of(Collections.singletonList(record), new PutKVResponse().setValue(request.value()));
|
||||
.setValue(request.value())
|
||||
.setNamespace(request.namespace())
|
||||
.setEpoch(newEpoch))),
|
||||
featureControl.autoMQVersion().namespacedKVRecordVersion());
|
||||
return ControllerResult.of(Collections.singletonList(record), new PutKVResponse().setValue(request.value()).setEpoch(newEpoch));
|
||||
}
|
||||
// exist and not allow overwriting
|
||||
return ControllerResult.of(Collections.emptyList(), new PutKVResponse()
|
||||
|
|
@ -79,14 +106,25 @@ public class KVControlManager {
|
|||
}
|
||||
|
||||
public ControllerResult<DeleteKVResponse> deleteKV(DeleteKVRequest request) {
|
||||
String key = buildCompositeKey(request.namespace(), request.key());
|
||||
KeyMetadata keyMetadata = keyMetadataMap.get(key);
|
||||
long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0;
|
||||
if (request.epoch() > 0 && request.epoch() != currentEpoch) {
|
||||
return ControllerResult.of(Collections.emptyList(),
|
||||
new DeleteKVResponse()
|
||||
.setErrorCode(INVALID_KV_RECORD_EPOCH.code())
|
||||
.setEpoch(currentEpoch));
|
||||
}
|
||||
log.trace("DeleteKVRequestData: {}", request);
|
||||
DeleteKVResponse resp = new DeleteKVResponse();
|
||||
ByteBuffer value = kv.get(request.key());
|
||||
ByteBuffer value = kv.get(key);
|
||||
if (value != null) {
|
||||
// generate remove-kv record
|
||||
ApiMessageAndVersion record = new ApiMessageAndVersion(new RemoveKVRecord()
|
||||
.setKeys(Collections.singletonList(request.key())), (short) 0);
|
||||
return ControllerResult.of(Collections.singletonList(record), resp.setValue(value.array()));
|
||||
.setKeys(Collections.singletonList(key))
|
||||
.setNamespace(request.namespace()),
|
||||
featureControl.autoMQVersion().namespacedKVRecordVersion());
|
||||
return ControllerResult.of(Collections.singletonList(record), resp.setValue(value.array()).setEpoch(currentEpoch));
|
||||
}
|
||||
return ControllerResult.of(Collections.emptyList(), resp.setErrorCode(KEY_NOT_EXIST.code()));
|
||||
}
|
||||
|
|
@ -95,6 +133,9 @@ public class KVControlManager {
|
|||
List<KeyValue> keyValues = record.keyValues();
|
||||
for (KeyValue keyValue : keyValues) {
|
||||
kv.put(keyValue.key(), ByteBuffer.wrap(keyValue.value()));
|
||||
if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) {
|
||||
keyMetadataMap.put(keyValue.key(), new KeyMetadata(keyValue.namespace(), keyValue.epoch()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -102,10 +143,30 @@ public class KVControlManager {
|
|||
List<String> keys = record.keys();
|
||||
for (String key : keys) {
|
||||
kv.remove(key);
|
||||
if (record.namespace() != null && !record.namespace().isEmpty()) {
|
||||
keyMetadataMap.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, ByteBuffer> kv() {
|
||||
return kv;
|
||||
}
|
||||
|
||||
private static class KeyMetadata {
|
||||
private final long epoch;
|
||||
private final String namespace;
|
||||
public KeyMetadata(String namespace, long epoch) {
|
||||
this.namespace = namespace;
|
||||
this.epoch = epoch;
|
||||
}
|
||||
|
||||
public long getEpoch() {
|
||||
return epoch;
|
||||
}
|
||||
|
||||
public String getNamespace() {
|
||||
return namespace;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
"apiKey": 516,
|
||||
"type": "metadata",
|
||||
"name": "KVRecord",
|
||||
"validVersions": "0",
|
||||
"validVersions": "0-1",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{
|
||||
|
|
@ -37,8 +37,20 @@
|
|||
"type": "bytes",
|
||||
"versions": "0+",
|
||||
"about": "Value"
|
||||
},
|
||||
{
|
||||
"name": "Namespace",
|
||||
"type": "string",
|
||||
"versions": "1+",
|
||||
"about": "Namespace"
|
||||
},
|
||||
{
|
||||
"name": "Epoch",
|
||||
"type": "int64",
|
||||
"versions": "1+",
|
||||
"about": "Epoch"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
"apiKey": 517,
|
||||
"type": "metadata",
|
||||
"name": "RemoveKVRecord",
|
||||
"validVersions": "0",
|
||||
"validVersions": "0-1",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{
|
||||
|
|
@ -25,6 +25,12 @@
|
|||
"type": "[]string",
|
||||
"versions": "0+",
|
||||
"about": "Keys"
|
||||
},
|
||||
{
|
||||
"name": "Namespace",
|
||||
"type": "string",
|
||||
"versions": "1+",
|
||||
"about": "Namespace"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,6 +74,7 @@ import java.util.Set;
|
|||
import java.util.stream.Stream;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.apache.kafka.controller.FeatureControlManagerTest.features;
|
||||
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
|
@ -728,7 +729,12 @@ public class ClusterControlManagerTest {
|
|||
public void testReusableNodeIds() {
|
||||
MockTime time = new MockTime(0, 0, 0);
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
KVControlManager kvControl = new KVControlManager(snapshotRegistry, new LogContext());
|
||||
FeatureControlManager featureControlManager = new FeatureControlManager.Builder().
|
||||
setQuorumFeatures(features("foo", 1, 2)).
|
||||
setSnapshotRegistry(snapshotRegistry).
|
||||
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
|
||||
build();
|
||||
KVControlManager kvControl = new KVControlManager(snapshotRegistry, new LogContext(), featureControlManager);
|
||||
FeatureControlManager featureControl = new FeatureControlManager.Builder().
|
||||
setSnapshotRegistry(snapshotRegistry).
|
||||
setQuorumFeatures(new QuorumFeatures(0,
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.Errors;
|
|||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.controller.stream.KVControlManager;
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
|
@ -41,8 +42,11 @@ import org.junit.jupiter.api.Timeout;
|
|||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH;
|
||||
import static org.apache.kafka.controller.FeatureControlManagerTest.features;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@Timeout(40)
|
||||
@Tag("S3Unit")
|
||||
|
|
@ -54,7 +58,12 @@ public class KVControlManagerTest {
|
|||
public void setUp() {
|
||||
LogContext logContext = new LogContext();
|
||||
SnapshotRegistry registry = new SnapshotRegistry(logContext);
|
||||
this.manager = new KVControlManager(registry, logContext);
|
||||
FeatureControlManager featureControlManager = new FeatureControlManager.Builder().
|
||||
setQuorumFeatures(features("foo", 1, 2)).
|
||||
setSnapshotRegistry(registry).
|
||||
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
|
||||
build();
|
||||
this.manager = new KVControlManager(registry, logContext, featureControlManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -113,6 +122,149 @@ public class KVControlManagerTest {
|
|||
assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamespacedReadWrite() {
|
||||
ControllerResult<PutKVResponse> result = manager.putKV(new PutKVRequest()
|
||||
.setKey("key1")
|
||||
.setValue("value1".getBytes())
|
||||
.setNamespace("__automq_test")
|
||||
);
|
||||
assertEquals(1, result.records().size());
|
||||
assertEquals(Errors.NONE.code(), result.response().errorCode());
|
||||
assertEquals("value1", new String(result.response().value()));
|
||||
replay(manager, result.records());
|
||||
|
||||
result = manager.putKV(new PutKVRequest()
|
||||
.setKey("key1")
|
||||
.setValue("value1-1".getBytes())
|
||||
.setNamespace("__automq_test")
|
||||
);
|
||||
assertEquals(0, result.records().size());
|
||||
assertEquals(Errors.KEY_EXIST.code(), result.response().errorCode());
|
||||
assertEquals("value1", new String(result.response().value()));
|
||||
|
||||
result = manager.putKV(new PutKVRequest()
|
||||
.setKey("key1")
|
||||
.setValue("value1-2".getBytes())
|
||||
.setNamespace("__automq_test")
|
||||
.setOverwrite(true));
|
||||
assertEquals(1, result.records().size());
|
||||
assertEquals(Errors.NONE.code(), result.response().errorCode());
|
||||
assertEquals("value1-2", new String(result.response().value()));
|
||||
replay(manager, result.records());
|
||||
|
||||
GetKVResponse result2 = manager.getKV(new GetKVRequest()
|
||||
.setKey("key1")
|
||||
.setNamespace("__automq_test"));
|
||||
assertEquals("value1-2", new String(result2.value()));
|
||||
|
||||
result2 = manager.getKV(new GetKVRequest()
|
||||
.setKey("key2")
|
||||
.setNamespace("__automq_test"));
|
||||
assertNull(result2.value());
|
||||
|
||||
ControllerResult<DeleteKVResponse> result3 = manager.deleteKV(new DeleteKVRequest()
|
||||
.setKey("key2")
|
||||
.setNamespace("__automq_test"));
|
||||
assertEquals(0, result3.records().size());
|
||||
assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode());
|
||||
|
||||
result3 = manager.deleteKV(new DeleteKVRequest()
|
||||
.setKey("key1")
|
||||
.setNamespace("__automq_test")
|
||||
);
|
||||
assertEquals(1, result3.records().size());
|
||||
assertEquals(Errors.NONE.code(), result3.response().errorCode());
|
||||
assertEquals("value1-2", new String(result3.response().value()));
|
||||
replay(manager, result3.records());
|
||||
// key1 is deleted
|
||||
result2 = manager.getKV(new GetKVRequest()
|
||||
.setKey("key1")
|
||||
.setNamespace("__automq_test"));
|
||||
assertNull(result2.value());
|
||||
|
||||
result3 = manager.deleteKV(new DeleteKVRequest()
|
||||
.setKey("key1")
|
||||
.setNamespace("__automq_test"));
|
||||
assertEquals(0, result3.records().size());
|
||||
assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutWithEpochValidation() {
|
||||
ControllerResult<PutKVResponse> result = manager.putKV(new PutKVRequest()
|
||||
.setKey("key1")
|
||||
.setValue("value1".getBytes())
|
||||
.setNamespace("__epoch_test")
|
||||
.setEpoch(0));
|
||||
assertEquals(1, result.records().size());
|
||||
assertEquals(Errors.NONE.code(), result.response().errorCode());
|
||||
long initialEpoch = result.response().epoch();
|
||||
assertTrue(initialEpoch > 0);
|
||||
replay(manager, result.records());
|
||||
result = manager.putKV(new PutKVRequest()
|
||||
.setKey("key1")
|
||||
.setValue("value2".getBytes())
|
||||
.setNamespace("__epoch_test")
|
||||
.setEpoch(initialEpoch - 1)
|
||||
.setOverwrite(true));
|
||||
assertEquals(0, result.records().size());
|
||||
assertEquals(INVALID_KV_RECORD_EPOCH.code(), result.response().errorCode());
|
||||
assertEquals(initialEpoch, result.response().epoch());
|
||||
// without overwrite, should fail
|
||||
result = manager.putKV(new PutKVRequest()
|
||||
.setKey("key1")
|
||||
.setValue("value2".getBytes())
|
||||
.setNamespace("__epoch_test")
|
||||
.setEpoch(initialEpoch));
|
||||
assertEquals(0, result.records().size());
|
||||
assertEquals(Errors.KEY_EXIST.code(), result.response().errorCode());
|
||||
// with overwrite, should succeed
|
||||
result = manager.putKV(new PutKVRequest()
|
||||
.setKey("key1")
|
||||
.setValue("value2".getBytes())
|
||||
.setNamespace("__epoch_test")
|
||||
.setEpoch(initialEpoch)
|
||||
.setOverwrite(true));
|
||||
assertEquals(1, result.records().size());
|
||||
assertEquals(Errors.NONE.code(), result.response().errorCode());
|
||||
long newEpoch = result.response().epoch();
|
||||
assertTrue(newEpoch > initialEpoch);
|
||||
replay(manager, result.records());
|
||||
GetKVResponse readResp = manager.getKV(new GetKVRequest()
|
||||
.setKey("key1")
|
||||
.setNamespace("__epoch_test"));
|
||||
assertEquals(newEpoch, readResp.epoch());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteWithEpochValidation() {
|
||||
ControllerResult<PutKVResponse> putResult = manager.putKV(new PutKVRequest()
|
||||
.setKey("key1")
|
||||
.setValue("value1".getBytes())
|
||||
.setNamespace("__epoch_test"));
|
||||
long initialEpoch = putResult.response().epoch();
|
||||
replay(manager, putResult.records());
|
||||
ControllerResult<DeleteKVResponse> delResult = manager.deleteKV(new DeleteKVRequest()
|
||||
.setKey("key1")
|
||||
.setNamespace("__epoch_test")
|
||||
.setEpoch(initialEpoch - 1));
|
||||
assertEquals(0, delResult.records().size());
|
||||
assertEquals(INVALID_KV_RECORD_EPOCH.code(), delResult.response().errorCode());
|
||||
assertEquals(initialEpoch, delResult.response().epoch());
|
||||
delResult = manager.deleteKV(new DeleteKVRequest()
|
||||
.setKey("key1")
|
||||
.setNamespace("__epoch_test")
|
||||
.setEpoch(initialEpoch));
|
||||
assertEquals(1, delResult.records().size());
|
||||
assertEquals(Errors.NONE.code(), delResult.response().errorCode());
|
||||
replay(manager, delResult.records());
|
||||
GetKVResponse readResp = manager.getKV(new GetKVRequest()
|
||||
.setKey("key1")
|
||||
.setNamespace("__epoch_test"));
|
||||
assertNull(readResp.value());
|
||||
}
|
||||
|
||||
private void replay(KVControlManager manager, List<ApiMessageAndVersion> records) {
|
||||
List<ApiMessage> messages = records.stream().map(x -> x.message())
|
||||
.collect(Collectors.toList());
|
||||
|
|
|
|||
|
|
@ -20,7 +20,9 @@
|
|||
package com.automq.stream.api;
|
||||
|
||||
import com.automq.stream.api.KeyValue.Key;
|
||||
import com.automq.stream.api.KeyValue.KeyAndNamespace;
|
||||
import com.automq.stream.api.KeyValue.Value;
|
||||
import com.automq.stream.api.KeyValue.ValueAndEpoch;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
|
|
@ -36,6 +38,14 @@ public interface KVClient {
|
|||
*/
|
||||
CompletableFuture<Value> putKVIfAbsent(KeyValue keyValue);
|
||||
|
||||
/**
|
||||
* Put namespaced key value if key not exist, return current key value namespace epoch after putting.
|
||||
*
|
||||
* @param keyValue {@link KeyValue} k-v pair namespace and epoch
|
||||
* @return async put result. {@link ValueAndEpoch} current value and epoch after putting.
|
||||
*/
|
||||
CompletableFuture<ValueAndEpoch> putNamespacedKVIfAbsent(KeyValue keyValue);
|
||||
|
||||
/**
|
||||
* Put key value, overwrite if key exist, return current key value after putting.
|
||||
*
|
||||
|
|
@ -44,6 +54,14 @@ public interface KVClient {
|
|||
*/
|
||||
CompletableFuture<Value> putKV(KeyValue keyValue);
|
||||
|
||||
/**
|
||||
* Put key value, overwrite if key exist, return current key value namespace epoch after putting.
|
||||
*
|
||||
* @param keyValue {@link KeyValue} k-v pair namespace and epoch
|
||||
* @return async put result. {@link ValueAndEpoch} current value and epoch after putting.
|
||||
*/
|
||||
CompletableFuture<ValueAndEpoch> putNamespacedKV(KeyValue keyValue);
|
||||
|
||||
/**
|
||||
* Get value by key.
|
||||
*
|
||||
|
|
@ -52,6 +70,14 @@ public interface KVClient {
|
|||
*/
|
||||
CompletableFuture<Value> getKV(Key key);
|
||||
|
||||
/**
|
||||
* Get value by key.
|
||||
*
|
||||
* @param keyAndNamespace key and namespace.
|
||||
* @return async get result. {@link ValueAndEpoch} value and epoch, null if key not exist.
|
||||
*/
|
||||
CompletableFuture<ValueAndEpoch> getNamespacedKV(KeyAndNamespace keyAndNamespace);
|
||||
|
||||
/**
|
||||
* Delete key value by key. If key not exist, return null.
|
||||
*
|
||||
|
|
@ -59,4 +85,12 @@ public interface KVClient {
|
|||
* @return async delete result. {@link Value} deleted value, null if key not exist.
|
||||
*/
|
||||
CompletableFuture<Value> delKV(Key key);
|
||||
|
||||
/**
|
||||
* Delete key value by key. If key not exist, return null.
|
||||
*
|
||||
* @param keyValue k-v pair namespace and epoch
|
||||
* @return async delete result. {@link ValueAndEpoch} deleted value and epoch, null if key not exist.
|
||||
*/
|
||||
CompletableFuture<ValueAndEpoch> delNamespacedKV(KeyValue keyValue);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,16 +25,31 @@ import java.util.Objects;
|
|||
public class KeyValue {
|
||||
private final Key key;
|
||||
private final Value value;
|
||||
private final String namespace;
|
||||
private final long epoch;
|
||||
|
||||
private KeyValue(Key key, Value value) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
this.namespace = null;
|
||||
this.epoch = 0L;
|
||||
}
|
||||
|
||||
public KeyValue(Key key, Value value, String namespace, long epoch) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
this.namespace = namespace;
|
||||
this.epoch = epoch;
|
||||
}
|
||||
|
||||
public static KeyValue of(String key, ByteBuffer value) {
|
||||
return new KeyValue(Key.of(key), Value.of(value));
|
||||
}
|
||||
|
||||
public static KeyValue of(String key, ByteBuffer value, String namespace, long epoch) {
|
||||
return new KeyValue(Key.of(key), Value.of(value), namespace, epoch);
|
||||
}
|
||||
|
||||
public Key key() {
|
||||
return key;
|
||||
}
|
||||
|
|
@ -43,6 +58,14 @@ public class KeyValue {
|
|||
return value;
|
||||
}
|
||||
|
||||
public String namespace() {
|
||||
return namespace;
|
||||
}
|
||||
|
||||
public long epoch() {
|
||||
return epoch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o)
|
||||
|
|
@ -154,4 +177,52 @@ public class KeyValue {
|
|||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
public static class KeyAndNamespace {
|
||||
private final Key key;
|
||||
private final String namespace;
|
||||
|
||||
public KeyAndNamespace(Key key, String namespace) {
|
||||
this.key = key;
|
||||
this.namespace = namespace;
|
||||
}
|
||||
|
||||
public Key key() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public String namespace() {
|
||||
return namespace;
|
||||
}
|
||||
|
||||
public static KeyAndNamespace of(String key, String namespace) {
|
||||
return new KeyAndNamespace(Key.of(key), namespace);
|
||||
}
|
||||
}
|
||||
|
||||
public static class ValueAndEpoch {
|
||||
private final Value value;
|
||||
private final long epoch;
|
||||
|
||||
public ValueAndEpoch(Value value, long epoch) {
|
||||
this.value = value;
|
||||
this.epoch = epoch;
|
||||
}
|
||||
|
||||
public Value value() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public long epoch() {
|
||||
return epoch;
|
||||
}
|
||||
|
||||
public static ValueAndEpoch of(byte[] value, long epoch) {
|
||||
return new ValueAndEpoch(Value.of(value), epoch);
|
||||
}
|
||||
|
||||
public static ValueAndEpoch of(ByteBuffer value, long epoch) {
|
||||
return new ValueAndEpoch(Value.of(value), epoch);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,11 @@
|
|||
package com.automq.stream.utils;
|
||||
|
||||
public class KVRecordUtils {
|
||||
|
||||
public static String buildCompositeKey(String namespace, String key) {
|
||||
if (namespace == null || namespace.isEmpty()) {
|
||||
return key;
|
||||
}
|
||||
return namespace + "/" + key;
|
||||
}
|
||||
}
|
||||
|
|
@ -31,10 +31,12 @@ public enum AutoMQVersion {
|
|||
// Support object bucket index
|
||||
// Support huge cluster
|
||||
// Support node registration
|
||||
V2((short) 3);
|
||||
V2((short) 3),
|
||||
// Support kv record namespace
|
||||
V3((short) 4);
|
||||
|
||||
public static final String FEATURE_NAME = "automq.version";
|
||||
public static final AutoMQVersion LATEST = V2;
|
||||
public static final AutoMQVersion LATEST = V3;
|
||||
|
||||
private final short level;
|
||||
private final Version s3streamVersion;
|
||||
|
|
@ -125,6 +127,14 @@ public enum AutoMQVersion {
|
|||
}
|
||||
}
|
||||
|
||||
public short namespacedKVRecordVersion() {
|
||||
if (isAtLeast(V3)) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
public Version s3streamVersion() {
|
||||
return s3streamVersion;
|
||||
}
|
||||
|
|
@ -139,6 +149,7 @@ public enum AutoMQVersion {
|
|||
case 2:
|
||||
return Version.V0;
|
||||
case 3:
|
||||
case 4:
|
||||
return Version.V1;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown AutoMQVersion level: " + automqVersion);
|
||||
|
|
|
|||
Loading…
Reference in New Issue