mirror of https://github.com/apache/kafka.git
KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods. (#11033)
Added asynchronous API support for RemoteLogMetadataManager add/update/put methods. Implemented the changes on the default topic-based RemoteLogMetadataManager. Refactored the respective tests to cover the introduced asynchronous APIs. Reviewers: Cong Ding <cong@ccding.com>, Jun Rao <junrao@gmail.com>
parent df55c7ecff
commit 1e19de3199
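For callers, the practical effect of this patch is that the add/update/put operations now return a CompletableFuture<Void> instead of blocking. A minimal caller-side sketch under that assumption follows; the helper class and the variable names are illustrative, not part of the patch:

    import java.util.concurrent.CompletableFuture;

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
    import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

    class AsyncRlmmUsageSketch {
        // Keeps the pre-KAFKA-12988 blocking behavior by waiting on the returned future.
        static void addAndWait(RemoteLogMetadataManager rlmm,
                               RemoteLogSegmentMetadata segmentMetadata) throws Exception {
            rlmm.addRemoteLogSegmentMetadata(segmentMetadata).get();
        }

        // Chains the COPY_SEGMENT_FINISHED update onto the add without blocking the caller.
        static CompletableFuture<Void> addThenUpdate(RemoteLogMetadataManager rlmm,
                                                     RemoteLogSegmentMetadata segmentMetadata,
                                                     RemoteLogSegmentMetadataUpdate update) throws RemoteStorageException {
            return rlmm.addRemoteLogSegmentMetadata(segmentMetadata)
                       .thenCompose(v -> {
                           try {
                               return rlmm.updateRemoteLogSegmentMetadata(update);
                           } catch (RemoteStorageException e) {
                               throw new KafkaException(e);
                           }
                       });
        }
    }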
RemoteLogMetadataManager.java

@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
@@ -53,7 +54,7 @@ import java.util.Set;
 public interface RemoteLogMetadataManager extends Configurable, Closeable {
 
     /**
-     * Adds {@link RemoteLogSegmentMetadata} with the containing {@link RemoteLogSegmentId} into {@link RemoteLogMetadataManager}.
+     * This method is used to add {@link RemoteLogSegmentMetadata} asynchronously with the containing {@link RemoteLogSegmentId} into {@link RemoteLogMetadataManager}.
      * <p>
      * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId and it should have the initial state which is {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}.
      * <p>
@@ -62,16 +63,17 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable {
      * @param remoteLogSegmentMetadata metadata about the remote log segment.
      * @throws RemoteStorageException if there are any storage related errors occurred.
      * @throws IllegalArgumentException if the given metadata instance does not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
+     * @return a CompletableFuture which will complete once this operation is finished.
      */
-    void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
+    CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
 
     /**
-     * This method is used to update the {@link RemoteLogSegmentMetadata}. Currently, it allows to update with the new
+     * This method is used to update the {@link RemoteLogSegmentMetadata} asynchronously. Currently, it allows to update with the new
      * state based on the life cycle of the segment. It can go through the below state transitions.
      * <p>
      * <pre>
      * +---------------------+            +----------------------+
      * |COPY_SEGMENT_STARTED |----------->|COPY_SEGMENT_FINISHED |
      * +-------------------+-+            +--+-------------------+
      *                     |                 |
      *                     |                 |
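An implementation is free to complete the returned future on any thread. A hedged sketch of one way an implementation might satisfy the new add/update/put contract, assuming a blocking backing store and a caller-supplied executor (all names here are illustrative, not from the patch):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;

    class AsyncStoreSketch {
        // Runs a blocking store action on an executor and exposes it as the
        // CompletableFuture<Void> required by the updated interface methods.
        static CompletableFuture<Void> storeAsync(ExecutorService executor, Runnable storeAction) {
            return CompletableFuture.runAsync(storeAction, executor);
        }
    }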
@@ -104,8 +106,9 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable {
      * @throws RemoteStorageException if there are any storage related errors occurred.
      * @throws RemoteResourceNotFoundException when there are no resources associated with the given remoteLogSegmentMetadataUpdate.
      * @throws IllegalArgumentException if the given metadata instance has the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
+     * @return a CompletableFuture which will complete once this operation is finished.
      */
-    void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate)
+    CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate)
             throws RemoteStorageException;
 
     /**
@@ -137,7 +140,7 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable {
                                                                int leaderEpoch) throws RemoteStorageException;
 
     /**
-     * This method is used to update the metadata about remote partition delete event. Currently, it allows updating the
+     * This method is used to update the metadata about remote partition delete event asynchronously. Currently, it allows updating the
      * state ({@link RemotePartitionDeleteState}) of a topic partition in remote metadata storage. Controller invokes
      * this method with {@link RemotePartitionDeleteMetadata} having state as {@link RemotePartitionDeleteState#DELETE_PARTITION_MARKED}.
      * So, remote partition removers can act on this event to clean the respective remote log segments of the partition.
@@ -153,8 +156,9 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable {
      * @param remotePartitionDeleteMetadata update on delete state of a partition.
      * @throws RemoteStorageException if there are any storage related errors occurred.
      * @throws RemoteResourceNotFoundException when there are no resources associated with the given remotePartitionDeleteMetadata.
+     * @return a CompletableFuture which will complete once this operation is finished.
      */
-    void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
+    CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
             throws RemoteStorageException;
 
     /**
ConsumerManager.java

@@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -31,6 +30,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 
 /**
  * This class manages the consumer thread viz {@link ConsumerTask} that polls messages from the assigned metadata topic partitions.
@@ -70,12 +70,27 @@ public class ConsumerManager implements Closeable {
     }
 
     /**
-     * Wait until the consumption reaches the offset of the metadata partition for the given {@code recordMetadata}.
+     * Waits if necessary for the consumption to reach the offset of the given {@code recordMetadata}.
      *
      * @param recordMetadata record metadata to be checked for consumption.
+     * @throws TimeoutException if this method execution did not complete within the wait time configured with
+     *                          property {@code TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP}.
      */
-    public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) {
+    public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) throws TimeoutException {
+        waitTillConsumptionCatchesUp(recordMetadata, rlmmConfig.consumeWaitMs());
+    }
+
+    /**
+     * Waits if necessary for the consumption to reach the offset of the given {@code recordMetadata}.
+     *
+     * @param recordMetadata record metadata to be checked for consumption.
+     * @param timeoutMs      wait timeout in milliseconds
+     * @throws TimeoutException if this method execution did not complete within the given {@code timeoutMs}.
+     */
+    public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
+                                             long timeoutMs) throws TimeoutException {
         final int partition = recordMetadata.partition();
+        final long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
 
         // If the current assignment does not have the subscription for this partition then return immediately.
         if (!consumerTask.isPartitionAssigned(partition)) {
@@ -87,19 +102,19 @@ public class ConsumerManager implements Closeable {
         while (true) {
             long receivedOffset = consumerTask.receivedOffsetForPartition(partition).orElse(-1L);
             if (receivedOffset >= offset) {
-                break;
+                return;
             }
 
             log.debug("Committed offset [{}] for partition [{}], but the target offset: [{}], Sleeping for [{}] to retry again",
-                    offset, partition, receivedOffset, CONSUME_RECHECK_INTERVAL_MS);
+                    offset, partition, receivedOffset, consumeCheckIntervalMs);
 
-            if (time.milliseconds() - startTimeMs > rlmmConfig.consumeWaitMs()) {
+            if (time.milliseconds() - startTimeMs > timeoutMs) {
                 log.warn("Committed offset for partition:[{}] is : [{}], but the target offset: [{}] ",
                         partition, receivedOffset, offset);
                 throw new TimeoutException("Timed out in catching up with the expected offset by consumer.");
             }
 
-            time.sleep(CONSUME_RECHECK_INTERVAL_MS);
+            time.sleep(consumeCheckIntervalMs);
         }
     }
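The loop above is a plain poll-with-deadline pattern: recheck at min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs), return once the consumer's received offset reaches the produced offset, and throw TimeoutException when the deadline passes. The same pattern in a standalone, hedged form (the names here are illustrative, not from the patch):

    import java.util.concurrent.TimeoutException;
    import java.util.function.LongSupplier;

    final class WaitUntilOffset {
        // Polls currentOffset until it reaches targetOffset or timeoutMs elapses.
        static void await(LongSupplier currentOffset,
                          long targetOffset,
                          long timeoutMs,
                          long recheckIntervalMs) throws TimeoutException, InterruptedException {
            long intervalMs = Math.min(recheckIntervalMs, timeoutMs);
            long deadlineMs = System.currentTimeMillis() + timeoutMs;
            while (currentOffset.getAsLong() < targetOffset) {
                if (System.currentTimeMillis() > deadlineMs) {
                    throw new TimeoutException("Timed out waiting for offset " + targetOffset);
                }
                Thread.sleep(intervalMs);
            }
        }
    }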
ProducerManager.java

@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * This class is responsible for publishing messages into the remote log metadata topic partitions.
@@ -50,37 +51,45 @@ public class ProducerManager implements Closeable {
         topicPartitioner = rlmmTopicPartitioner;
     }
 
-    public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException {
+    /**
+     * Returns {@link CompletableFuture} which will complete only after publishing of the given {@code remoteLogMetadata}
+     * is considered complete.
+     *
+     * @param remoteLogMetadata RemoteLogMetadata to be published
+     * @return
+     */
+    public CompletableFuture<RecordMetadata> publishMessage(RemoteLogMetadata remoteLogMetadata) {
+        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+
         TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition();
         int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition);
         log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
                 topicIdPartition, metadataPartitionNum, remoteLogMetadata);
         if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
             // This should never occur as long as metadata partitions always remain the same.
             throw new KafkaException("Chosen partition no " + metadataPartitionNum +
                     " must be less than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());
         }
 
-        ProducerCallback callback = new ProducerCallback();
         try {
+            Callback callback = new Callback() {
+                @Override
+                public void onCompletion(RecordMetadata metadata,
+                                         Exception exception) {
+                    if (exception != null) {
+                        future.completeExceptionally(exception);
+                    } else {
+                        future.complete(metadata);
+                    }
+                }
+            };
             producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null,
-                    serde.serialize(remoteLogMetadata)), callback).get();
-        } catch (KafkaException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new KafkaException("Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, e);
+                    serde.serialize(remoteLogMetadata)), callback);
+        } catch (Exception ex) {
+            future.completeExceptionally(ex);
         }
-
-        if (callback.exception() == null) {
-            return callback.recordMetadata();
-        } else {
-            Exception ex = callback.exception();
-            if (ex instanceof KafkaException) {
-                throw (KafkaException) ex;
-            } else {
-                throw new KafkaException(ex);
-            }
-        }
+        return future;
     }
 
     public void close() {
@@ -90,24 +99,4 @@ public class ProducerManager implements Closeable {
             log.error("Error encountered while closing the producer", e);
         }
     }
-
-    private static class ProducerCallback implements Callback {
-        private volatile RecordMetadata recordMetadata;
-        private volatile Exception exception;
-
-        @Override
-        public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
-            this.recordMetadata = recordMetadata;
-            this.exception = exception;
-        }
-
-        public RecordMetadata recordMetadata() {
-            return recordMetadata;
-        }
-
-        public Exception exception() {
-            return exception;
-        }
-    }
 }
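The rewrite above is the standard adapter from the producer's callback-style send to a CompletableFuture: the callback completes the future normally or exceptionally, and a synchronous throw from send() is routed into the same future. A reduced standalone sketch of that adapter, with the topic selection and serialization details omitted:

    import java.util.concurrent.CompletableFuture;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    final class SendAdapterSketch {
        // Bridges Producer.send(record, callback) into a CompletableFuture that
        // completes with the record metadata, or exceptionally on failure.
        static CompletableFuture<RecordMetadata> sendAsync(Producer<byte[], byte[]> producer,
                                                           ProducerRecord<byte[], byte[]> record) {
            CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
            try {
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        future.completeExceptionally(exception);
                    } else {
                        future.complete(metadata);
                    }
                });
            } catch (Exception e) {
                // send() itself can throw, e.g. during serialization; surface that too.
                future.completeExceptionally(e);
            }
            return future;
        }
    }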
TopicBasedRemoteLogMetadataManager.java

@@ -49,7 +49,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -86,7 +88,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
     private volatile boolean initializationFailed;
 
     @Override
-    public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
+    public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
             throws RemoteStorageException {
         Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
@@ -105,15 +107,15 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
             }
 
             // Publish the message to the topic.
-            doPublishMetadata(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition(),
-                              remoteLogSegmentMetadata);
+            return storeRemoteLogMetadata(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition(),
+                                          remoteLogSegmentMetadata);
         } finally {
             lock.readLock().unlock();
         }
     }
 
     @Override
-    public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate)
+    public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate)
             throws RemoteStorageException {
         Objects.requireNonNull(segmentMetadataUpdate, "segmentMetadataUpdate can not be null");
@@ -129,14 +131,14 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
             }
 
             // Publish the message to the topic.
-            doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate);
+            return storeRemoteLogMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate);
         } finally {
             lock.readLock().unlock();
         }
     }
 
     @Override
-    public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
+    public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
             throws RemoteStorageException {
         Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null");
@@ -144,22 +146,39 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
         try {
             ensureInitializedAndNotClosed();
 
-            doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata);
+            return storeRemoteLogMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata);
         } finally {
             lock.readLock().unlock();
         }
     }
 
-    private void doPublishMetadata(TopicIdPartition topicIdPartition, RemoteLogMetadata remoteLogMetadata)
+    /**
+     * Returns {@link CompletableFuture} which will complete only after publishing of the given {@code remoteLogMetadata} into
+     * the remote log metadata topic and the internal consumer is caught up until the produced record's offset.
+     *
+     * @param topicIdPartition partition of the given remoteLogMetadata.
+     * @param remoteLogMetadata RemoteLogMetadata to be stored.
+     * @return
+     * @throws RemoteStorageException if there are any storage errors occur.
+     */
+    private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdPartition,
+                                                           RemoteLogMetadata remoteLogMetadata)
             throws RemoteStorageException {
-        log.debug("Publishing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata);
+        log.debug("Storing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata);
 
         try {
-            // Publish the message to the topic.
-            RecordMetadata recordMetadata = producerManager.publishMessage(remoteLogMetadata);
-            // Wait until the consumer catches up with this offset. This will ensure read-after-write consistency
-            // semantics.
-            consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+            // Publish the message to the metadata topic.
+            CompletableFuture<RecordMetadata> produceFuture = producerManager.publishMessage(remoteLogMetadata);
+
+            // Create and return a `CompletableFuture` instance which completes when the consumer is caught up with the produced record's offset.
+            return produceFuture.thenApplyAsync(recordMetadata -> {
+                try {
+                    consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+                } catch (TimeoutException e) {
+                    throw new KafkaException(e);
+                }
+                return null;
+            });
         } catch (KafkaException e) {
             if (e instanceof RetriableException) {
                 throw e;
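Because the returned future completes only after the internal consumer has caught up to the produced record's offset, completion implies read-after-write consistency on the same manager instance. Note also that thenApplyAsync without an explicit executor runs the blocking catch-up wait on the common fork-join pool rather than on the producer's completion thread. A hedged caller-side sketch of the resulting guarantee (the method wrapper is illustrative, not from the patch):

    import java.util.Iterator;

    import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

    final class ReadAfterWriteSketch {
        // After the future completes, the just-added segment is visible to reads
        // on the same RemoteLogMetadataManager instance.
        static void addThenList(RemoteLogMetadataManager rlmm,
                                RemoteLogSegmentMetadata segmentMetadata) throws Exception {
            rlmm.addRemoteLogSegmentMetadata(segmentMetadata).get();
            Iterator<RemoteLogSegmentMetadata> segments =
                    rlmm.listRemoteLogSegments(segmentMetadata.remoteLogSegmentId().topicIdPartition());
            segments.forEachRemaining(System.out::println);
        }
    }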
RemoteLogSegmentLifecycleTest.java

@@ -433,12 +433,22 @@ public class RemoteLogSegmentLifecycleTest {
 
         @Override
         public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata) throws RemoteStorageException {
-            topicBasedRlmm().addRemoteLogSegmentMetadata(segmentMetadata);
+            try {
+                // Wait until the segment is added successfully.
+                topicBasedRlmm().addRemoteLogSegmentMetadata(segmentMetadata).get();
+            } catch (Exception e) {
+                throw new RemoteStorageException(e);
+            }
         }
 
         @Override
         public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) throws RemoteStorageException {
-            topicBasedRlmm().updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
+            try {
+                // Wait until the segment is updated successfully.
+                topicBasedRlmm().updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
+            } catch (Exception e) {
+                throw new RemoteStorageException(e);
+            }
         }
 
         @Override
TopicBasedRemoteLogMetadataManagerTest.java

@@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -78,16 +78,16 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, Uuid.randomUuid()),
                                                                                       0, 100, -1L, 0,
                                                                                       time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
-        Assertions.assertThrows(RemoteStorageException.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata));
+        Assertions.assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
 
         RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, Uuid.randomUuid()),
                                                                                         0, 100, -1L, 0,
                                                                                         time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
-        Assertions.assertThrows(RemoteStorageException.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata));
+        Assertions.assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
 
         // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered.
-        Assertions.assertThrows(RemoteStorageException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition));
-        Assertions.assertThrows(RemoteStorageException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition));
+        Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition));
+        Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition));
 
         topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition),
                                                       Collections.singleton(newFollowerTopicIdPartition));
TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java

@@ -29,19 +29,20 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 public class TopicBasedRemoteLogMetadataManagerWrapperWithHarness implements RemoteLogMetadataManager {
 
     private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness();
 
     @Override
-    public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
-        remoteLogMetadataManagerHarness.topicBasedRlmm().addRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
+    public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
+        return remoteLogMetadataManagerHarness.topicBasedRlmm().addRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
     }
 
     @Override
-    public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException {
-        remoteLogMetadataManagerHarness.topicBasedRlmm().updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate);
+    public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException {
+        return remoteLogMetadataManagerHarness.topicBasedRlmm().updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate);
     }
 
     @Override
@@ -58,8 +59,8 @@ public class TopicBasedRemoteLogMetadataManagerWrapperWithHarness implements RemoteLogMetadataManager {
     }
 
     @Override
-    public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException {
-        remoteLogMetadataManagerHarness.topicBasedRlmm().putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata);
+    public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException {
+        return remoteLogMetadataManagerHarness.topicBasedRlmm().putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata);
     }
 
     @Override
InmemoryRemoteLogMetadataManager.java

@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -42,8 +43,13 @@ public class InmemoryRemoteLogMetadataManager implements RemoteLogMetadataManager {
 
     private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache = new ConcurrentHashMap<>();
 
+    private static final CompletableFuture<Void> COMPLETED_FUTURE = new CompletableFuture<>();
+    static {
+        COMPLETED_FUTURE.complete(null);
+    }
+
     @Override
-    public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
+    public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
             throws RemoteStorageException {
         log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
         Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
@@ -53,16 +59,20 @@ public class InmemoryRemoteLogMetadataManager implements RemoteLogMetadataManager {
         idToRemoteLogMetadataCache
                 .computeIfAbsent(remoteLogSegmentId.topicIdPartition(), id -> new RemoteLogMetadataCache())
                 .addCopyInProgressSegment(remoteLogSegmentMetadata);
+
+        return COMPLETED_FUTURE;
     }
 
     @Override
-    public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate)
+    public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate)
             throws RemoteStorageException {
         log.debug("Updating remote log segment: [{}]", metadataUpdate);
         Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be null");
 
         getRemoteLogMetadataCache(metadataUpdate.remoteLogSegmentId().topicIdPartition())
                 .updateRemoteLogSegmentMetadata(metadataUpdate);
+
+        return COMPLETED_FUTURE;
     }
 
     private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
@@ -94,7 +104,7 @@ public class InmemoryRemoteLogMetadataManager implements RemoteLogMetadataManager {
     }
 
     @Override
-    public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
+    public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
             throws RemoteStorageException {
         log.debug("Adding delete state with: [{}]", remotePartitionDeleteMetadata);
         Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null");
@@ -115,6 +125,8 @@ public class InmemoryRemoteLogMetadataManager implements RemoteLogMetadataManager {
             idToRemoteLogMetadataCache.remove(topicIdPartition);
             idToPartitionDeleteMetadata.remove(topicIdPartition);
         }
+
+        return COMPLETED_FUTURE;
     }
 
     @Override
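The COMPLETED_FUTURE added above is built with a static initializer; the same already-completed future can also come from the standard factory method, so callers' get() or join() on the in-memory manager returns immediately either way. A one-line equivalent for comparison (the class name is illustrative):

    import java.util.concurrent.CompletableFuture;

    final class CompletedFutures {
        // Equivalent to the static-block construction in the patch: a future that
        // is already complete, so get()/join() return immediately with null.
        static final CompletableFuture<Void> COMPLETED = CompletableFuture.completedFuture(null);
    }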
RemoteLogMetadataManagerTest.java

@@ -61,7 +61,8 @@ public class RemoteLogMetadataManagerTest {
         RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
         RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0,
                                                                                 time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
-        remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata);
+        // Wait until the segment is added successfully.
+        remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
 
         // Search should not return the above segment.
         Assertions.assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150).isPresent());
@@ -70,7 +71,8 @@ public class RemoteLogMetadataManagerTest {
         RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
                                                                                                   RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
                                                                                                   BROKER_ID_1);
-        remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
+        // Wait until the segment is updated successfully.
+        remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
         RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
 
         // Search should return the above segment.
@@ -103,10 +105,13 @@ public class RemoteLogMetadataManagerTest {
         RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L,
                                                                                 -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE,
                                                                                 segmentLeaderEpochs);
-        remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata);
+        // Wait until the segment is added successfully.
+        remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
 
         RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
                 segmentId, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
-        remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
+        // Wait until the segment is updated successfully.
+        remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
 
         RemoteLogSegmentMetadata expectedSegMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
@@ -114,26 +119,27 @@ public class RemoteLogMetadataManagerTest {
         Optional<RemoteLogSegmentMetadata> segMetadataForOffset30Epoch1 = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L);
         Assertions.assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1);
 
-        // Mark the partition for deletion.
+        // Mark the partition for deletion and wait for it to be updated successfully.
         remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
-                createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_MARKED));
+                createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_MARKED)).get();
 
         Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelMark = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0,
                                                                                                                            1, 30L);
         Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelMark);
 
         // Set the partition deletion state as started. Partition and segments should still be accessible as they are not
-        // yet deleted.
+        // yet deleted. Wait until the segment state is updated successfully.
         remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
-                createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_STARTED));
+                createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_STARTED)).get();
 
         Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelStart = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0,
                                                                                                                             1, 30L);
         Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelStart);
 
         // Set the partition deletion state as finished. RLMM should clear all its internal state for that partition.
+        // Wait until the segment state is updated successfully.
         remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
-                createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_FINISHED));
+                createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_FINISHED)).get();
 
         Assertions.assertThrows(RemoteResourceNotFoundException.class,
                                 () -> remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L));
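One detail behind the test assertion change from RemoteStorageException.class to the broader Exception.class: Future.get() never throws the underlying failure directly, it wraps it in an ExecutionException, so the original cause has to be unwrapped via getCause() to assert on its type. A self-contained JUnit 5 sketch of that behavior (this test is illustrative, not part of the patch):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;

    class FutureFailureSketchTest {
        @Test
        void getWrapsTheCauseInExecutionException() {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("boom"));

            // get() wraps the failure, so tests asserting on the thrown type see
            // ExecutionException (or a broad Exception.class, as in the patch).
            ExecutionException e = Assertions.assertThrows(ExecutionException.class, failed::get);
            Assertions.assertTrue(e.getCause() instanceof IllegalStateException);
        }
    }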