mirror of https://github.com/apache/kafka.git
KAFKA-15605: Fix topic deletion handling during ZK migration (#14545)
This patch adds reconciliation logic to migrating ZK brokers to deal with pending topic deletions as well as missed StopReplicas.

During the hybrid mode of the ZK migration, the KRaft controller asynchronously sends UMR and LISR to the ZK brokers to propagate metadata. Since this process is essentially "best effort", it is possible for a broker to miss a StopReplicas. The new logic lets the ZK broker compare its local logs against the full set of replicas in a "Full" LISR. Any local logs which are not present in the set of replicas in the request are removed from ReplicaManager and marked as "stray".

To avoid inadvertent data loss with this new behavior, the brokers do not delete the "stray" partitions. They rename the directories and log warning messages during log recovery. It is up to the operator to manually delete the stray partitions. We can possibly enhance this in the future to clean up old stray logs.

This patch makes use of the previously unused Type field on LeaderAndIsrRequest. This was added as part of KIP-516 but never implemented. Since its introduction, an implicit 0 was sent in all LISRs. The KRaft controller will now send a value of 2 to indicate a full LISR (as specified by the KIP). The presence of this value acts as a trigger for the ZK broker to perform the log reconciliation.

Reviewers: Colin P. McCabe <cmccabe@apache.org>

Conflicts:
- ReplicaManagerTest.scala: fix imports
- ZkMigrationIntegrationTest.scala: handle absence of KIP-919 changes that added a different way to fetch the quorum voters config.
- KRaftMigrationDriverTest.java: handle absence of KIP-919 changes that added setupDeltaForMigration.
parent a1d1834942
commit 1de84590c4
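Before the diff itself, a minimal, self-contained sketch of the reconciliation idea described above may help. Everything here is illustrative: the object, method names, and flat directory layout are assumptions made for the example only; the real logic lives in ReplicaManager.findStrayPartitionsFromLeaderAndIsr, ReplicaManager.updateStrayLogs, and LogManager.asyncDelete(isStray = true) in the hunks below.

// A minimal sketch (not broker code): given the partitions named in a "Full"
// LeaderAndIsr request, any local log directory not referenced in the request
// is a stray and is moved aside with a "-stray" suffix instead of being deleted.
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

object StrayReconciliationSketch {
  def findStrays(partitionsInFullRequest: Set[String], localLogDirs: Seq[Path]): Seq[Path] =
    localLogDirs.filterNot(dir => partitionsInFullRequest.contains(dir.getFileName.toString))

  // Strays are renamed, never deleted, so an operator can inspect or remove them later.
  def markStray(dir: Path): Path =
    Files.move(dir, dir.resolveSibling(dir.getFileName.toString + "-stray"))

  def main(args: Array[String]): Unit = {
    val logRoot = Files.createTempDirectory("broker-logs")
    Seq("topic-a-0", "topic-a-1", "deleted-topic-0").foreach(d => Files.createDirectory(logRoot.resolve(d)))

    // "deleted-topic-0" was deleted while this broker missed the StopReplicas.
    val fullRequest = Set("topic-a-0", "topic-a-1")
    val localDirs = Files.list(logRoot).iterator().asScala.toSeq

    findStrays(fullRequest, localDirs).foreach { stray =>
      println(s"Marking stray partition ${stray.getFileName}")
      markStray(stray)
    }
    // The broker now keeps "deleted-topic-0-stray" on disk instead of deleting it.
  }
}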
LeaderAndIsrRequest.java
@@ -42,26 +42,52 @@ import java.util.stream.Collectors;

 public class LeaderAndIsrRequest extends AbstractControlRequest {

+    public enum Type {
+        UNKNOWN(0),
+        INCREMENTAL(1),
+        FULL(2);
+
+        private final byte type;
+
+        private Type(int type) {
+            this.type = (byte) type;
+        }
+
+        public byte toByte() {
+            return type;
+        }
+
+        public static Type fromByte(byte type) {
+            for (Type t : Type.values()) {
+                if (t.type == type) {
+                    return t;
+                }
+            }
+            return UNKNOWN;
+        }
+    }
+
     public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {

         private final List<LeaderAndIsrPartitionState> partitionStates;
         private final Map<String, Uuid> topicIds;
         private final Collection<Node> liveLeaders;
+        private final Type updateType;

         public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
                        List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
                        Collection<Node> liveLeaders) {
             this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds,
-                liveLeaders, false);
+                liveLeaders, false, Type.UNKNOWN);
         }

         public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
                        List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
-                       Collection<Node> liveLeaders, boolean kraftController) {
+                       Collection<Node> liveLeaders, boolean kraftController, Type updateType) {
             super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch, kraftController);
             this.partitionStates = partitionStates;
             this.topicIds = topicIds;
             this.liveLeaders = liveLeaders;
+            this.updateType = updateType;
         }

         @Override

@@ -82,6 +108,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
             data.setIsKRaftController(kraftController);
         }

+        if (version >= 5) {
+            data.setType(updateType.toByte());
+        }
+
         if (version >= 2) {
             Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds);
             data.setTopicStates(new ArrayList<>(topicStatesMap.values()));

@@ -210,6 +240,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
         return Collections.unmodifiableList(data.liveLeaders());
     }

+    public Type requestType() {
+        return Type.fromByte(data.type());
+    }
+
     @Override
     public LeaderAndIsrRequestData data() {
         return data;
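As a quick orientation on how the new field is meant to be exercised, here is a hedged usage example; it is not part of the patch and assumes a Kafka clients build that already contains the enum added above.

import org.apache.kafka.common.requests.LeaderAndIsrRequest;

public class LisrTypeExample {
    public static void main(String[] args) {
        // The KRaft controller tags a full LISR with Type.FULL, which encodes to 2 on the wire.
        byte wire = LeaderAndIsrRequest.Type.FULL.toByte();
        System.out.println("FULL encodes to " + wire); // 2

        // A ZK broker decodes the field with fromByte(); the implicit 0 sent by older
        // controllers maps back to UNKNOWN, so it never triggers reconciliation.
        System.out.println(LeaderAndIsrRequest.Type.fromByte((byte) 0)); // UNKNOWN
        System.out.println(LeaderAndIsrRequest.Type.fromByte((byte) 2)); // FULL
    }
}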
ControllerChannelManager.scala (AbstractControllerBrokerRequestBatch)
@@ -377,6 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, StopReplicaPartitionState]]
   val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
   val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState]
+  private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.UNKNOWN
   private var metadataInstance: ControllerChannelContext = _

   def sendRequest(brokerId: Int,

@@ -398,12 +399,17 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
     metadataInstance = metadataProvider()
   }

+  def setUpdateType(updateType: LeaderAndIsrRequest.Type): Unit = {
+    this.updateType = updateType
+  }
+
   def clear(): Unit = {
     leaderAndIsrRequestMap.clear()
     stopReplicaRequestMap.clear()
     updateMetadataRequestBrokerSet.clear()
     updateMetadataRequestPartitionInfoMap.clear()
     metadataInstance = null
+    updateType = LeaderAndIsrRequest.Type.UNKNOWN
   }

   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],

@@ -543,8 +549,17 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
         .toSet[String]
         .map(topic => (topic, metadataInstance.topicIds.getOrElse(topic, Uuid.ZERO_UUID)))
         .toMap
-      val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
-        controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava, kraftController)
+      val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(
+        leaderAndIsrRequestVersion,
+        controllerId,
+        controllerEpoch,
+        brokerEpoch,
+        leaderAndIsrPartitionStates.values.toBuffer.asJava,
+        topicIds.asJava,
+        leaders.asJava,
+        kraftController,
+        updateType
+      )
       sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => {
         val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
         handleLeaderAndIsrResponse(leaderAndIsrResponse, broker)

@@ -552,6 +567,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       }
     }
     leaderAndIsrRequestMap.clear()
+    updateType = LeaderAndIsrRequest.Type.UNKNOWN
   }

   def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit
LocalLog.scala
@@ -610,8 +610,12 @@ object LocalLog extends Logging {
  /** a directory that is used for future partition */
  private[log] val FutureDirSuffix = "-future"

+ /** a directory that is used for stray partition */
+ private[log] val StrayDirSuffix = "-stray"
+
  private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
  private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
+ private[log] val StrayDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$StrayDirSuffix")

  private[log] val UnknownOffset = -1L

@@ -622,10 +626,17 @@ object LocalLog extends Logging {
   * from exceeding 255 characters.
   */
  private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
-   val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
-   val suffix = s"-${topicPartition.partition()}.$uniqueId$DeleteDirSuffix"
-   val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
-   s"${topicPartition.topic().substring(0, prefixLength)}$suffix"
+   logDirNameWithSuffixCappedLength(topicPartition, DeleteDirSuffix)
+ }
+
+ /**
+  * Return a directory name to rename the log directory to for stray partition deletion.
+  * The name will be in the following format: "topic-partitionId.uniqueId-stray".
+  * If the topic name is too long, it will be truncated to prevent the total name
+  * from exceeding 255 characters.
+  */
+ private[log] def logStrayDirName(topicPartition: TopicPartition): String = {
+   logDirNameWithSuffixCappedLength(topicPartition, StrayDirSuffix)
  }

  /**

@@ -636,6 +647,18 @@ object LocalLog extends Logging {
    logDirNameWithSuffix(topicPartition, FutureDirSuffix)
  }

+ /**
+  * Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}".
+  * If the topic name is too long, it will be truncated to prevent the total name
+  * from exceeding 255 characters.
+  */
+ private[log] def logDirNameWithSuffixCappedLength(topicPartition: TopicPartition, suffix: String): String = {
+   val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
+   val fullSuffix = s"-${topicPartition.partition()}.$uniqueId$suffix"
+   val prefixLength = Math.min(topicPartition.topic().size, 255 - fullSuffix.size)
+   s"${topicPartition.topic().substring(0, prefixLength)}$fullSuffix"
+ }
+
  private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = {
    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
    s"${logDirName(topicPartition)}.$uniqueId$suffix"

@@ -666,11 +689,13 @@ object LocalLog extends Logging {
    if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
      throw exception(dir)
    if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches ||
-       dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches)
+       dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches ||
+       dirName.endsWith(StrayDirSuffix) && !StrayDirPattern.matcher(dirName).matches)
      throw exception(dir)

    val name: String =
-     if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.'))
+     if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix) || dirName.endsWith(StrayDirSuffix))
+       dirName.substring(0, dirName.lastIndexOf('.'))
      else dirName

    val index = name.lastIndexOf('-')
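To make the capped-length naming rule above concrete, here is a small standalone sketch that mirrors logDirNameWithSuffixCappedLength; the helper and sample values are illustrative, not the library code.

object StrayDirNameSketch {
  // Mirrors the rule: "${topic}-${partitionId}.${uniqueId}${suffix}", truncated to at most 255 characters.
  def dirNameWithSuffixCappedLength(topic: String, partition: Int, suffix: String): String = {
    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
    val fullSuffix = s"-$partition.$uniqueId$suffix"
    val prefixLength = math.min(topic.length, 255 - fullSuffix.length)
    s"${topic.substring(0, prefixLength)}$fullSuffix"
  }

  def main(args: Array[String]): Unit = {
    // Prints something like "my-topic-0.3bcd3c6062c94cfdab787b4375f3054a-stray" (the uniqueId is random).
    println(dirNameWithSuffixCappedLength("my-topic", 0, "-stray"))
    // A 300-character topic name is truncated so the full directory name stays at 255 characters.
    println(dirNameWithSuffixCappedLength("x" * 300, 7, "-stray").length) // 255
  }
}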
LogManager.scala
@@ -92,6 +92,10 @@ class LogManager(logDirs: Seq[File],
   // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
   private val logsToBeDeleted = new LinkedBlockingQueue[(UnifiedLog, Long)]()

+  // Map of stray partition to stray log. This holds all stray logs detected on the broker.
+  // Visible for testing
+  private val strayLogs = new Pool[TopicPartition, UnifiedLog]()
+
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir

@@ -265,6 +269,10 @@ class LogManager(logDirs: Seq[File],
     this.logsToBeDeleted.add((log, time.milliseconds()))
   }

+  def addStrayLog(strayPartition: TopicPartition, strayLog: UnifiedLog): Unit = {
+    this.strayLogs.put(strayPartition, strayLog)
+  }
+
   // Only for testing
   private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty

@@ -300,6 +308,9 @@ class LogManager(logDirs: Seq[File],

     if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
       addLogToBeDeleted(log)
+    } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
+      addStrayLog(topicPartition, log)
+      warn(s"Loaded stray log: $logDir")
     } else {
       val previous = {
         if (log.isFuture)

@@ -1165,7 +1176,8 @@ class LogManager(logDirs: Seq[File],
   */
  def asyncDelete(topicPartition: TopicPartition,
                  isFuture: Boolean = false,
-                 checkpoint: Boolean = true): Option[UnifiedLog] = {
+                 checkpoint: Boolean = true,
+                 isStray: Boolean = false): Option[UnifiedLog] = {
    val removedLog: Option[UnifiedLog] = logCreationOrDeletionLock synchronized {
      removeLogAndMetrics(if (isFuture) futureLogs else currentLogs, topicPartition)
    }

@@ -1178,15 +1190,21 @@ class LogManager(logDirs: Seq[File],
          cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
        }
      }
+     if (isStray) {
+       // Move aside stray partitions, don't delete them
+       removedLog.renameDir(UnifiedLog.logStrayDirName(topicPartition), false)
+       warn(s"Log for partition ${removedLog.topicPartition} is marked as stray and renamed to ${removedLog.dir.getAbsolutePath}")
+     } else {
       removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
+       addLogToBeDeleted(removedLog)
+       info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
+     }
      if (checkpoint) {
        val logDir = removedLog.parentDirFile
        val logsToCheckpoint = logsInDir(logDir)
        checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
        checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
      }
-     addLogToBeDeleted(removedLog)
-     info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")

    case None =>
      if (offlineLogDirs.nonEmpty) {

@@ -1206,6 +1224,7 @@ class LogManager(logDirs: Seq[File],
   * topic-partition is raised
   */
  def asyncDelete(topicPartitions: Set[TopicPartition],
+                 isStray: Boolean,
                  errorHandler: (TopicPartition, Throwable) => Unit): Unit = {
    val logDirs = mutable.Set.empty[File]

@@ -1213,11 +1232,11 @@ class LogManager(logDirs: Seq[File],
      try {
        getLog(topicPartition).foreach { log =>
          logDirs += log.parentDirFile
-         asyncDelete(topicPartition, checkpoint = false)
+         asyncDelete(topicPartition, checkpoint = false, isStray = isStray)
        }
        getLog(topicPartition, isFuture = true).foreach { log =>
          logDirs += log.parentDirFile
-         asyncDelete(topicPartition, isFuture = true, checkpoint = false)
+         asyncDelete(topicPartition, isFuture = true, checkpoint = false, isStray = isStray)
        }
      } catch {
        case e: Throwable => errorHandler(topicPartition, e)
UnifiedLog.scala
@@ -1872,6 +1872,8 @@ object UnifiedLog extends Logging {

  val DeleteDirSuffix = LocalLog.DeleteDirSuffix

+ val StrayDirSuffix = LocalLog.StrayDirSuffix
+
  val FutureDirSuffix = LocalLog.FutureDirSuffix

  private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern

@@ -1956,6 +1958,8 @@ object UnifiedLog extends Logging {

  def logFutureDirName(topicPartition: TopicPartition): String = LocalLog.logFutureDirName(topicPartition)

+ def logStrayDirName(topicPartition: TopicPartition): String = LocalLog.logStrayDirName(topicPartition)
+
  def logDirName(topicPartition: TopicPartition): String = LocalLog.logDirName(topicPartition)

  def offsetIndexFile(dir: File, offset: Long, suffix: String = ""): File = LogFileUtils.offsetIndexFile(dir, offset, suffix)
MigrationPropagator.scala
@@ -22,6 +22,7 @@ import kafka.controller.{ControllerChannelContext, ControllerChannelManager, Rep
 import kafka.server.KafkaConfig
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, TopicsImage}
 import org.apache.kafka.metadata.PartitionRegistration

@@ -225,6 +226,7 @@ class MigrationPropagator(
     requestBatch.sendRequestsToBrokers(zkControllerEpoch)

     requestBatch.newBatch()
+    requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL)
     // When we need to send RPCs from the image, we're sending 'full' requests meaning we let
     // every broker know about all the metadata and all the LISR requests it needs to handle.
     // Note that we cannot send StopReplica requests from the image. We don't have any state
ReplicaManager.scala
@@ -394,6 +394,47 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.removeMetrics(topic)
   }

+  private[server] def updateStrayLogs(strayPartitions: Set[TopicPartition]): Unit = {
+    if (strayPartitions.isEmpty) {
+      return
+    }
+    warn(s"Found stray partitions ${strayPartitions.mkString(",")}")
+
+    // First, stop the partitions. This will shutdown the fetchers and other managers
+    val partitionsToStop = strayPartitions.map { tp => tp -> false }.toMap
+    stopPartitions(partitionsToStop).forKeyValue { (topicPartition, exception) =>
+      error(s"Unable to stop stray partition $topicPartition", exception)
+    }
+
+    // Next, delete the in-memory partition state. Normally, stopPartitions would do this, but since we're not
+    // actually deleting the log, we can't rely on the "deleteLocalLog" behavior in stopPartitions.
+    strayPartitions.foreach { topicPartition =>
+      getPartition(topicPartition) match {
+        case hostedPartition: HostedPartition.Online =>
+          if (allPartitions.remove(topicPartition, hostedPartition)) {
+            maybeRemoveTopicMetrics(topicPartition.topic)
+            hostedPartition.partition.delete()
+          }
+        case _ =>
+      }
+    }
+
+    // Mark the log as stray in-memory and rename the directory
+    strayPartitions.foreach { tp =>
+      logManager.getLog(tp).foreach(logManager.addStrayLog(tp, _))
+      logManager.getLog(tp, isFuture = true).foreach(logManager.addStrayLog(tp, _))
+    }
+    logManager.asyncDelete(strayPartitions, isStray = true, (topicPartition, e) => {
+      error(s"Failed to delete stray partition $topicPartition due to " +
+        s"${e.getClass.getName} exception: ${e.getMessage}")
+    })
+  }
+
+  // Find logs which exist on the broker, but aren't present in the full LISR
+  private[server] def findStrayPartitionsFromLeaderAndIsr(partitionsFromRequest: Set[TopicPartition]): Set[TopicPartition] = {
+    logManager.allLogs.map(_.topicPartition).filterNot(partitionsFromRequest.contains).toSet
+  }
+
   protected def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = {
     val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
     delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)

@@ -583,7 +624,7 @@ class ReplicaManager(val config: KafkaConfig,
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
-      logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
+      logManager.asyncDelete(partitionsToDelete, isStray = false, (tp, e) => errorMap.put(tp, e))
     }
     remoteLogManager.foreach { rlm =>
       // exclude the partitions with offline/error state

@@ -1741,10 +1782,12 @@ class ReplicaManager(val config: KafkaConfig,
       val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
       val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
       val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()
+      val allTopicPartitionsInRequest = new mutable.HashSet[TopicPartition]()

       // First create the partition if it doesn't exist already
       requestPartitionStates.foreach { partitionState =>
         val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+        allTopicPartitionsInRequest += topicPartition
         val partitionOpt = getPartition(topicPartition) match {
           case HostedPartition.Offline =>
             stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +

@@ -1853,6 +1896,17 @@ class ReplicaManager(val config: KafkaConfig,
       // have been completely populated before starting the checkpointing there by avoiding weird race conditions
       startHighWatermarkCheckPointThread()

+      // In migration mode, reconcile missed topic deletions when handling full LISR from KRaft controller.
+      // LISR "type" field was previously unspecified (0), so if we see it set to Full (2), then we know the
+      // request came from a KRaft controller.
+      if (
+        config.migrationEnabled &&
+        leaderAndIsrRequest.isKRaftController &&
+        leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.FULL
+      ) {
+        updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(allTopicPartitionsInRequest))
+      }
+
       maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest)

       replicaFetcherManager.shutdownIdleFetcherThreads()
ZkTopicMigrationClient.scala
@@ -47,8 +47,17 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
     if (!interests.contains(TopicVisitorInterest.TOPICS)) {
       throw new IllegalArgumentException("Must specify at least TOPICS in topic visitor interests.")
     }
-    val topics = zkClient.getAllTopicsInCluster()
-    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+    val allTopics = zkClient.getAllTopicsInCluster()
+    val topicDeletions = readPendingTopicDeletions().asScala
+    val topicsToMigrated = allTopics -- topicDeletions
+    if (topicDeletions.nonEmpty) {
+      warn(s"Found ${topicDeletions.size} pending topic deletions. These will not be migrated " +
+        s"to KRaft. After the migration, the brokers will reconcile their logs with these pending topic deletions.")
+    }
+    topicDeletions.foreach {
+      deletion => logger.info(s"Not migrating pending deleted topic: $deletion")
+    }
+    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topicsToMigrated)
     replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) =>
       val topicAssignment = partitionAssignments.map { case (partition, assignment) =>
         partition.partition().asInstanceOf[Integer] -> assignment.replicas.map(Integer.valueOf).asJava

@@ -206,6 +215,7 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
       DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topicName), ZkVersion.MatchAnyVersion),
       DeleteRequest(TopicZNode.path(topicName), ZkVersion.MatchAnyVersion)
     )
+
     val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(deleteRequests, state)
     val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
     if (responses.last.resultCode.equals(Code.OK)) {

@@ -316,4 +326,25 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
       val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
       SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
     }

+  override def readPendingTopicDeletions(): util.Set[String] = {
+    zkClient.getTopicDeletions.toSet.asJava
+  }
+
+  override def clearPendingTopicDeletions(
+    pendingTopicDeletions: util.Set[String],
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = {
+    val deleteRequests = pendingTopicDeletions.asScala.map { topicName =>
+      DeleteRequest(DeleteTopicsTopicZNode.path(topicName), ZkVersion.MatchAnyVersion)
+    }.toSeq
+
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(deleteRequests.toSeq, state)
+    val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
+    if (resultCodes.forall { case (_, code) => code.equals(Code.OK) }) {
+      state.withMigrationZkVersion(migrationZkVersion)
+    } else {
+      throw new MigrationClientException(s"Failed to delete pending topic deletions: $pendingTopicDeletions. ZK transaction had results $resultCodes")
+    }
+  }
 }
ZkMigrationIntegrationTest.scala
@@ -47,7 +47,7 @@ import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue}
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{Assumptions, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 import org.slf4j.LoggerFactory

@@ -267,6 +267,120 @@ class ZkMigrationIntegrationTest {
     migrationState = migrationClient.releaseControllerLeadership(migrationState)
   }

+  @ClusterTemplate("zkClustersForAllMigrationVersions")
+  def testMigrateTopicDeletions(zkCluster: ClusterInstance): Unit = {
+    // Create some topics in ZK mode
+    var admin = zkCluster.createAdminClient()
+    val newTopics = new util.ArrayList[NewTopic]()
+    newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-4", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-5", 10, 3.toShort))
+    val createTopicResult = admin.createTopics(newTopics)
+    createTopicResult.all().get(300, TimeUnit.SECONDS)
+    admin.close()
+    val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(zkCluster.config().metadataVersion()).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Start a deletion that will take some time, but don't wait for it
+      admin = zkCluster.createAdminClient()
+      admin.deleteTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3", "test-topic-4", "test-topic-5").asJava)
+      admin.close()
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      val clientProps = kraftCluster.controllerClientProperties()
+      val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart()
+
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(60, TimeUnit.SECONDS)
+
+      // Only continue with the test if there are some pending deletions to verify. If there are not any pending
+      // deletions, this will mark the test as "skipped" instead of failed.
+      val topicDeletions = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkClient.getTopicDeletions
+      Assumptions.assumeTrue(topicDeletions.nonEmpty,
+        "This test needs pending topic deletions after a migration in order to verify the behavior")
+
+      // Wait for migration to begin
+      log.info("Waiting for ZK migration to complete")
+      TestUtils.waitUntilTrue(
+        () => zkClient.getOrCreateMigrationState(ZkMigrationLeadershipState.EMPTY).initialZkMigrationComplete(),
+        "Timed out waiting for migration to complete",
+        30000)
+
+      // At this point, some of the topics may have been deleted by the ZK controller and the rest will be
+      // implicitly deleted by the KRaft controller and removed from the ZK brokers as stray partitions
+      admin = zkCluster.createAdminClient()
+      TestUtils.waitUntilTrue(
+        () => admin.listTopics().names().get(60, TimeUnit.SECONDS).isEmpty,
+        "Timed out waiting for topics to be deleted",
+        300000)
+
+      val newTopics = new util.ArrayList[NewTopic]()
+      newTopics.add(new NewTopic("test-topic-1", 2, 3.toShort))
+      newTopics.add(new NewTopic("test-topic-2", 1, 3.toShort))
+      newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
+      val createTopicResult = admin.createTopics(newTopics)
+      createTopicResult.all().get(60, TimeUnit.SECONDS)
+
+      val expectedNewTopics = Seq("test-topic-1", "test-topic-2", "test-topic-3")
+      TestUtils.waitUntilTrue(
+        () => admin.listTopics().names().get(60, TimeUnit.SECONDS).equals(expectedNewTopics.toSet.asJava),
+        "Timed out waiting for topics to be created",
+        300000)
+
+      TestUtils.retry(300000) {
+        // Need a retry here since topic metadata may be inconsistent between brokers
+        val topicDescriptions = admin.describeTopics(expectedNewTopics.asJavaCollection)
+          .topicNameValues().asScala.map { case (name, description) =>
+            name -> description.get(60, TimeUnit.SECONDS)
+          }.toMap
+
+        assertEquals(2, topicDescriptions("test-topic-1").partitions().size())
+        assertEquals(1, topicDescriptions("test-topic-2").partitions().size())
+        assertEquals(10, topicDescriptions("test-topic-3").partitions().size())
+        topicDescriptions.foreach { case (topic, description) =>
+          description.partitions().forEach(partition => {
+            assertEquals(3, partition.replicas().size(), s"Unexpected number of replicas for ${topic}-${partition.partition()}")
+            assertEquals(3, partition.isr().size(), s"Unexpected ISR for ${topic}-${partition.partition()}")
+          })
+        }
+
+        val absentTopics = admin.listTopics().names().get(60, TimeUnit.SECONDS).asScala
+        assertTrue(absentTopics.contains("test-topic-1"))
+        assertTrue(absentTopics.contains("test-topic-2"))
+        assertTrue(absentTopics.contains("test-topic-3"))
+        assertFalse(absentTopics.contains("test-topic-4"))
+        assertFalse(absentTopics.contains("test-topic-5"))
+      }
+
+      admin.close()
+    } finally {
+      shutdownInSequence(zkCluster, kraftCluster)
+    }
+  }
+
   // SCRAM and Quota are intermixed. Test SCRAM Only here
   @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
ReplicaManagerTest.scala
@@ -71,6 +71,7 @@ import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction}
 import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
 import org.apache.kafka.server.util.timer.MockTimer
 import org.mockito.invocation.InvocationOnMock

@@ -565,7 +566,9 @@ class ReplicaManagerTest {
         .setIsNew(true)).asJava,
       Collections.singletonMap(topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
-      false).build()
+      false,
+      LeaderAndIsrRequest.Type.UNKNOWN
+    ).build()
     replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
     replicaManager.getPartitionOrException(new TopicPartition(topic, partition))
       .localLogOrException

@@ -2635,6 +2638,130 @@ class ReplicaManagerTest {
     }
   }

+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testFullLeaderAndIsrStrayPartitions(zkMigrationEnabled: Boolean): Unit = {
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    if (zkMigrationEnabled) {
+      props.put(KafkaConfig.MigrationEnabledProp, zkMigrationEnabled)
+      props.put(RaftConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9071")
+      props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      config = KafkaConfig.fromProps(props)
+    }
+
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time)
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = time.scheduler,
+      logManager = logManager,
+      quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""),
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager,
+      threadNamePrefix = Option(this.getClass.getName))
+
+    logManager.startup(Set.empty[String])
+
+    // Create a hosted topic, a hosted topic that will become stray
+    createHostedLogs("hosted-topic", numLogs = 2, replicaManager).toSet
+    createHostedLogs("hosted-stray", numLogs = 10, replicaManager).toSet
+
+    val lisr = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+      3000, 0, brokerEpoch,
+      Seq(
+        new LeaderAndIsrPartitionState()
+          .setTopicName("hosted-topic")
+          .setPartitionIndex(0)
+          .setControllerEpoch(controllerEpoch)
+          .setLeader(0)
+          .setLeaderEpoch(10)
+          .setIsr(Seq[Integer](0, 1).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0, 1).asJava)
+          .setIsNew(false),
+        new LeaderAndIsrPartitionState()
+          .setTopicName("hosted-topic")
+          .setPartitionIndex(1)
+          .setControllerEpoch(controllerEpoch)
+          .setLeader(1)
+          .setLeaderEpoch(10)
+          .setIsr(Seq[Integer](1, 0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](1, 0).asJava)
+          .setIsNew(false)
+      ).asJava,
+      topicIds.asJava,
+      Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava,
+      true,
+      LeaderAndIsrRequest.Type.FULL
+    ).build()
+
+    replicaManager.becomeLeaderOrFollower(0, lisr, (_, _) => ())
+
+    val ht0 = replicaManager.getPartition(new TopicPartition("hosted-topic", 0))
+    assertTrue(ht0.isInstanceOf[HostedPartition.Online])
+
+    val stray0 = replicaManager.getPartition(new TopicPartition("hosted-stray", 0))
+
+    if (zkMigrationEnabled) {
+      assertEquals(HostedPartition.None, stray0)
+    } else {
+      assertTrue(stray0.isInstanceOf[HostedPartition.Online])
+    }
+  }
+
+  @Test
+  def testUpdateStrayLogs(): Unit = {
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time)
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = time.scheduler,
+      logManager = logManager,
+      quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""),
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager,
+      threadNamePrefix = Option(this.getClass.getName))
+
+    logManager.startup(Set.empty[String])
+
+    // Create a hosted topic, a hosted topic that will become stray, and a stray topic
+    val validLogs = createHostedLogs("hosted-topic", numLogs = 2, replicaManager).toSet
+    createHostedLogs("hosted-stray", numLogs = 10, replicaManager).toSet
+    createStrayLogs(10, logManager)
+
+    val allReplicasFromLISR = Set(new TopicPartition("hosted-topic", 0), new TopicPartition("hosted-topic", 1))
+
+    replicaManager.updateStrayLogs(replicaManager.findStrayPartitionsFromLeaderAndIsr(allReplicasFromLISR))
+
+    assertEquals(validLogs, logManager.allLogs.toSet)
+    assertEquals(validLogs.size, replicaManager.partitionCount.value)
+
+    replicaManager.shutdown()
+    logManager.shutdown()
+  }
+
+  private def createHostedLogs(name: String, numLogs: Int, replicaManager: ReplicaManager): Seq[UnifiedLog] = {
+    for (i <- 0 until numLogs) yield {
+      val topicPartition = new TopicPartition(name, i)
+      val partition = replicaManager.createPartition(topicPartition)
+      partition.createLogIfNotExists(isNew = true, isFutureReplica = false,
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), topicId = None)
+      partition.log.get
+    }
+  }
+
+  private def createStrayLogs(numLogs: Int, logManager: LogManager): Seq[UnifiedLog] = {
+    val name = "stray"
+    for (i <- 0 until numLogs)
+      yield logManager.getOrCreateLog(new TopicPartition(name, i), topicId = None)
+  }
+
   private def sendProducerAppend(
     replicaManager: ReplicaManager,
     topicPartition: TopicPartition,
KRaftMigrationDriver.java
@@ -307,7 +307,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }

-    private void transitionTo(MigrationDriverState newState) {
+    // Visible for testing
+    void transitionTo(MigrationDriverState newState) {
         if (!isValidStateChange(newState)) {
             throw new IllegalStateException(
                 String.format("Invalid transition in migration driver from %s to %s", migrationState, newState));

@@ -497,6 +498,17 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 return;
             }

+            // Until the metadata has been migrated, the migrationLeadershipState offset is -1. We need to ignore
+            // metadata images until we see that the migration has happened and the image exceeds the offset of the
+            // migration
+            if (!migrationLeadershipState.initialZkMigrationComplete()) {
+                log.info("Ignoring {} {} since the migration has not finished.", metadataType, provenance);
+                completionHandler.accept(null);
+                return;
+            }
+
+            // If the migration has finished, the migrationLeadershipState offset will be positive. Ignore any images
+            // which are older than the offset that has been written to ZK.
             if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
                 log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
                 completionHandler.accept(null);

@@ -531,14 +543,19 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             applyMigrationOperation("Updating ZK migration state after " + metadataType,
                 state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));

-            // TODO: Unhappy path: Probably relinquish leadership and let new controller
-            // retry the write?
+            if (isSnapshot) {
+                // When we load a snapshot, need to send full metadata updates to the brokers
+                log.debug("Sending full metadata RPCs to brokers for snapshot.");
+                propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
+            } else {
+                // delta
                 if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
-                    log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
+                    log.trace("Sending incremental metadata RPCs to brokers for delta.");
                     propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
                 } else {
                     log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
                 }
+            }

             completionHandler.accept(null);
         }

@@ -698,6 +715,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         @Override
         public void run() throws Exception {
             if (checkDriverState(MigrationDriverState.SYNC_KRAFT_TO_ZK)) {
+                // The migration offset will be non-negative at this point, so we just need to check that the image
+                // we have actually includes the migration metadata.
+                if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
+                    log.info("Ignoring image {} which does not contain a superset of the metadata in ZK. Staying in " +
+                        "SYNC_KRAFT_TO_ZK until a newer image is loaded", image.provenance());
+                    return;
+                }
                 log.info("Performing a full metadata sync from KRaft to ZK.");
                 Map<String, Integer> dualWriteCounts = new TreeMap<>();
                 long startTime = time.nanoseconds();
@ -69,6 +69,7 @@ public class KRaftMigrationZkWriter {
private static final String CREATE_TOPIC = "CreateTopic";
private static final String UPDATE_TOPIC = "UpdateTopic";
private static final String DELETE_TOPIC = "DeleteTopic";
private static final String DELETE_PENDING_TOPIC_DELETION = "DeletePendingTopicDeletion";
private static final String UPDATE_PARTITION = "UpdatePartition";
private static final String DELETE_PARTITION = "DeletePartition";
private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig";
@ -146,6 +147,15 @@ public class KRaftMigrationZkWriter {
Map<Uuid, Map<Integer, PartitionRegistration>> changedPartitions = new HashMap<>();
Map<Uuid, Map<Integer, PartitionRegistration>> newPartitions = new HashMap<>();

Set<String> pendingTopicDeletions = migrationClient.topicClient().readPendingTopicDeletions();
if (!pendingTopicDeletions.isEmpty()) {
    operationConsumer.accept(
        DELETE_PENDING_TOPIC_DELETION,
        "Delete pending topic deletions",
        migrationState -> migrationClient.topicClient().clearPendingTopicDeletions(pendingTopicDeletions, migrationState)
    );
}

migrationClient.topicClient().iterateTopics(
    EnumSet.of(
        TopicMigrationClient.TopicVisitorInterest.TOPICS,
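Not part of the patch: a rough, self-contained sketch of the operation-consumer pattern used above (all names hypothetical). Each dual-write step is registered under an operation name, logged, counted, and applied as a transformation of the migration state.

import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

// Editor's sketch (hypothetical types): applying and counting a named dual-write operation.
final class OperationConsumerSketch {
    private final Map<String, Integer> dualWriteCounts = new TreeMap<>();
    private String migrationState = "initial";   // stand-in for ZkMigrationLeadershipState

    void accept(String opType, String logMsg, UnaryOperator<String> operation) {
        System.out.println("Performing " + opType + ": " + logMsg);
        migrationState = operation.apply(migrationState);      // the ZK write returns the updated state
        dualWriteCounts.merge(opType, 1, Integer::sum);         // track how many ops of each type ran
    }
}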
@ -42,6 +42,13 @@ public interface TopicMigrationClient {

void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor);

Set<String> readPendingTopicDeletions();

ZkMigrationLeadershipState clearPendingTopicDeletions(
    Set<String> pendingTopicDeletions,
    ZkMigrationLeadershipState state
);

ZkMigrationLeadershipState deleteTopic(
    String topicName,
    ZkMigrationLeadershipState state
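Not part of the patch: an editor's sketch of how a ZooKeeper-backed implementation of the two new interface methods might behave, assuming pending deletions are tracked as child znodes under /admin/delete_topics (as the ZK controller does). The map-backed stand-in for a ZK client and all names here are hypothetical.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Editor's sketch (hypothetical ZK stand-in): pending topic deletions as children of /admin/delete_topics.
final class PendingTopicDeletionSketch {
    private final Map<String, Set<String>> znodeChildren;   // path -> child node names

    PendingTopicDeletionSketch(Map<String, Set<String>> znodeChildren) {
        this.znodeChildren = znodeChildren;
    }

    Set<String> readPendingTopicDeletions() {
        // Each child names a topic whose deletion was still pending when the ZK controller last ran.
        return new HashSet<>(znodeChildren.getOrDefault("/admin/delete_topics", Set.of()));
    }

    void clearPendingTopicDeletions(Set<String> pendingTopicDeletions) {
        // The KRaft controller now owns deletions, so the markers are simply removed from ZK.
        Set<String> children = znodeChildren.get("/admin/delete_topics");
        if (children != null) {
            children.removeAll(pendingTopicDeletions);
        }
    }
}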
@ -21,6 +21,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.PartitionRegistration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
@ -51,7 +52,23 @@ public class CapturingTopicMigrationClient implements TopicMigrationClient {
}

@Override
public Set<String> readPendingTopicDeletions() {
    return Collections.emptySet();
}

@Override
public ZkMigrationLeadershipState clearPendingTopicDeletions(
    Set<String> pendingTopicDeletions,
    ZkMigrationLeadershipState state
) {
    return state;
}

@Override
public ZkMigrationLeadershipState deleteTopic(
    String topicName,
    ZkMigrationLeadershipState state
) {
    deletedTopics.add(topicName);
    return state;
}
@ -577,7 +577,7 @@ public class KRaftMigrationDriverTest {

// Wait for migration
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
    "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

// Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz
provenance = new MetadataProvenance(200, 1, 1);
@ -596,6 +596,60 @@ public class KRaftMigrationDriverTest {
    });
}

@Test
public void testNoDualWriteBeforeMigration() throws Exception {
    setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
        MetadataImage image = new MetadataImage(
            MetadataProvenance.EMPTY,
            FeaturesImage.EMPTY,
            ClusterImage.EMPTY,
            IMAGE1,
            ConfigurationsImage.EMPTY,
            ClientQuotasImage.EMPTY,
            ProducerIdsImage.EMPTY,
            AclsImage.EMPTY,
            ScramImage.EMPTY,
            DelegationTokenImage.EMPTY);
        MetadataDelta delta = new MetadataDelta(image);

        driver.start();
        delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
        delta.replay(zkBrokerRecord(0));
        delta.replay(zkBrokerRecord(1));
        delta.replay(zkBrokerRecord(2));
        delta.replay(zkBrokerRecord(3));
        delta.replay(zkBrokerRecord(4));
        delta.replay(zkBrokerRecord(5));
        MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
        image = delta.apply(provenance);

        // Publish a delta with this node (3000) as the leader
        LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
        driver.onControllerChange(newLeader);

        TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
            "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

        driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());

        driver.transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
        driver.transitionTo(MigrationDriverState.BECOME_CONTROLLER);
        driver.transitionTo(MigrationDriverState.ZK_MIGRATION);
        driver.transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);

        provenance = new MetadataProvenance(200, 1, 1);
        delta = new MetadataDelta(image);
        RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
        image = delta.apply(provenance);
        driver.onMetadataUpdate(delta, image, new SnapshotManifest(provenance, 100));

        // Wait for migration
        TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
            "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
    });
}

@Test
public void testControllerFailover() throws Exception {
    setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {