MINOR: Move StopPartition to server-common (#17704)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2024-11-09 20:13:12 +01:00 committed by GitHub
parent 393455eb1a
commit 5d2074e8f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 95 additions and 35 deletions

View File

@ -20,7 +20,6 @@ import kafka.cluster.Partition;
import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.log.UnifiedLog;
import kafka.server.DelayedRemoteListOffsets;
import kafka.server.StopPartition;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
@ -49,6 +48,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.common.StopPartition;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
@ -486,8 +486,8 @@ public class RemoteLogManager implements Closeable {
/**
* Stop the remote-log-manager task for the given partitions. And, calls the
* {@link RemoteLogMetadataManager#onStopPartitions(Set)} when {@link StopPartition#deleteLocalLog()} is true.
* Deletes the partitions from the remote storage when {@link StopPartition#deleteRemoteLog()} is true.
* {@link RemoteLogMetadataManager#onStopPartitions(Set)} when {@link StopPartition#deleteLocalLog} is true.
* Deletes the partitions from the remote storage when {@link StopPartition#deleteRemoteLog} is true.
*
* @param stopPartitions topic partitions that needs to be stopped.
* @param errorHandler callback to handle any errors while stopping the partitions.
@ -496,7 +496,7 @@ public class RemoteLogManager implements Closeable {
BiConsumer<TopicPartition, Throwable> errorHandler) {
LOGGER.debug("Stop partitions: {}", stopPartitions);
for (StopPartition stopPartition: stopPartitions) {
TopicPartition tp = stopPartition.topicPartition();
TopicPartition tp = stopPartition.topicPartition;
try {
if (topicIdByPartitionMap.containsKey(tp)) {
TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
@ -518,7 +518,7 @@ public class RemoteLogManager implements Closeable {
removeRemoteTopicPartitionMetrics(tpId);
if (stopPartition.deleteRemoteLog()) {
if (stopPartition.deleteRemoteLog) {
LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
deleteRemoteLogPartition(tpId);
}
@ -535,8 +535,8 @@ public class RemoteLogManager implements Closeable {
// in both case, they all mean the topic will not be held in this broker anymore.
// NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted
Set<TopicIdPartition> pendingActionsPartitions = stopPartitions.stream()
.filter(sp -> (sp.stopRemoteLogMetadataManager() || sp.deleteLocalLog()) && topicIdByPartitionMap.containsKey(sp.topicPartition()))
.map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition()))
.filter(sp -> (sp.stopRemoteLogMetadataManager || sp.deleteLocalLog) && topicIdByPartitionMap.containsKey(sp.topicPartition))
.map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition), sp.topicPartition))
.collect(Collectors.toSet());
if (!pendingActionsPartitions.isEmpty()) {

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.common.StopPartition
import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason, ThrottledReplicaListValidator}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
@ -98,7 +99,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
// When copy disabled, we should stop leaderCopyRLMTask, but keep expirationTask
if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) {
replicaManager.remoteLogManager.foreach(rlm => {
rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava);
rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava)
})
}
@ -107,14 +108,12 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
val stopPartitions: java.util.HashSet[StopPartition] = new java.util.HashSet[StopPartition]()
leaderPartitions.foreach(partition => {
// delete remote logs and stop RemoteLogMetadataManager
stopPartitions.add(StopPartition(partition.topicPartition, deleteLocalLog = false,
deleteRemoteLog = true, stopRemoteLogMetadataManager = true))
stopPartitions.add(new StopPartition(partition.topicPartition, false, true, true))
})
followerPartitions.foreach(partition => {
// we need to cancel follower tasks and stop RemoteLogMetadataManager
stopPartitions.add(StopPartition(partition.topicPartition, deleteLocalLog = false,
deleteRemoteLog = false, stopRemoteLogMetadataManager = true))
stopPartitions.add(new StopPartition(partition.topicPartition, false, false, true))
})
// update the log start offset to local log start offset for the leader replicas

View File

@ -56,7 +56,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, TopicOptionalIdPartition}
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
@ -106,11 +106,6 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
}
}
case class StopPartition(topicPartition: TopicPartition,
deleteLocalLog: Boolean,
deleteRemoteLog: Boolean = false,
stopRemoteLogMetadataManager: Boolean = false)
/**
* Result metadata of a log read operation on the log
* @param info @FetchDataInfo returned by the @Log read
@ -443,7 +438,7 @@ class ReplicaManager(val config: KafkaConfig,
warn(s"Found stray partitions ${strayPartitions.mkString(",")}")
// First, stop the partitions. This will shutdown the fetchers and other managers
val partitionsToStop = strayPartitions.map(tp => StopPartition(tp, deleteLocalLog = false)).toSet
val partitionsToStop = strayPartitions.map(tp => new StopPartition(tp, false, false, false)).toSet
stopPartitions(partitionsToStop).foreachEntry { (topicPartition, exception) =>
error(s"Unable to stop stray partition $topicPartition", exception)
}
@ -582,8 +577,8 @@ class ReplicaManager(val config: KafkaConfig,
if (requestLeaderEpoch == LeaderAndIsr.EPOCH_DURING_DELETE ||
requestLeaderEpoch == LeaderAndIsr.NO_EPOCH ||
requestLeaderEpoch >= currentLeaderEpoch) {
stoppedPartitions += StopPartition(topicPartition, deletePartition,
deletePartition && partition.isLeader && requestLeaderEpoch == LeaderAndIsr.EPOCH_DURING_DELETE)
stoppedPartitions += new StopPartition(topicPartition, deletePartition,
deletePartition && partition.isLeader && requestLeaderEpoch == LeaderAndIsr.EPOCH_DURING_DELETE, false)
// Assume that everything will go right. It is overwritten in case of an error.
responseMap.put(topicPartition, Errors.NONE)
} else {
@ -598,7 +593,7 @@ class ReplicaManager(val config: KafkaConfig,
case HostedPartition.None =>
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
// This could happen when topic is being deleted while broker is down and recovers.
stoppedPartitions += StopPartition(topicPartition, deletePartition)
stoppedPartitions += new StopPartition(topicPartition, deletePartition, false, false)
responseMap.put(topicPartition, Errors.NONE)
}
}
@ -2909,7 +2904,7 @@ class ReplicaManager(val config: KafkaConfig,
.map(image => image.partitions().get(tp.partition()))
.exists(partition => partition.leader == config.nodeId)
val deleteRemoteLog = delta.topicWasDeleted(tp.topic()) && isCurrentLeader
StopPartition(tp, deleteLocalLog = true, deleteRemoteLog = deleteRemoteLog)
new StopPartition(tp, true, deleteRemoteLog, false)
}
.toSet
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
@ -3081,7 +3076,7 @@ class ReplicaManager(val config: KafkaConfig,
}
if (partitionsToStopFetching.nonEmpty) {
val partitionsToStop = partitionsToStopFetching.map { case (tp, deleteLocalLog) => StopPartition(tp, deleteLocalLog) }.toSet
val partitionsToStop = partitionsToStopFetching.map { case (tp, deleteLocalLog) => new StopPartition(tp, deleteLocalLog, false, false) }.toSet
stopPartitions(partitionsToStop)
stateChangeLogger.info(s"Stopped fetchers as part of controlled shutdown for ${partitionsToStop.size} partitions")
}

View File

@ -19,7 +19,6 @@ package kafka.log.remote;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
import kafka.server.KafkaConfig;
import kafka.server.StopPartition;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
@ -41,6 +40,7 @@ import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.common.StopPartition;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;

View File

@ -61,7 +61,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch, RequestLocal}
import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@ -3925,7 +3925,7 @@ class ReplicaManagerTest {
verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any())
} else {
verify(mockRemoteLogManager, times(1))
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(tp0, deleteLocalLog = deletePartitions))), any())
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(tp0, deletePartitions, false, false))), any())
}
}
} finally {
@ -4608,9 +4608,10 @@ class ReplicaManagerTest {
assertFalse(readRecoveryPointCheckpoint().containsKey(tp0))
assertFalse(readLogStartOffsetCheckpoint().containsKey(tp0))
if (enableRemoteStorage) {
val stopPartition = StopPartition(tp0,
deleteLocalLog = deletePartition,
deleteRemoteLog = leaderEpoch == LeaderAndIsr.EPOCH_DURING_DELETE)
val stopPartition = new StopPartition(tp0,
deletePartition,
leaderEpoch == LeaderAndIsr.EPOCH_DURING_DELETE,
false)
verify(mockRemoteLogManager)
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(stopPartition)), any())
}
@ -4618,7 +4619,7 @@ class ReplicaManagerTest {
if (expectedOutput == Errors.NONE && !deletePartition && enableRemoteStorage) {
verify(mockRemoteLogManager)
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(tp0, deleteLocalLog = false))), any())
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(tp0, false, false, false))), any())
}
} finally {
replicaManager.shutdown(checkpointHW = false)
@ -5451,7 +5452,7 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
verify(mockRemoteLogManager, times(1))
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true))), any())
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicPartition, true, false, false))), any())
}
// Check that the partition was removed
@ -5499,7 +5500,7 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
verify(mockRemoteLogManager, times(1))
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true))), any())
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicPartition, true, false, false))), any())
}
// Check that the partition was removed
@ -5546,7 +5547,7 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
verify(mockRemoteLogManager, times(1))
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true))), any())
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicPartition, true, false, false))), any())
}
// Check that the partition was removed
@ -5593,7 +5594,7 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
verify(mockRemoteLogManager, times(1))
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true, deleteRemoteLog = true))), any())
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicPartition, true, true, false))), any())
}
// Check that the partition was removed

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import org.apache.kafka.common.TopicPartition;
import java.util.Objects;
/**
* A topic partition that brokers should not replicate anymore. Flags indicate is the partition should also be deleted.
*/
public class StopPartition {
public final TopicPartition topicPartition;
public final boolean deleteLocalLog;
public final boolean deleteRemoteLog;
public final boolean stopRemoteLogMetadataManager;
public StopPartition(TopicPartition topicPartition, boolean deleteLocalLog, boolean deleteRemoteLog, boolean stopRemoteLogMetadataManager) {
this.topicPartition = topicPartition;
this.deleteLocalLog = deleteLocalLog;
this.deleteRemoteLog = deleteRemoteLog;
this.stopRemoteLogMetadataManager = stopRemoteLogMetadataManager;
}
@Override
public String toString() {
return "StopPartition(" +
"topicPartition=" + topicPartition +
", deleteLocalLog=" + deleteLocalLog +
", deleteRemoteLog=" + deleteRemoteLog +
", stopRemoteLogMetadataManager=" + stopRemoteLogMetadataManager +
')';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StopPartition that = (StopPartition) o;
return deleteLocalLog == that.deleteLocalLog &&
deleteRemoteLog == that.deleteRemoteLog &&
stopRemoteLogMetadataManager == that.stopRemoteLogMetadataManager &&
Objects.equals(topicPartition, that.topicPartition);
}
@Override
public int hashCode() {
return Objects.hash(topicPartition, deleteLocalLog, deleteRemoteLog, stopRemoteLogMetadataManager);
}
}