mirror of https://github.com/apache/kafka.git
KAFKA-16297: Race condition while promoting future replica (#15557)
If a future replica doesn't get promoted, any directory reassignment sent to the controller should be reversed. The current logic already addresses the case where a replica hasn't yet been promoted and the controller hasn't yet acknowledged the directory reassignment. However, it doesn't cover the case where the replica does not get promoted because of a directory failure after the controller has acknowledged the reassignment, but before the future replica catches up again and is promoted to the main replica.

Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
parent e5a3f99100
commit f6c9feea76
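To make the fix easier to follow before reading the diff, here is a minimal sketch of the idea (not the broker code; the object, field and parameter names below are simplified stand-ins): the thread remembers, per partition, how far the directory reassignment got and which directory the main replica originally lived in, and when the partition is removed it re-submits the original assignment whenever the cluster metadata may already reflect the new directory (Queued or Accepted) but the promotion never became Effective.

// Sketch only: simplified stand-ins for the patch's PromotionState / ReassignmentState.
object PromotionRevertSketch {
  sealed trait ReassignmentState { def maybeInconsistentMetadata: Boolean = false }
  case object NotRequested extends ReassignmentState // "None" in the patch
  case object Queued extends ReassignmentState { override def maybeInconsistentMetadata = true }
  case object Accepted extends ReassignmentState { override def maybeInconsistentMetadata = true }
  case object Effective extends ReassignmentState

  // What the thread remembers per partition: the reassignment progress and the directory
  // the main replica originally lived in, so the assignment can be pointed back there.
  final case class PromotionState(state: ReassignmentState, originalDir: String)

  // Called when a partition is dropped from the thread (e.g. after a log directory failure).
  // If the controller may already have recorded the new directory but the future replica
  // never became the main replica, re-assign the replica to its original directory.
  def revertIfIncomplete(partition: String,
                         promotions: Map[String, PromotionState],
                         sendAssignment: (String, String) => Unit): Unit =
    promotions.get(partition).foreach { p =>
      if (p.state.maybeInconsistentMetadata) sendAssignment(partition, p.originalDir)
    }

  def main(args: Array[String]): Unit = {
    val promotions = Map(
      "t-0" -> PromotionState(NotRequested, "dirA"), // nothing sent yet: no revert needed
      "t-1" -> PromotionState(Accepted, "dirB"),     // acknowledged but never promoted: revert
      "t-2" -> PromotionState(Effective, "dirC"))    // promoted: metadata already correct
    promotions.keys.toSeq.sorted.foreach { tp =>
      revertIfIncomplete(tp, promotions, (p, d) => println(s"revert $p back to $d")) // prints only for t-1
    }
  }
}

The key difference from the previous behaviour, as described above, is that the revert is driven by the remembered original directory and a "may be inconsistent" flag on the state, rather than only by whether a request is still queued.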
@@ -18,8 +18,8 @@
 package kafka.server

 import kafka.cluster.Partition
-import kafka.server.ReplicaAlterLogDirsThread.{DirectoryEventRequestState, QUEUED}
-import org.apache.kafka.common.TopicPartition
+import kafka.server.ReplicaAlterLogDirsThread.{PromotionState, ReassignmentState}
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.requests.FetchResponse
 import org.apache.kafka.server.common.{DirectoryEventHandler, OffsetAndEpoch, TopicIdPartition}
 import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason}
@@ -45,7 +45,8 @@ class ReplicaAlterLogDirsThread(name: String,
                                 isInterruptible = false,
                                 brokerTopicStats) {

-  private val assignmentRequestStates: ConcurrentHashMap[TopicPartition, DirectoryEventRequestState] = new ConcurrentHashMap()
+  // Visible for testing
+  private[server] val promotionStates: ConcurrentHashMap[TopicPartition, PromotionState] = new ConcurrentHashMap()

   override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
     replicaMgr.futureLocalLogOrException(topicPartition).latestEpoch
@@ -96,23 +97,26 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      if (this.promotionStates.containsKey(topicPartition)) {
+        val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition)
+        // Revert any reassignments for partitions that did not complete the future replica promotion
+        if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) {
+          directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () => ())
+        }
+        this.promotionStates.remove(topicPartition)
+      }
+    }

     super.removePartitions(topicPartitions)
   }

+  private def reassignmentState(topicPartition: TopicPartition): ReassignmentState = promotionStates.get(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
-    assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, state: ReassignmentState): Unit = {
+    log.debug(s"Updating future replica ${topicPartition} reassignment state to ${state}")
+    promotionStates.put(topicPartition, promotionStates.get(topicPartition).withAssignment(state))
   }

   private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
@@ -120,33 +124,28 @@ class ReplicaAlterLogDirsThread(name: String,
     if (topicId.isEmpty)
       throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")

-    partitionAssignmentRequestState(topicPartition) match {
-      case None =>
+    reassignmentState(topicPartition) match {
+      case ReassignmentState.None =>
         // Schedule assignment request and don't promote the future replica yet until the controller has accepted the request.
         partition.runCallbackIfFutureReplicaCaughtUp(_ => {
-          partition.futureReplicaDirectoryId()
-            .map(id => {
-              directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), id,
-                () => updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.COMPLETED))
-              // mark the assignment request state as queued.
-              updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.QUEUED)
-            })
+          val targetDir = partition.futureReplicaDirectoryId().get
+          val topicIdPartition = new TopicIdPartition(topicId.get, topicPartition.partition())
+          directoryEventHandler.handleAssignment(topicIdPartition, targetDir, () => updateReassignmentState(topicPartition, ReassignmentState.Accepted))
+          updateReassignmentState(topicPartition, ReassignmentState.Queued)
         })
-      case Some(ReplicaAlterLogDirsThread.COMPLETED) =>
+      case ReassignmentState.Accepted =>
         // Promote future replica if controller accepted the request and the replica caught-up with the original log.
         if (partition.maybeReplaceCurrentWithFutureReplica()) {
+          updateReassignmentState(topicPartition, ReassignmentState.Effective)
           removePartitions(Set(topicPartition))
-          assignmentRequestStates.remove(topicPartition)
         }
-      case _ =>
-        log.trace("Waiting for AssignmentRequest to succeed before promoting the future replica.")
+      case ReassignmentState.Queued =>
+        log.trace("Waiting for AssignReplicasToDirsRequest to succeed before promoting the future replica.")
+      case ReassignmentState.Effective =>
+        throw new IllegalStateException("BUG: trying to promote a future replica twice")
     }
   }

-  private def partitionAssignmentRequestState(topicPartition: TopicPartition): Option[DirectoryEventRequestState] = {
-    Option(assignmentRequestStates.get(topicPartition))
-  }
-
   override def addPartitions(initialFetchStates: Map[TopicPartition, InitialFetchState]): Set[TopicPartition] = {
     partitionMapLock.lockInterruptibly()
     try {
@@ -155,6 +154,13 @@ class ReplicaAlterLogDirsThread(name: String,
       val filteredFetchStates = initialFetchStates.filter { case (tp, _) =>
         replicaMgr.futureLogExists(tp)
       }
+      filteredFetchStates.foreach {
+        case (topicPartition, state) =>
+          val topicId = state.topicId
+          val currentDirectoryId = replicaMgr.getPartitionOrException(topicPartition).logDirectoryId()
+          val promotionState = PromotionState(ReassignmentState.None, topicId, currentDirectoryId)
+          promotionStates.put(topicPartition, promotionState)
+      }
       super.addPartitions(filteredFetchStates)
     } finally {
       partitionMapLock.unlock()
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }
 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory assignment update in the metadata
+   * @param topicId The ID of the topic, which is useful if reverting the assignment is required
+   * @param currentDir The original directory ID from which the future replica fetches
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: Option[Uuid], currentDir: Option[Uuid]) {
+    def withAssignment(newDirReassignmentState: ReassignmentState): PromotionState =
+      PromotionState(newDirReassignmentState, topicId, currentDir)
+  }

-  case object QUEUED extends DirectoryEventRequestState
+  /**
+   * Represents the state of the request to update the directory assignment from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+    /**
+     * @return true if the directory assignment in the cluster metadata may be inconsistent with the actual
+     * directory where the main replica is hosted.
+     */
+    def maybeInconsistentMetadata: Boolean = false
+  }

-  case object COMPLETED extends DirectoryEventRequestState
+  object ReassignmentState {
+
+    /**
+     * The request has not been created.
+     */
+    case object None extends ReassignmentState
+
+    /**
+     * The request has been queued; it may or may not yet have been sent to the Controller.
+     */
+    case object Queued extends ReassignmentState {
+      override def maybeInconsistentMetadata: Boolean = true
+    }
+
+    /**
+     * The controller has acknowledged the new directory assignment and persisted the change in metadata.
+     */
+    case object Accepted extends ReassignmentState {
+      override def maybeInconsistentMetadata: Boolean = true
+    }
+
+    /**
+     * The future replica has been promoted and replaced the current replica.
+     */
+    case object Effective extends ReassignmentState
+  }
 }
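The hunk above documents the intended lifecycle of ReassignmentState. A compact way to read it is as a linear state machine, None -> Queued -> Accepted -> Effective: Queued is entered when the AssignReplicasToDirs request is enqueued, Accepted when the controller's acknowledgement callback fires, and Effective when the future replica replaces the current one. The sketch below is illustrative only (the type and object names are placeholders, not broker code) and encodes just that documented order; the real code also throws an IllegalStateException if asked to promote an already Effective replica.

// Sketch: the transition order implied by the ReassignmentState documentation above.
object ReassignmentLifecycleSketch {
  sealed trait State
  case object NotRequested extends State // "None" in the real code
  case object Queued extends State
  case object Accepted extends State
  case object Effective extends State

  // Allowed forward transitions; anything else is treated as a bug.
  private val next: Map[State, State] =
    Map(NotRequested -> Queued, Queued -> Accepted, Accepted -> Effective)

  def advance(current: State, proposed: State): State =
    if (next.get(current).contains(proposed)) proposed
    else throw new IllegalStateException(s"Illegal transition $current -> $proposed")

  def main(args: Array[String]): Unit = {
    val end = List(Queued, Accepted, Effective).foldLeft[State](NotRequested)(advance)
    println(s"final state: $end") // final state: Effective
  }
}

States that may already be visible in the controller's metadata without a completed promotion (Queued and Accepted) are exactly the ones the removePartitions change reverts.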
@@ -20,6 +20,7 @@ import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
 import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -30,12 +31,13 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.server.common
 import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.ArgumentMatchers.{any, anyBoolean}
-import org.mockito.Mockito.{doNothing, mock, never, times, verify, verifyNoInteractions, when}
+import org.mockito.Mockito.{doNothing, mock, never, times, verify, verifyNoInteractions, verifyNoMoreInteractions, when}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}

 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
@@ -129,6 +131,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(partition.futureLocalLogOrException).thenReturn(futureLog)
     doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
     when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("gOZOXHnkR9eiA1W9ZuLk8A")))

     when(futureLog.logStartOffset).thenReturn(0L)
     when(futureLog.logEndOffset).thenReturn(0L)
@@ -228,6 +231,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(partition.futureLocalLogOrException).thenReturn(futureLog)
     doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
     when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("PGLOjDjKQaCOXFOtxymIig")))

     when(futureLog.logStartOffset).thenReturn(0L)
     when(futureLog.logEndOffset).thenReturn(0L)
@@ -268,9 +272,9 @@ class ReplicaAlterLogDirsThreadTest {
     assertEquals(0, thread.partitionCount)
   }

-  def updateAssignmentRequestState(thread: ReplicaAlterLogDirsThread, partitionId:Int, newState: ReplicaAlterLogDirsThread.DirectoryEventRequestState) = {
+  private def updateReassignmentState(thread: ReplicaAlterLogDirsThread, partitionId:Int, newState: ReassignmentState) = {
     topicNames.get(topicId).map(topicName => {
-      thread.updatedAssignmentRequestState(new TopicPartition(topicName, partitionId))(newState)
+      thread.updateReassignmentState(new TopicPartition(topicName, partitionId), newState)
     })
   }

@@ -290,6 +294,7 @@ class ReplicaAlterLogDirsThreadTest {

     val leaderEpoch = 5
     val logEndOffset = 0
+    val currentDirectoryId = Uuid.fromString("EzI9SqkFQKW1iFc1ZwP9SQ")

     when(partition.partitionId).thenReturn(partitionId)
     when(partition.topicId).thenReturn(Some(topicId))
@@ -312,6 +317,7 @@ class ReplicaAlterLogDirsThreadTest {
     doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
     when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
     when(partition.runCallbackIfFutureReplicaCaughtUp(any())).thenReturn(true)
+    when(partition.logDirectoryId()).thenReturn(Some(currentDirectoryId))

     when(futureLog.logStartOffset).thenReturn(0L)
     when(futureLog.logEndOffset).thenReturn(0L)
@@ -353,13 +359,13 @@ class ReplicaAlterLogDirsThreadTest {
     assertTrue(thread.fetchState(t1p0).isDefined)
     assertEquals(1, thread.partitionCount)

-    updateAssignmentRequestState(thread, partitionId, ReplicaAlterLogDirsThread.QUEUED)
+    updateReassignmentState(thread, partitionId, ReassignmentState.Queued)

     // Don't promote future replica if assignment request is queued but not completed
     thread.doWork()
     assertTrue(thread.fetchState(t1p0).isDefined)
     assertEquals(1, thread.partitionCount)
-    updateAssignmentRequestState(thread, partitionId, ReplicaAlterLogDirsThread.COMPLETED)
+    updateReassignmentState(thread, partitionId, ReassignmentState.Accepted)

     // Promote future replica if assignment request is completed
     thread.doWork()
@@ -448,7 +454,7 @@ class ReplicaAlterLogDirsThreadTest {
     assertTrue(thread.fetchState(t1p0).isDefined)
     assertEquals(1, thread.partitionCount)

-    updateAssignmentRequestState(thread, partitionId, ReplicaAlterLogDirsThread.QUEUED)
+    updateReassignmentState(thread, partitionId, ReassignmentState.Queued)

     // revert assignment and delete request state if assignment is cancelled
     thread.removePartitions(Set(t1p0))
@@ -464,6 +470,40 @@ class ReplicaAlterLogDirsThreadTest {
     assertEquals(partition.logDirectoryId().get, logIdCaptureT1p0.getValue)
   }

+  @Test
+  def shouldRevertReassignmentsForIncompleteFutureReplicaPromotions(): Unit = {
+    val replicaManager = Mockito.mock(classOf[ReplicaManager])
+    val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+    val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, quotaManager)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread",
+      leader,
+      failedPartitions,
+      replicaManager,
+      quotaManager,
+      Mockito.mock(classOf[BrokerTopicStats]),
+      0,
+      directoryEventHandler)
+
+    val tp = Seq.range(0, 4).map(new TopicPartition("t", _))
+    val tips = Seq.range(0, 4).map(new common.TopicIdPartition(topicId, _))
+    val dirIds = Seq.range(0, 4).map(i => Uuid.fromString(s"TESTBROKER0000DIR${i}AAAA"))
+    tp.foreach(tp => thread.promotionStates.put(tp, ReplicaAlterLogDirsThread.PromotionState(ReassignmentState.None, Some(topicId), Some(dirIds(tp.partition())))))
+    thread.updateReassignmentState(tp(0), ReassignmentState.None)
+    thread.updateReassignmentState(tp(1), ReassignmentState.Queued)
+    thread.updateReassignmentState(tp(2), ReassignmentState.Accepted)
+    thread.updateReassignmentState(tp(3), ReassignmentState.Effective)
+
+    thread.removePartitions(tp.toSet)
+
+    verify(directoryEventHandler).handleAssignment(ArgumentMatchers.eq(tips(1)), ArgumentMatchers.eq(dirIds(1)), any())
+    verify(directoryEventHandler).handleAssignment(ArgumentMatchers.eq(tips(2)), ArgumentMatchers.eq(dirIds(2)), any())
+    verifyNoMoreInteractions(directoryEventHandler)
+  }
+
   private def mockFetchFromCurrentLog(topicIdPartition: TopicIdPartition,
                                       requestData: FetchRequest.PartitionData,
                                       config: KafkaConfig,
@@ -691,6 +731,8 @@ class ReplicaAlterLogDirsThreadTest {
         .setErrorCode(Errors.NONE.code)
         .setLeaderEpoch(leaderEpoch)
         .setEndOffset(replicaT1p1LEO))
+    when(partitionT1p0.logDirectoryId()).thenReturn(Some(Uuid.fromString("Jsg8ufNCQYONNquPt7VYpA")))
+    when(partitionT1p1.logDirectoryId()).thenReturn(Some(Uuid.fromString("D2Yf6FtNROGVKoIZadSFIg")))

     when(replicaManager.logManager).thenReturn(logManager)
     stubWithFetchMessages(logT1p0, logT1p1, futureLogT1p0, partitionT1p0, replicaManager, responseCallback)
@@ -775,6 +817,7 @@ class ReplicaAlterLogDirsThreadTest {
         .setEndOffset(replicaEpochEndOffset))
     when(futureLog.endOffsetForEpoch(leaderEpoch - 2)).thenReturn(
       Some(new OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2)))
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("n6WOe2zPScqZLIreCWN6Ug")))

     when(replicaManager.logManager).thenReturn(logManager)
     stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback)
@@ -829,6 +872,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(replicaManager.futureLogExists(t1p0)).thenReturn(true)

     when(replicaManager.logManager).thenReturn(logManager)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("b2e1ihvGQiu6A504oKoddQ")))

     // pretend this is a completely new future replica, with no leader epochs recorded
     when(futureLog.latestEpoch).thenReturn(None)
@@ -880,6 +924,7 @@ class ReplicaAlterLogDirsThreadTest {

     //Stubs
     when(partition.partitionId).thenReturn(partitionId)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("wO7bUpvcSZC0QKEK6P6AiA")))

     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.getPartitionOrException(t1p0))
@@ -967,6 +1012,7 @@ class ReplicaAlterLogDirsThreadTest {
     val replicaLEO = 213

     when(partition.partitionId).thenReturn(partitionId)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("dybMM9CpRP2s6HSslW4NHg")))

     when(replicaManager.metadataCache).thenReturn(metadataCache)
     when(replicaManager.getPartitionOrException(t1p0))
@@ -1025,6 +1071,9 @@ class ReplicaAlterLogDirsThreadTest {
     //Stubs
     when(replicaManager.logManager).thenReturn(logManager)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
+    when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+    when(replicaManager.getPartitionOrException(t1p1)).thenReturn(partition)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("Y0qUL19gSmKAXmohmrUM4g")))
     stub(log, null, futureLog, partition, replicaManager)

     //Create the fetcher thread
@@ -1076,6 +1125,9 @@ class ReplicaAlterLogDirsThreadTest {
     when(futureLog.logStartOffset).thenReturn(startOffset)
     when(replicaManager.logManager).thenReturn(logManager)
     when(replicaManager.metadataCache).thenReturn(metadataCache)
+    when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+    when(replicaManager.getPartitionOrException(t1p1)).thenReturn(partition)
+    when(partition.logDirectoryId()).thenReturn(Some(Uuid.fromString("rtrdy3nsQwO1OQUEUYGxRQ")))
     stub(log, null, futureLog, partition, replicaManager)

     //Create the fetcher thread
@@ -2951,15 +2951,17 @@ public class ReplicationControlManagerTest {
        Uuid dir1b1 = Uuid.fromString("hO2YI5bgRUmByNPHiHxjNQ");
        Uuid dir2b1 = Uuid.fromString("R3Gb1HLoTzuKMgAkH5Vtpw");
        Uuid dir1b2 = Uuid.fromString("TBGa8UayQi6KguqF5nC0sw");
+       Uuid offlineDir = Uuid.fromString("zvAf9BKZRyyrEWz4FX2nLA");
        ctx.registerBrokersWithDirs(1, asList(dir1b1, dir2b1), 2, singletonList(dir1b2));
        ctx.unfenceBrokers(1, 2);
-       Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{1, 2}, new int[]{1, 2}}).topicId();
+       Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{1, 2}, new int[]{1, 2}, new int[]{1, 2}}).topicId();
        Uuid topicB = ctx.createTestTopic("b", new int[][]{new int[]{1, 2}, new int[]{1, 2}}).topicId();
        Uuid topicC = ctx.createTestTopic("c", new int[][]{new int[]{2}}).topicId();

        ControllerResult<AssignReplicasToDirsResponseData> controllerResult = ctx.assignReplicasToDirs(1, new HashMap<TopicIdPartition, Uuid>() {{
            put(new TopicIdPartition(topicA, 0), dir1b1);
            put(new TopicIdPartition(topicA, 1), dir2b1);
+           put(new TopicIdPartition(topicA, 2), offlineDir); // unknown/offline dir
            put(new TopicIdPartition(topicB, 0), dir1b1);
            put(new TopicIdPartition(topicB, 1), DirectoryId.LOST);
            put(new TopicIdPartition(Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ"), 1), dir2b1); // expect UNKNOWN_TOPIC_ID
@@ -2978,6 +2980,9 @@ public class ReplicationControlManagerTest {
                put(new TopicIdPartition(topicA, 1), NONE);
                put(new TopicIdPartition(Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ"), 1), UNKNOWN_TOPIC_ID);
            }});
+           put(offlineDir, new HashMap<TopicIdPartition, Errors>() {{
+               put(new TopicIdPartition(topicA, 2), NONE);
+           }});
            put(DirectoryId.LOST, new HashMap<TopicIdPartition, Errors>() {{
                put(new TopicIdPartition(topicB, 1), NONE);
            }});
@@ -2990,6 +2995,9 @@ public class ReplicationControlManagerTest {
            new ApiMessageAndVersion(
                new PartitionChangeRecord().setTopicId(topicA).setPartitionId(1).
                    setDirectories(asList(dir2b1, dir1b2)), recordVersion),
+           new ApiMessageAndVersion(
+               new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2).
+                   setDirectories(asList(offlineDir, dir1b2)), recordVersion),
            new ApiMessageAndVersion(
                new PartitionChangeRecord().setTopicId(topicB).setPartitionId(0).
                    setDirectories(asList(dir1b1, dir1b2)), recordVersion),
@@ -2997,8 +3005,13 @@ public class ReplicationControlManagerTest {
                new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).
                    setDirectories(asList(DirectoryId.LOST, dir1b2)), recordVersion),

-           // In addition to the directory assignment changes we expect an additional record,
-           // which elects a new leader for bar-1 which has been assigned to an offline directory.
+           // In addition to the directory assignment changes we expect two additional records,
+           // which elect new leaders for:
+           //   - a-2, which has been assigned to a directory that is not an online directory (unknown/offline)
+           //   - b-1, which has been assigned to an offline directory.
+           new ApiMessageAndVersion(
+               new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2).
+                   setIsr(singletonList(2)).setLeader(2), recordVersion),
            new ApiMessageAndVersion(
                new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).
                    setIsr(singletonList(2)).setLeader(2), recordVersion)
@@ -3011,6 +3024,7 @@ public class ReplicationControlManagerTest {
            add(new TopicIdPartition(topicB, 0));
        }}, RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(1, true)));
        assertEquals(new HashSet<TopicIdPartition>() {{
+           add(new TopicIdPartition(topicA, 2));
            add(new TopicIdPartition(topicB, 1));
            add(new TopicIdPartition(topicC, 0));
        }},