KAFKA-15130: Delete remote segments when deleting a topic (#13947)

* Delete remote segments when deleting a topic

Co-authored-by: Kamal Chandraprakash <kchandraprakash@uber.com>
Co-authored-by: d00791190 <dinglan6@huawei.com>
DL1231 2023-08-18 20:51:09 +08:00 committed by GitHub
parent 3f4816dd3e
commit 4f88fb28f3
8 changed files with 468 additions and 97 deletions

File: RemoteLogManager.java

@ -84,6 +84,7 @@ import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -95,6 +96,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@ -105,6 +107,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -144,7 +147,7 @@ public class RemoteLogManager implements Closeable {
private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture> leaderOrFollowerTasks = new ConcurrentHashMap<>();
// topic ids that are received on leadership changes, this map is cleared on stop partitions
private final ConcurrentMap<TopicPartition, Uuid> topicPartitionIds = new ConcurrentHashMap<>();
private final ConcurrentMap<TopicPartition, Uuid> topicIdByPartitionMap = new ConcurrentHashMap<>();
private final String clusterId;
// The endpoint for remote log metadata manager to connect to
@ -288,7 +291,7 @@ public class RemoteLogManager implements Closeable {
}
private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
Uuid previousTopicId = topicPartitionIds.put(topicIdPartition.topicPartition(), topicIdPartition.topicId());
Uuid previousTopicId = topicIdByPartitionMap.put(topicIdPartition.topicPartition(), topicIdPartition.topicId());
if (previousTopicId != null && previousTopicId != topicIdPartition.topicId()) {
LOGGER.info("Previous cached topic id {} for {} does not match updated topic id {}",
previousTopicId, topicIdPartition.topicPartition(), topicIdPartition.topicId());
@ -343,21 +346,81 @@ public class RemoteLogManager implements Closeable {
/**
* Stops the remote-log tasks for the given topic partitions and, if the delete flag is set to true, deletes the remote log segments and the cached internal topic partition info.
*
* @param topicPartition topic partition to be stopped.
* @param topicPartitions topic partitions that need to be stopped.
* @param delete flag to indicate whether the given topic partitions should be deleted or not.
* @param errorHandler callback to handle any errors while stopping the partitions.
*/
public void stopPartitions(TopicPartition topicPartition, boolean delete) {
public void stopPartitions(Set<TopicPartition> topicPartitions,
boolean delete,
BiConsumer<TopicPartition, Throwable> errorHandler) {
LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete);
Set<TopicIdPartition> topicIdPartitions = topicPartitions.stream()
.filter(topicIdByPartitionMap::containsKey)
.map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp))
.collect(Collectors.toSet());
topicIdPartitions.forEach(tpId -> {
try {
RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
if (task != null) {
LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
task.cancel();
}
if (delete) {
LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
deleteRemoteLogPartition(tpId);
}
} catch (Exception ex) {
errorHandler.accept(tpId.topicPartition(), ex);
LOGGER.error("Error while stopping the partition: {}, delete: {}", tpId.topicPartition(), delete, ex);
}
});
remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
if (delete) {
// Delete from the internal data structures only if the partitions are to be deleted.
Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
LOGGER.debug("Removed partition: {} from topicPartitionIds", topicIdPartition);
// NOTE: this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted
topicPartitions.forEach(topicIdByPartitionMap::remove);
}
}
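
A minimal caller-side sketch of the reworked API (not part of the commit; the topic name, method name, and error map are hypothetical, and ReplicaManager below does the equivalent in Scala): the whole batch of partitions is passed together with an error callback, so per-partition failures are collected instead of aborting the batch.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;

class StopPartitionsUsageSketch {
    void stopAndDelete(RemoteLogManager remoteLogManager) {
        Set<TopicPartition> toStop = new HashSet<>(Arrays.asList(
                new TopicPartition("payments", 0),
                new TopicPartition("payments", 1)));
        // Collect per-partition failures instead of failing the whole request.
        Map<TopicPartition, Throwable> errors = new ConcurrentHashMap<>();
        // delete = true only when the topic itself is being deleted and this broker is the leader,
        // so remote segments are removed exactly once across the cluster.
        remoteLogManager.stopPartitions(toStop, true, errors::put);
        errors.forEach((tp, e) -> System.err.println("Failed to stop " + tp + ": " + e));
    }
}
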
private void deleteRemoteLogPartition(TopicIdPartition partition) throws RemoteStorageException, ExecutionException, InterruptedException {
List<RemoteLogSegmentMetadata> metadataList = new ArrayList<>();
remoteLogMetadataManager.listRemoteLogSegments(partition).forEachRemaining(metadataList::add);
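// Two-phase deletion: record DELETE_SEGMENT_STARTED for every listed segment first, then delete the
// segment data and evict the cached indexes, and finally record DELETE_SEGMENT_FINISHED for each segment.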
List<RemoteLogSegmentMetadataUpdate> deleteSegmentStartedEvents = metadataList.stream()
.map(metadata ->
new RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), time.milliseconds(),
metadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId))
.collect(Collectors.toList());
publishEvents(deleteSegmentStartedEvents).get();
// KAFKA-15313: Delete the remote log segments of a partition asynchronously when the partition is deleted.
Collection<Uuid> deletedSegmentIds = new ArrayList<>();
for (RemoteLogSegmentMetadata metadata: metadataList) {
deletedSegmentIds.add(metadata.remoteLogSegmentId().id());
remoteLogStorageManager.deleteLogSegmentData(metadata);
}
indexCache.removeAll(deletedSegmentIds);
List<RemoteLogSegmentMetadataUpdate> deleteSegmentFinishedEvents = metadataList.stream()
.map(metadata ->
new RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), time.milliseconds(),
metadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId))
.collect(Collectors.toList());
publishEvents(deleteSegmentFinishedEvents).get();
}
private CompletableFuture<Void> publishEvents(List<RemoteLogSegmentMetadataUpdate> events) throws RemoteStorageException {
List<CompletableFuture<Void>> result = new ArrayList<>();
for (RemoteLogSegmentMetadataUpdate event : events) {
result.add(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(event));
}
return CompletableFuture.allOf(result.toArray(new CompletableFuture[0]));
}
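
publishEvents joins the per-segment metadata updates with CompletableFuture.allOf, so the .get() calls in deleteRemoteLogPartition block until every update completes and rethrow the first failure. A small, self-contained illustration of that standard allOf behaviour (plain JDK, nothing Kafka-specific):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AllOfSemanticsDemo {
    public static void main(String[] args) throws InterruptedException {
        List<CompletableFuture<Void>> updates = new ArrayList<>();
        updates.add(CompletableFuture.completedFuture(null));
        CompletableFuture<Void> failing = new CompletableFuture<>();
        failing.completeExceptionally(new RuntimeException("metadata update failed"));
        updates.add(failing);
        try {
            // Completes only once every element has completed; fails if any element failed.
            CompletableFuture.allOf(updates.toArray(new CompletableFuture[0])).get();
        } catch (ExecutionException e) {
            System.out.println("Propagated: " + e.getCause().getMessage());
        }
    }
}
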
public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
int epochForOffset,
long offset) throws RemoteStorageException {
Uuid topicId = topicPartitionIds.get(topicPartition);
Uuid topicId = topicIdByPartitionMap.get(topicPartition);
if (topicId == null) {
throw new KafkaException("No topic id registered for topic partition: " + topicPartition);
@ -418,7 +481,7 @@ public class RemoteLogManager implements Closeable {
long timestamp,
long startingOffset,
LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException, IOException {
Uuid topicId = topicPartitionIds.get(tp);
Uuid topicId = topicIdByPartitionMap.get(tp);
if (topicId == null) {
throw new KafkaException("Topic id does not exist for topic partition: " + tp);
}

File: ReplicaManager.scala

@ -89,6 +89,8 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
}
}
case class StopPartition(topicPartition: TopicPartition, deleteLocalLog: Boolean, deleteRemoteLog: Boolean = false)
/**
* Result metadata of a log read operation on the log
* @param info @FetchDataInfo returned by the @Log read
@ -456,7 +458,7 @@ class ReplicaManager(val config: KafkaConfig,
} else {
this.controllerEpoch = controllerEpoch
val stoppedPartitions = mutable.Map.empty[TopicPartition, Boolean]
val stoppedPartitions = mutable.Buffer.empty[StopPartition]
partitionStates.forKeyValue { (topicPartition, partitionState) =>
val deletePartition = partitionState.deletePartition()
@ -478,7 +480,8 @@ class ReplicaManager(val config: KafkaConfig,
if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
requestLeaderEpoch >= currentLeaderEpoch) {
stoppedPartitions += topicPartition -> deletePartition
stoppedPartitions += StopPartition(topicPartition, deletePartition,
deletePartition && partition.isLeader && requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete)
// Assume that everything will go right. It is overwritten in case of an error.
responseMap.put(topicPartition, Errors.NONE)
} else if (requestLeaderEpoch < currentLeaderEpoch) {
@ -499,12 +502,12 @@ class ReplicaManager(val config: KafkaConfig,
case HostedPartition.None =>
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
// This could happen when topic is being deleted while broker is down and recovers.
stoppedPartitions += topicPartition -> deletePartition
stoppedPartitions += StopPartition(topicPartition, deletePartition)
responseMap.put(topicPartition, Errors.NONE)
}
}
stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
stopPartitions(stoppedPartitions.toSet).foreach { case (topicPartition, e) =>
if (e.isInstanceOf[KafkaStorageException]) {
stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
s"controller $controllerId with correlation id $correlationId " +
@ -526,25 +529,39 @@ class ReplicaManager(val config: KafkaConfig,
/**
* Stop the given partitions.
*
* @param partitionsToStop A map from a topic partition to a boolean indicating
* whether the partition should be deleted.
*
* @return A map from partitions to exceptions which occurred.
* If no errors occurred, the map will be empty.
* @param partitionsToStop A map from a topic partition to a boolean indicating
* whether the partition should be deleted.
* @return A map from partitions to exceptions which occurred.
* If no errors occurred, the map will be empty.
*/
protected def stopPartitions(
partitionsToStop: Map[TopicPartition, Boolean]
): Map[TopicPartition, Throwable] = {
private def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
stopPartitions(partitionsToStop.map {
case (topicPartition, deleteLocalLog) => StopPartition(topicPartition, deleteLocalLog)
}.toSet)
}
/**
* Stop the given partitions.
*
* @param partitionsToStop set of topic-partitions to be stopped which also indicates whether to remove the
* partition data from the local and remote log storage.
*
* @return A map from partitions to exceptions which occurred.
* If no errors occurred, the map will be empty.
*/
private def stopPartitions(partitionsToStop: Set[StopPartition]): Map[TopicPartition, Throwable] = {
// First stop fetchers for all partitions.
val partitions = partitionsToStop.keySet
val partitions = partitionsToStop.map(_.topicPartition)
replicaFetcherManager.removeFetcherForPartitions(partitions)
replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
// Second remove deleted partitions from the partition map. Fetchers rely on the
// ReplicaManager to get Partition's information so they must be stopped first.
val partitionsToDelete = mutable.Set.empty[TopicPartition]
partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
if (shouldDelete) {
val remotePartitionsToDelete = mutable.Set.empty[TopicPartition]
partitionsToStop.foreach { stopPartition =>
val topicPartition = stopPartition.topicPartition
if (stopPartition.deleteLocalLog) {
getPartition(topicPartition) match {
case hostedPartition: HostedPartition.Online =>
if (allPartitions.remove(topicPartition, hostedPartition)) {
@ -558,6 +575,9 @@ class ReplicaManager(val config: KafkaConfig,
}
partitionsToDelete += topicPartition
}
if (stopPartition.deleteRemoteLog)
remotePartitionsToDelete += topicPartition
// If we were the leader, we may have some operations still waiting for completion.
// We force completion to prevent them from timing out.
completeDelayedFetchOrProduceRequests(topicPartition)
@ -569,6 +589,17 @@ class ReplicaManager(val config: KafkaConfig,
// Delete the logs and checkpoint.
logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
}
remoteLogManager.foreach { rlm =>
// exclude the partitions with offline/error state
errorMap.keySet.foreach(remotePartitionsToDelete.remove)
if (remotePartitionsToDelete.nonEmpty) {
rlm.stopPartitions(remotePartitionsToDelete.asJava, true, (tp, e) => errorMap.put(tp, e))
}
val remotePartitionsToNotDelete = partitions.diff(remotePartitionsToDelete)
if (remotePartitionsToNotDelete.nonEmpty) {
rlm.stopPartitions(remotePartitionsToNotDelete.asJava, false, (tp, e) => errorMap.put(tp, e))
}
}
errorMap
}
@ -2418,7 +2449,15 @@ class ReplicaManager(val config: KafkaConfig,
// Handle deleted partitions. We need to do this first because we might subsequently
// create new partitions with the same names as the ones we are deleting here.
if (!localChanges.deletes.isEmpty) {
val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap
val deletes = localChanges.deletes.asScala
.map { tp =>
val isCurrentLeader = Option(delta.image().getTopic(tp.topic()))
.map(image => image.partitions().get(tp.partition()))
.exists(partition => partition.leader == config.nodeId)
val deleteRemoteLog = delta.topicWasDeleted(tp.topic()) && isCurrentLeader
StopPartition(tp, deleteLocalLog = true, deleteRemoteLog = deleteRemoteLog)
}
.toSet
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
stopPartitions(deletes).forKeyValue { (topicPartition, e) =>
if (e.isInstanceOf[KafkaStorageException]) {

File: RemoteLogManagerTest.java

@ -81,6 +81,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -91,6 +92,8 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
@ -105,6 +108,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@ -824,12 +828,12 @@ public class RemoteLogManagerTest {
verifyInCache(followerTopicIdPartition, leaderTopicIdPartition);
// Evicts from topicId cache
remoteLogManager.stopPartitions(leaderTopicIdPartition.topicPartition(), true);
remoteLogManager.stopPartitions(Collections.singleton(leaderTopicIdPartition.topicPartition()), true, (tp, ex) -> { });
verifyNotInCache(leaderTopicIdPartition);
verifyInCache(followerTopicIdPartition);
// Evicts from topicId cache
remoteLogManager.stopPartitions(followerTopicIdPartition.topicPartition(), true);
remoteLogManager.stopPartitions(Collections.singleton(followerTopicIdPartition.topicPartition()), true, (tp, ex) -> { });
verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition);
}
@ -1053,6 +1057,73 @@ public class RemoteLogManagerTest {
assertEquals(expected, actual);
}
@Test
public void testStopPartitionsWithoutDeletion() throws RemoteStorageException {
BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, throwable) -> fail("shouldn't be called");
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(leaderTopicIdPartition.topicPartition());
partitions.add(followerTopicIdPartition.topicPartition());
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
remoteLogManager.stopPartitions(partitions, false, errorHandler);
verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
verify(remoteStorageManager, times(0)).deleteLogSegmentData(any());
verify(remoteLogMetadataManager, times(0)).updateRemoteLogSegmentMetadata(any());
}
@Test
public void testStopPartitionsWithDeletion() throws RemoteStorageException {
BiConsumer<TopicPartition, Throwable> errorHandler =
(topicPartition, ex) -> fail("shouldn't be called: " + ex);
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(leaderTopicIdPartition.topicPartition());
partitions.add(followerTopicIdPartition.topicPartition());
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024).iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(eq(followerTopicIdPartition)))
.thenReturn(listRemoteLogSegmentMetadata(followerTopicIdPartition, 3, 100, 1024).iterator());
CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))
.thenReturn(dummyFuture);
remoteLogManager.stopPartitions(partitions, true, errorHandler);
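// 5 (leader) + 3 (follower) segments are listed, so expect one deleteLogSegmentData call per
// segment (8) and two metadata updates per segment, DELETE_SEGMENT_STARTED and
// DELETE_SEGMENT_FINISHED (16).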
verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
verify(remoteStorageManager, times(8)).deleteLogSegmentData(any());
verify(remoteLogMetadataManager, times(16)).updateRemoteLogSegmentMetadata(any());
}
private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
int segmentCount,
int recordsPerSegment,
int segmentSize) {
List<RemoteLogSegmentMetadata> segmentMetadataList = new ArrayList<>();
for (int idx = 0; idx < segmentCount; idx++) {
long timestamp = time.milliseconds();
long startOffset = (long) idx * recordsPerSegment;
long endOffset = startOffset + recordsPerSegment - 1;
Map<Integer, Long> segmentLeaderEpochs = truncateAndGetLeaderEpochs(totalEpochEntries, startOffset, endOffset);
segmentMetadataList.add(new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition,
Uuid.randomUuid()), startOffset, endOffset, timestamp, brokerId, timestamp, segmentSize,
segmentLeaderEpochs));
}
return segmentMetadataList;
}
private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
Long startOffset,
Long endOffset) {
InMemoryLeaderEpochCheckpoint myCheckpoint = new InMemoryLeaderEpochCheckpoint();
myCheckpoint.write(entries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint);
cache.truncateFromStart(startOffset);
cache.truncateFromEnd(endOffset);
return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
}
private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);

File: RemoteTopicCrudTest.scala

@ -18,12 +18,14 @@ package kafka.admin
import kafka.api.IntegrationTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager,
RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
@ -31,7 +33,8 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Collections, Optional, Properties}
import scala.collection.Seq
import scala.concurrent.ExecutionException
import scala.util.Random
@ -41,8 +44,11 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val numPartitions = 2
val numReplicationFactor = 2
var testTopicName: String = _
var sysRemoteStorageEnabled = true
var storageManagerClassName: String = classOf[NoOpRemoteStorageManager].getName
var metadataManagerClassName: String = classOf[NoOpRemoteLogMetadataManager].getName
override protected def brokerCount: Int = 2
@ -59,6 +65,10 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
if (info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) {
sysRemoteStorageEnabled = false
}
if (info.getTestMethod.get().getName.equals("testTopicDeletion")) {
storageManagerClassName = classOf[MyRemoteStorageManager].getName
metadataManagerClassName = classOf[MyRemoteLogMetadataManager].getName
}
super.setUp(info)
testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
}
@ -270,6 +280,27 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
() => admin.incrementalAlterConfigs(configs).all().get(), "Invalid local retention size")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testTopicDeletion(quorum: String): Unit = {
val numPartitions = 2
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
topicConfig = topicConfig)
TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
assertThrowsException(classOf[UnknownTopicOrPartitionException],
() => TestUtils.describeTopic(createAdminClient(), testTopicName), "Topic should be deleted")
// FIXME: The storage manager seems to be instantiated in a different class loader, so the counter value
// couldn't be verified here; it was verified manually by adding a log statement in the storage manager.
// assertEquals(numPartitions * MyRemoteLogMetadataManager.segmentCount,
// MyRemoteStorageManager.deleteSegmentEventCounter.get(),
// "Remote log segments should be deleted only once by the leader")
}
private def assertThrowsException(exceptionType: Class[_ <: Throwable],
executable: Executable,
message: String = ""): Throwable = {
@ -320,11 +351,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
private def overrideProps(): Properties = {
val props = new Properties()
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteLogMetadataManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, storageManagerClassName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, metadataManagerClassName)
props.put(KafkaConfig.LogRetentionTimeMillisProp, "2000")
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
props.put(KafkaConfig.LogRetentionBytesProp, "2048")
@ -332,3 +360,42 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
props
}
}
object MyRemoteStorageManager {
val deleteSegmentEventCounter = new AtomicInteger(0)
}
class MyRemoteStorageManager extends NoOpRemoteStorageManager with Logging {
import MyRemoteStorageManager._
override def deleteLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Unit = {
deleteSegmentEventCounter.incrementAndGet()
info(s"Deleted the remote log segment: $remoteLogSegmentMetadata, counter: ${deleteSegmentEventCounter.get()}")
}
}
class MyRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager {
import MyRemoteLogMetadataManager._
val time = new MockTime()
override def listRemoteLogSegments(topicIdPartition: TopicIdPartition): util.Iterator[RemoteLogSegmentMetadata] = {
val segmentMetadataList = new util.ArrayList[RemoteLogSegmentMetadata]()
for (idx <- 0 until segmentCount) {
val timestamp = time.milliseconds()
val startOffset = idx * recordsPerSegment
val endOffset = startOffset + recordsPerSegment - 1
val segmentLeaderEpochs: util.Map[Integer, java.lang.Long] = Collections.singletonMap(0, 0L)
segmentMetadataList.add(new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
startOffset, endOffset, timestamp, 0, timestamp, segmentSize, Optional.empty(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentLeaderEpochs))
}
segmentMetadataList.iterator()
}
}
object MyRemoteLogMetadataManager {
val segmentCount = 10
val recordsPerSegment = 100
val segmentSize = 1024
}

File: RemoteIndexCacheTest.scala

@ -34,6 +34,7 @@ import org.slf4j.{Logger, LoggerFactory}
import java.io.{File, FileInputStream}
import java.nio.file.Files
import java.util
import java.util.Collections
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable
@ -443,8 +444,59 @@ class RemoteIndexCacheTest {
verifyNoMoreInteractions(rsm)
}
private def generateSpyCacheEntry(): RemoteIndexCache.Entry = {
val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)
@Test
def testRemoveItem(): Unit = {
val segmentId = rlsMetadata.remoteLogSegmentId()
val segmentUuid = segmentId.id()
// generate and add entry to cache
val spyEntry = generateSpyCacheEntry(segmentId)
cache.internalCache.put(segmentUuid, spyEntry)
assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
assertFalse(spyEntry.isMarkedForCleanup)
cache.remove(segmentId.id())
assertFalse(cache.internalCache().asMap().containsKey(segmentUuid))
TestUtils.waitUntilTrue(() => spyEntry.isMarkedForCleanup, "Failed to mark cache entry for cleanup after invalidation")
}
@Test
def testRemoveNonExistentItem(): Unit = {
// generate and add entry to cache
val segmentId = rlsMetadata.remoteLogSegmentId()
val segmentUuid = segmentId.id()
// generate and add entry to cache
val spyEntry = generateSpyCacheEntry(segmentId)
cache.internalCache.put(segmentUuid, spyEntry)
assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
// remove a random Uuid
cache.remove(Uuid.randomUuid())
assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
assertFalse(spyEntry.isMarkedForCleanup)
}
@Test
def testRemoveMultipleItems(): Unit = {
// generate and add entry to cache
val uuidAndEntryList = new util.HashMap[Uuid, RemoteIndexCache.Entry]()
for (_ <- 0 until 10) {
val segmentId = RemoteLogSegmentId.generateNew(idPartition)
val segmentUuid = segmentId.id()
val spyEntry = generateSpyCacheEntry(segmentId)
uuidAndEntryList.put(segmentUuid, spyEntry)
cache.internalCache.put(segmentUuid, spyEntry)
assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
assertFalse(spyEntry.isMarkedForCleanup)
}
cache.removeAll(uuidAndEntryList.keySet())
uuidAndEntryList.values().forEach { entry =>
TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup, "Failed to mark cache entry for cleanup after invalidation")
}
}
private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
= RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))

File: ReplicaManagerTest.scala

@ -3283,10 +3283,12 @@ class ReplicaManagerTest {
(rm0, rm1)
}
@Test
def testStopReplicaWithStaleControllerEpoch(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithStaleControllerEpoch(enableRemoteStorage: Boolean): Unit = {
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1),
enableRemoteStorage = enableRemoteStorage)
try {
val tp0 = new TopicPartition(topic, 0)
@ -3309,15 +3311,20 @@ class ReplicaManagerTest {
val (_, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
assertEquals(Errors.STALE_CONTROLLER_EPOCH, error)
if (enableRemoteStorage) {
verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any(), any())
}
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@Test
def testStopReplicaWithOfflinePartition(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithOfflinePartition(enableRemoteStorage: Boolean): Unit = {
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1),
enableRemoteStorage = enableRemoteStorage)
try {
val tp0 = new TopicPartition(topic, 0)
@ -3342,29 +3349,38 @@ class ReplicaManagerTest {
val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
assertEquals(Errors.NONE, error)
assertEquals(Map(tp0 -> Errors.KAFKA_STORAGE_ERROR), result)
if (enableRemoteStorage) {
verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any(), any())
}
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@Test
def testStopReplicaWithInexistentPartition(): Unit = {
testStopReplicaWithInexistentPartition(false, false)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithInexistentPartition(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithInexistentPartition(false, false, enableRemoteStorage)
}
@Test
def testStopReplicaWithInexistentPartitionAndPartitionsDelete(): Unit = {
testStopReplicaWithInexistentPartition(true, false)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithInexistentPartitionAndPartitionsDelete(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithInexistentPartition(true, false, enableRemoteStorage)
}
@Test
def testStopReplicaWithInexistentPartitionAndPartitionsDeleteAndIOException(): Unit = {
testStopReplicaWithInexistentPartition(true, true)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithInexistentPartitionAndPartitionsDeleteAndIOException(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithInexistentPartition(true, true, enableRemoteStorage)
}
private def testStopReplicaWithInexistentPartition(deletePartitions: Boolean, throwIOException: Boolean): Unit = {
private def testStopReplicaWithInexistentPartition(deletePartitions: Boolean,
throwIOException: Boolean,
enableRemoteStorage: Boolean): Unit = {
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1),
enableRemoteStorage = enableRemoteStorage)
try {
val tp0 = new TopicPartition(topic, 0)
@ -3396,64 +3412,79 @@ class ReplicaManagerTest {
assertEquals(Map(tp0 -> Errors.NONE), result)
assertTrue(replicaManager.logManager.getLog(tp0).isDefined)
}
if (enableRemoteStorage) {
verify(mockRemoteLogManager, times(1))
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)), ArgumentMatchers.eq(false), any())
}
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@Test
def testStopReplicaWithExistingPartitionAndNewerLeaderEpoch(): Unit = {
testStopReplicaWithExistingPartition(2, false, false, Errors.NONE)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithExistingPartitionAndNewerLeaderEpoch(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(2, false, false, Errors.NONE, enableRemoteStorage)
}
@Test
def testStopReplicaWithExistingPartitionAndOlderLeaderEpoch(): Unit = {
testStopReplicaWithExistingPartition(0, false, false, Errors.FENCED_LEADER_EPOCH)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithExistingPartitionAndOlderLeaderEpoch(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(0, false, false, Errors.FENCED_LEADER_EPOCH, enableRemoteStorage)
}
@Test
def testStopReplicaWithExistingPartitionAndEqualLeaderEpoch(): Unit = {
testStopReplicaWithExistingPartition(1, false, false, Errors.NONE)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithExistingPartitionAndEqualLeaderEpoch(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(1, false, false, Errors.NONE, enableRemoteStorage)
}
@Test
def testStopReplicaWithExistingPartitionAndDeleteSentinel(): Unit = {
testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, false, false, Errors.NONE)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithExistingPartitionAndDeleteSentinel(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, false, false, Errors.NONE, enableRemoteStorage)
}
@Test
def testStopReplicaWithExistingPartitionAndLeaderEpochNotProvided(): Unit = {
testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, false, false, Errors.NONE)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithExistingPartitionAndLeaderEpochNotProvided(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, false, false, Errors.NONE, enableRemoteStorage)
}
@Test
def testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpoch(): Unit = {
testStopReplicaWithExistingPartition(2, true, false, Errors.NONE)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpoch(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(2, true, false, Errors.NONE, enableRemoteStorage)
}
@Test
def testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpochAndIOException(): Unit = {
testStopReplicaWithExistingPartition(2, true, true, Errors.KAFKA_STORAGE_ERROR)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpochAndIOException(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(2, true, true, Errors.KAFKA_STORAGE_ERROR, enableRemoteStorage)
}
@Test
def testStopReplicaWithDeletePartitionAndExistingPartitionAndOlderLeaderEpoch(): Unit = {
testStopReplicaWithExistingPartition(0, true, false, Errors.FENCED_LEADER_EPOCH)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithDeletePartitionAndExistingPartitionAndOlderLeaderEpoch(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(0, true, false, Errors.FENCED_LEADER_EPOCH, enableRemoteStorage)
}
@Test
def testStopReplicaWithDeletePartitionAndExistingPartitionAndEqualLeaderEpoch(): Unit = {
testStopReplicaWithExistingPartition(1, true, false, Errors.NONE)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithDeletePartitionAndExistingPartitionAndEqualLeaderEpoch(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(1, true, false, Errors.NONE, enableRemoteStorage)
}
@Test
def testStopReplicaWithDeletePartitionAndExistingPartitionAndDeleteSentinel(): Unit = {
testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, true, false, Errors.NONE)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithDeletePartitionAndExistingPartitionAndDeleteSentinel(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, true, false, Errors.NONE, enableRemoteStorage)
}
@Test
def testStopReplicaWithDeletePartitionAndExistingPartitionAndLeaderEpochNotProvided(): Unit = {
testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, Errors.NONE)
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStopReplicaWithDeletePartitionAndExistingPartitionAndLeaderEpochNotProvided(enableRemoteStorage: Boolean): Unit = {
testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, Errors.NONE, enableRemoteStorage)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ -3698,9 +3729,11 @@ class ReplicaManagerTest {
private def testStopReplicaWithExistingPartition(leaderEpoch: Int,
deletePartition: Boolean,
throwIOException: Boolean,
expectedOutput: Errors): Unit = {
expectedOutput: Errors,
enableRemoteStorage: Boolean): Unit = {
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1),
enableRemoteStorage = enableRemoteStorage)
try {
val tp0 = new TopicPartition(topic, 0)
@ -3762,6 +3795,15 @@ class ReplicaManagerTest {
assertEquals(HostedPartition.None, replicaManager.getPartition(tp0))
assertFalse(readRecoveryPointCheckpoint().contains(tp0))
assertFalse(readLogStartOffsetCheckpoint().contains(tp0))
if (enableRemoteStorage) {
verify(mockRemoteLogManager).stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
ArgumentMatchers.eq(leaderEpoch == LeaderAndIsr.EpochDuringDelete), any())
}
}
if (expectedOutput == Errors.NONE && !deletePartition && enableRemoteStorage) {
verify(mockRemoteLogManager).stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
ArgumentMatchers.eq(false), any())
}
} finally {
replicaManager.shutdown(checkpointHW = false)
@ -4409,7 +4451,11 @@ class ReplicaManagerTest {
val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage)
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
verify(mockRemoteLogManager, times(1)).stopPartitions(
ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(false), any())
}
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@ -4451,7 +4497,12 @@ class ReplicaManagerTest {
val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage)
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
verify(mockRemoteLogManager, times(1)).stopPartitions(
ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(false), any())
}
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@ -4493,7 +4544,12 @@ class ReplicaManagerTest {
val notReplicaTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), otherId, true)
val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage)
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
verify(mockRemoteLogManager, times(1)).stopPartitions(
ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(false), any())
}
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@ -4535,7 +4591,12 @@ class ReplicaManagerTest {
val removeTopicsDelta = topicsDeleteDelta(leaderMetadataImage.topics())
val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage)
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
verify(mockRemoteLogManager, times(1)).stopPartitions(
ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(true), any())
}
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))

File: NoOpRemoteLogMetadataManager.java

@ -29,12 +29,12 @@ import java.util.concurrent.CompletableFuture;
public class NoOpRemoteLogMetadataManager implements RemoteLogMetadataManager {
@Override
public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
return null;
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) {
return null;
return CompletableFuture.completedFuture(null);
}
@Override
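
Returning an already-completed future here (instead of null) matters for the new deletion path: RemoteLogManager collects these return values and blocks on CompletableFuture.allOf, and a null element would surface as a NullPointerException rather than acting as a no-op. A minimal illustration of that JDK behaviour, assuming nothing beyond the standard library:

import java.util.concurrent.CompletableFuture;

public class CompletedVsNullFuture {
    public static void main(String[] args) throws Exception {
        // What the no-op manager now returns: composes and completes immediately.
        CompletableFuture<Void> fromNoOp = CompletableFuture.completedFuture(null);
        CompletableFuture.allOf(fromNoOp).get();
        System.out.println("completedFuture: ok");

        // What it used to return: allOf rejects null elements outright.
        CompletableFuture<Void> nullFuture = null;
        try {
            CompletableFuture.allOf(nullFuture).get();
        } catch (NullPointerException e) {
            System.out.println("null future: " + e);
        }
    }
}
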

File: RemoteIndexCache.java

@ -166,6 +166,24 @@ public class RemoteIndexCache implements Closeable {
return internalCache;
}
public void remove(Uuid key) {
lock.writeLock().lock();
try {
internalCache.invalidate(key);
} finally {
lock.writeLock().unlock();
}
}
public void removeAll(Collection<Uuid> keys) {
lock.writeLock().lock();
try {
internalCache.invalidateAll(keys);
} finally {
lock.writeLock().unlock();
}
}
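
Both new methods take the cache's write lock and invalidate entries in the underlying cache; as the tests above check, invalidated entries are only marked for cleanup, and the cleaner thread removes the on-disk index files asynchronously. A hedged usage sketch mirroring the indexCache.removeAll(deletedSegmentIds) call added in RemoteLogManager (the surrounding method and id arguments are hypothetical):

import java.util.List;

import org.apache.kafka.common.Uuid;

class RemoteIndexCacheUsageSketch {
    void evictDeletedSegments(RemoteIndexCache indexCache, Uuid deletedSegmentId, List<Uuid> deletedBatch) {
        // Evict the cached indexes of a single deleted segment ...
        indexCache.remove(deletedSegmentId);
        // ... or of a whole batch of segments (e.g. everything deleted for a partition) in one call.
        indexCache.removeAll(deletedBatch);
    }
}
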
// Visible for testing
public ShutdownableThread cleanerThread() {
return cleanerThread;