Replace TopicPartition with TopicIdPartition in StopPartition

This commit is contained in:
ally heev 2025-08-27 22:49:14 +05:30
parent e42eaf3884
commit 11530066b3
6 changed files with 55 additions and 52 deletions

View File

@ -20,6 +20,7 @@ package kafka.server
import java.util.Properties
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.coordinator.group.GroupCoordinator
@ -89,12 +90,14 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
val stopPartitions: java.util.HashSet[StopPartition] = new java.util.HashSet[StopPartition]()
leaderPartitions.foreach(partition => {
// delete remote logs and stop RemoteLogMetadataManager
stopPartitions.add(new StopPartition(partition.topicPartition, false, true, true))
stopPartitions.add(
new StopPartition(new TopicIdPartition(partition.topicId.get, partition.topicPartition), false, true, true))
})
followerPartitions.foreach(partition => {
// we need to cancel follower tasks and stop RemoteLogMetadataManager
stopPartitions.add(new StopPartition(partition.topicPartition, false, false, true))
stopPartitions.add(
new StopPartition(new TopicIdPartition(partition.topicId.get, partition.topicPartition), false, false, true))
})
// update the log start offset to local log start offset for the leader replicas

View File

@ -434,7 +434,7 @@ class ReplicaManager(val config: KafkaConfig,
*/
private def stopPartitions(partitionsToStop: Set[StopPartition]): Map[TopicPartition, Throwable] = {
// First stop fetchers for all partitions.
val partitions = partitionsToStop.map(_.topicPartition)
val partitions = partitionsToStop.map(_.topicIdPartition.topicPartition())
replicaFetcherManager.removeFetcherForPartitions(partitions)
replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
@ -442,7 +442,7 @@ class ReplicaManager(val config: KafkaConfig,
// ReplicaManager to get Partition's information so they must be stopped first.
val partitionsToDelete = mutable.Set.empty[TopicPartition]
partitionsToStop.foreach { stopPartition =>
val topicPartition = stopPartition.topicPartition
val topicPartition = stopPartition.topicIdPartition.topicPartition()
var topicId: Option[Uuid] = None
if (stopPartition.deleteLocalLog) {
getPartition(topicPartition) match {
@ -467,7 +467,7 @@ class ReplicaManager(val config: KafkaConfig,
// Third delete the logs and checkpoint.
val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
val remotePartitionsToStop = partitionsToStop.filter {
sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => unifiedLog.remoteLogEnabled())
sp => logManager.getLog(sp.topicIdPartition.topicPartition()).exists(unifiedLog => unifiedLog.remoteLogEnabled())
}
if (partitionsToDelete.nonEmpty) {
// Delete the logs and checkpoint.
@ -475,7 +475,7 @@ class ReplicaManager(val config: KafkaConfig,
}
remoteLogManager.foreach { rlm =>
// exclude the partitions with offline/error state
val partitions = remotePartitionsToStop.filterNot(sp => errorMap.contains(sp.topicPartition)).toSet.asJava
val partitions = remotePartitionsToStop.filterNot(sp => errorMap.contains(sp.topicIdPartition.topicPartition())).toSet.asJava
if (!partitions.isEmpty) {
rlm.stopPartitions(partitions, (tp, e) => errorMap.put(tp, e))
}
@ -2347,6 +2347,7 @@ class ReplicaManager(val config: KafkaConfig,
// Before taking the lock, compute the local changes
val localChanges = delta.localChanges(config.nodeId)
val metadataVersion = newImage.features().metadataVersionOrThrow()
val deletedTopicIdMap = delta.image().topicNameToIdView()
replicaStateChangeLock.synchronized {
// Handle deleted partitions. We need to do this first because we might subsequently
@ -2358,7 +2359,7 @@ class ReplicaManager(val config: KafkaConfig,
.map(image => image.partitions().get(tp.partition()))
.exists(partition => partition.leader == config.nodeId)
val deleteRemoteLog = delta.topicWasDeleted(tp.topic()) && isCurrentLeader
new StopPartition(tp, true, deleteRemoteLog, false)
new StopPartition(new TopicIdPartition(deletedTopicIdMap.get(tp.topic()), tp), true, deleteRemoteLog, false)
}
.toSet
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
@ -2443,7 +2444,7 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Transitioning ${localFollowers.size} partition(s) to " +
"local followers.")
val partitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition]
val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
val partitionsToStopFetching = new mutable.HashMap[TopicIdPartition, Boolean]
val followerTopicSet = new mutable.HashSet[String]
localFollowers.foreachEntry { (tp, info) =>
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
@ -2462,7 +2463,7 @@ class ReplicaManager(val config: KafkaConfig,
!info.partition.isr.contains(config.brokerId))) {
// During controlled shutdown, replica with no leaders and replica
// where this broker is not in the ISR are stopped.
partitionsToStopFetching.put(tp, false)
partitionsToStopFetching.put(new TopicIdPartition(info.topicId, tp), false)
} else if (isNewLeaderEpoch) {
// Invoke the follower transition listeners for the partition.
partition.invokeOnBecomingFollowerListeners()

View File

@ -4516,6 +4516,7 @@ class ReplicaManagerTest {
val localId = 1
val otherId = localId + 1
val topicPartition = new TopicPartition("foo", 0)
val topicIdPartition = new TopicIdPartition(FOO_UUID, topicPartition)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage)
try {
@ -4546,7 +4547,7 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet())
verify(mockRemoteLogManager, times(1))
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicPartition, true, false, false))), any())
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicIdPartition, true, false, false))), any())
}
// Check that the partition was removed
@ -4564,6 +4565,7 @@ class ReplicaManagerTest {
val localId = 1
val otherId = localId + 1
val topicPartition = new TopicPartition("foo", 0)
val topicIdPartition = new TopicIdPartition(FOO_UUID, topicPartition)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage)
try {
@ -4594,7 +4596,7 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet())
verify(mockRemoteLogManager, times(1))
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicPartition, true, false, false))), any())
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicIdPartition, true, false, false))), any())
}
// Check that the partition was removed
@ -4612,6 +4614,7 @@ class ReplicaManagerTest {
val localId = 1
val otherId = localId + 1
val topicPartition = new TopicPartition("foo", 0)
val topicIdPartition = new TopicIdPartition(FOO_UUID, topicPartition)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage)
try {
@ -4641,7 +4644,7 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet())
verify(mockRemoteLogManager, times(1))
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicPartition, true, false, false))), any())
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicIdPartition, true, false, false))), any())
}
// Check that the partition was removed
@ -4659,6 +4662,7 @@ class ReplicaManagerTest {
val localId = 1
val otherId = localId + 1
val topicPartition = new TopicPartition("foo", 0)
val topicIdPartition = new TopicIdPartition(FOO_UUID, topicPartition)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage)
try {
@ -4688,7 +4692,7 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet())
verify(mockRemoteLogManager, times(1))
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicPartition, true, true, false))), any())
.stopPartitions(ArgumentMatchers.eq(Collections.singleton(new StopPartition(topicIdPartition, true, true, false))), any())
}
// Check that the partition was removed

View File

@ -16,7 +16,7 @@
*/
package org.apache.kafka.server.common;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicIdPartition;
import java.util.Objects;
@ -25,13 +25,13 @@ import java.util.Objects;
*/
public class StopPartition {
public final TopicPartition topicPartition;
public final TopicIdPartition topicIdPartition;
public final boolean deleteLocalLog;
public final boolean deleteRemoteLog;
public final boolean stopRemoteLogMetadataManager;
public StopPartition(TopicPartition topicPartition, boolean deleteLocalLog, boolean deleteRemoteLog, boolean stopRemoteLogMetadataManager) {
this.topicPartition = topicPartition;
public StopPartition(TopicIdPartition topicIdPartition, boolean deleteLocalLog, boolean deleteRemoteLog, boolean stopRemoteLogMetadataManager) {
this.topicIdPartition = topicIdPartition;
this.deleteLocalLog = deleteLocalLog;
this.deleteRemoteLog = deleteRemoteLog;
this.stopRemoteLogMetadataManager = stopRemoteLogMetadataManager;
@ -40,7 +40,7 @@ public class StopPartition {
@Override
public String toString() {
return "StopPartition(" +
"topicPartition=" + topicPartition +
"topicIdPartition=" + topicIdPartition +
", deleteLocalLog=" + deleteLocalLog +
", deleteRemoteLog=" + deleteRemoteLog +
", stopRemoteLogMetadataManager=" + stopRemoteLogMetadataManager +
@ -55,11 +55,11 @@ public class StopPartition {
return deleteLocalLog == that.deleteLocalLog &&
deleteRemoteLog == that.deleteRemoteLog &&
stopRemoteLogMetadataManager == that.stopRemoteLogMetadataManager &&
Objects.equals(topicPartition, that.topicPartition);
Objects.equals(topicIdPartition, that.topicIdPartition);
}
@Override
public int hashCode() {
return Objects.hash(topicPartition, deleteLocalLog, deleteRemoteLog, stopRemoteLogMetadataManager);
return Objects.hash(topicIdPartition, deleteLocalLog, deleteRemoteLog, stopRemoteLogMetadataManager);
}
}

View File

@ -504,37 +504,32 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
BiConsumer<TopicPartition, Throwable> errorHandler) {
LOGGER.debug("Stop partitions: {}", stopPartitions);
for (StopPartition stopPartition: stopPartitions) {
TopicPartition tp = stopPartition.topicPartition;
TopicIdPartition tpId = stopPartition.topicIdPartition;
try {
if (metadataCache.contains(tp)) {
TopicIdPartition tpId = new TopicIdPartition(metadataCache.getTopicId(tp.topic()), tp);
leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the copy RLM task for partition: {}", tpId);
task.cancel();
return null;
});
leaderExpirationRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the expiration RLM task for partition: {}", tpId);
task.cancel();
return null;
});
followerRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the follower RLM task for partition: {}", tpId);
task.cancel();
return null;
});
leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the copy RLM task for partition: {}", tpId);
task.cancel();
return null;
});
leaderExpirationRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the expiration RLM task for partition: {}", tpId);
task.cancel();
return null;
});
followerRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the follower RLM task for partition: {}", tpId);
task.cancel();
return null;
});
removeRemoteTopicPartitionMetrics(tpId);
removeRemoteTopicPartitionMetrics(tpId);
if (stopPartition.deleteRemoteLog) {
LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
deleteRemoteLogPartition(tpId);
}
} else {
LOGGER.warn("StopPartition call is not expected for partition: {}", tp);
if (stopPartition.deleteRemoteLog) {
LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
deleteRemoteLogPartition(tpId);
}
} catch (Exception ex) {
errorHandler.accept(tp, ex);
errorHandler.accept(tpId.topicPartition(), ex);
LOGGER.error("Error while stopping the partition: {}", stopPartition, ex);
}
}
@ -542,8 +537,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
// We want to remove topicId map and stopPartition on RLMM for deleteLocalLog or stopRLMM partitions because
// in both case, they all mean the topic will not be held in this broker anymore.
Set<TopicIdPartition> pendingActionsPartitions = stopPartitions.stream()
.filter(sp -> (sp.stopRemoteLogMetadataManager || sp.deleteLocalLog) && metadataCache.contains(sp.topicPartition))
.map(sp -> new TopicIdPartition(metadataCache.getTopicId(sp.topicPartition.topic()), sp.topicPartition))
.filter(sp -> (sp.stopRemoteLogMetadataManager || sp.deleteLocalLog))
.map(sp -> sp.topicIdPartition)
.collect(Collectors.toSet());
if (!pendingActionsPartitions.isEmpty()) {

View File

@ -2094,8 +2094,8 @@ public class RemoteLogManagerTest {
public void testStopPartitionsWithoutDeletion() throws RemoteStorageException {
BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, throwable) -> fail("shouldn't be called");
Set<StopPartition> partitions = new HashSet<>();
partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, false, false));
partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, false, false));
partitions.add(new StopPartition(leaderTopicIdPartition, true, false, false));
partitions.add(new StopPartition(followerTopicIdPartition, true, false, false));
remoteLogManager.onLeadershipChange(Set.of(mockPartition(leaderTopicIdPartition)),
Set.of(mockPartition(followerTopicIdPartition)));
assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
@ -2116,8 +2116,8 @@ public class RemoteLogManagerTest {
BiConsumer<TopicPartition, Throwable> errorHandler =
(topicPartition, ex) -> fail("shouldn't be called: " + ex);
Set<StopPartition> partitions = new HashSet<>();
partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true));
partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true, true));
partitions.add(new StopPartition(leaderTopicIdPartition, true, true, true));
partitions.add(new StopPartition(followerTopicIdPartition, true, true, true));
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
.thenAnswer(invocation -> listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());