mirror of https://github.com/apache/kafka.git
KAFKA-16171: Fix ZK migration controller race #15238
This patch causes the active KRaftMigrationDriver to reload the /migration ZK state after electing itself as the leader in ZK. This closes a race condition where the previous active controller could make an update to /migration after the new leader was elected. The update race was not actually a problem regarding the data since both controllers would be syncing the same state from KRaft to ZK, but the change to the znode causes the new controller to fail on the zk version check on /migration. This patch also fixes a as-yet-unseen bug where the active controllers failing to elect itself via claimControllerLeadership would not retry. Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
11e488b586
commit
5d3e691e47
|
@ -0,0 +1,284 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package kafka.zk
|
||||||
|
|
||||||
|
import kafka.utils.{Logging, PasswordEncoder, TestUtils}
|
||||||
|
import org.apache.kafka.clients.ApiVersions
|
||||||
|
import org.apache.kafka.common.{Node, Uuid}
|
||||||
|
import org.apache.kafka.common.metadata.{FeatureLevelRecord, TopicRecord}
|
||||||
|
import org.apache.kafka.common.utils.{Time, Utils}
|
||||||
|
import org.apache.kafka.controller.QuorumFeatures
|
||||||
|
import org.apache.kafka.controller.metrics.QuorumControllerMetrics
|
||||||
|
import org.apache.kafka.image.loader.LogDeltaManifest
|
||||||
|
import org.apache.kafka.image.publisher.MetadataPublisher
|
||||||
|
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
|
||||||
|
import org.apache.kafka.metadata.KafkaConfigSchema
|
||||||
|
import org.apache.kafka.metadata.migration._
|
||||||
|
import org.apache.kafka.raft.{LeaderAndEpoch, OffsetAndEpoch}
|
||||||
|
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
|
||||||
|
import org.apache.kafka.server.fault.FaultHandler
|
||||||
|
import org.apache.zookeeper.client.ZKClientConfig
|
||||||
|
import org.junit.jupiter.api.Assertions.{assertTrue, fail}
|
||||||
|
import org.junit.jupiter.api.Test
|
||||||
|
|
||||||
|
import java.util
|
||||||
|
import java.util.concurrent.{CompletableFuture, TimeUnit}
|
||||||
|
import java.util.{Optional, OptionalInt}
|
||||||
|
import scala.collection.mutable
|
||||||
|
|
||||||
|
class ZkMigrationFailoverTest extends Logging {
|
||||||
|
|
||||||
|
class CapturingFaultHandler(nodeId: Int) extends FaultHandler {
|
||||||
|
val faults = mutable.Buffer[Throwable]()
|
||||||
|
var future: CompletableFuture[Throwable] = CompletableFuture.completedFuture(new RuntimeException())
|
||||||
|
var waitingForMsg = ""
|
||||||
|
|
||||||
|
override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
|
||||||
|
error(s"Fault handled on node $nodeId", cause)
|
||||||
|
faults.append(cause)
|
||||||
|
if (!future.isDone && cause.getMessage.contains(waitingForMsg)) {
|
||||||
|
future.complete(cause)
|
||||||
|
}
|
||||||
|
new RuntimeException(cause)
|
||||||
|
}
|
||||||
|
|
||||||
|
def checkAndClear(verifier: (Seq[Throwable]) => Unit): Unit = {
|
||||||
|
val faultsSoFar = faults.toSeq
|
||||||
|
try {
|
||||||
|
verifier.apply(faultsSoFar)
|
||||||
|
} catch {
|
||||||
|
case ae: AssertionError => fail(s"Assertion failed. Faults on $nodeId were: $faultsSoFar", ae)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def waitForError(message: String): CompletableFuture[Throwable] = {
|
||||||
|
future = new CompletableFuture[Throwable]()
|
||||||
|
waitingForMsg = message
|
||||||
|
future
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def buildMigrationDriver(nodeId: Int, zkMigrationClient: ZkMigrationClient): (KRaftMigrationDriver, CapturingFaultHandler) = {
|
||||||
|
val faultHandler = new CapturingFaultHandler(nodeId)
|
||||||
|
val driver = KRaftMigrationDriver.newBuilder
|
||||||
|
.setNodeId(nodeId)
|
||||||
|
.setZkRecordConsumer(new ZkRecordConsumer {
|
||||||
|
override def beginMigration(): CompletableFuture[_] = ???
|
||||||
|
|
||||||
|
override def acceptBatch(recordBatch: util.List[ApiMessageAndVersion]): CompletableFuture[_] = ???
|
||||||
|
|
||||||
|
override def completeMigration(): CompletableFuture[OffsetAndEpoch] = ???
|
||||||
|
|
||||||
|
override def abortMigration(): Unit = ???
|
||||||
|
})
|
||||||
|
.setInitialZkLoadHandler((_: MetadataPublisher) => {})
|
||||||
|
.setZkMigrationClient(zkMigrationClient)
|
||||||
|
.setFaultHandler(faultHandler)
|
||||||
|
.setQuorumFeatures(QuorumFeatures.create(nodeId,
|
||||||
|
new ApiVersions(),
|
||||||
|
QuorumFeatures.defaultFeatureMap(),
|
||||||
|
util.Arrays.asList(
|
||||||
|
new Node(3000, "localhost", 3000),
|
||||||
|
new Node(3001, "localhost", 3001),
|
||||||
|
new Node(3002, "localhost", 3002)
|
||||||
|
)))
|
||||||
|
.setConfigSchema(KafkaConfigSchema.EMPTY)
|
||||||
|
.setControllerMetrics(new QuorumControllerMetrics(Optional.empty(), Time.SYSTEM, true))
|
||||||
|
.setTime(Time.SYSTEM)
|
||||||
|
.setPropagator(new LegacyPropagator() {
|
||||||
|
override def startup(): Unit = ???
|
||||||
|
|
||||||
|
override def shutdown(): Unit = ???
|
||||||
|
|
||||||
|
override def publishMetadata(image: MetadataImage): Unit = ???
|
||||||
|
|
||||||
|
override def sendRPCsToBrokersFromMetadataDelta(delta: MetadataDelta, image: MetadataImage, zkControllerEpoch: Int): Unit = {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
override def sendRPCsToBrokersFromMetadataImage(image: MetadataImage, zkControllerEpoch: Int): Unit = {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
override def clear(): Unit = ???
|
||||||
|
})
|
||||||
|
.build()
|
||||||
|
(driver, faultHandler)
|
||||||
|
}
|
||||||
|
|
||||||
|
def readMigrationZNode(zkMigrationClient: ZkMigrationClient): ZkMigrationLeadershipState = {
|
||||||
|
zkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY)
|
||||||
|
}
|
||||||
|
|
||||||
|
def safeGet[T](future: CompletableFuture[T]): T = {
|
||||||
|
future.get(10, TimeUnit.SECONDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testControllerFailoverZkRace(): Unit = {
|
||||||
|
val zookeeper = new EmbeddedZookeeper()
|
||||||
|
var zkClient: KafkaZkClient = null
|
||||||
|
val zkConnect = s"127.0.0.1:${zookeeper.port}"
|
||||||
|
try {
|
||||||
|
zkClient = KafkaZkClient(
|
||||||
|
zkConnect,
|
||||||
|
isSecure = false,
|
||||||
|
30000,
|
||||||
|
60000,
|
||||||
|
1,
|
||||||
|
Time.SYSTEM,
|
||||||
|
name = "ZkMigrationFailoverTest",
|
||||||
|
new ZKClientConfig)
|
||||||
|
} catch {
|
||||||
|
case t: Throwable =>
|
||||||
|
Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
|
||||||
|
zookeeper.shutdown()
|
||||||
|
if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
|
||||||
|
throw t
|
||||||
|
}
|
||||||
|
|
||||||
|
// Safe to reuse these since they don't keep any state
|
||||||
|
val zkMigrationClient = ZkMigrationClient(zkClient, PasswordEncoder.noop())
|
||||||
|
|
||||||
|
val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient)
|
||||||
|
val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient)
|
||||||
|
|
||||||
|
// Initialize data into /controller and /controller_epoch
|
||||||
|
zkClient.registerControllerAndIncrementControllerEpoch(0)
|
||||||
|
var zkState = zkMigrationClient.claimControllerLeadership(
|
||||||
|
ZkMigrationLeadershipState.EMPTY.withNewKRaftController(3000, 1)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Fake a complete migration
|
||||||
|
zkState = zkState.withKRaftMetadataOffsetAndEpoch(100, 10)
|
||||||
|
zkState = zkMigrationClient.getOrCreateMigrationRecoveryState(zkState)
|
||||||
|
|
||||||
|
try {
|
||||||
|
driver1.start()
|
||||||
|
driver2.start()
|
||||||
|
|
||||||
|
val newLeader1 = new LeaderAndEpoch(OptionalInt.of(3000), 2)
|
||||||
|
var image1 = MetadataImage.EMPTY
|
||||||
|
val delta1 = new MetadataDelta(image1)
|
||||||
|
delta1.replay(new FeatureLevelRecord()
|
||||||
|
.setName(MetadataVersion.FEATURE_NAME)
|
||||||
|
.setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))
|
||||||
|
delta1.replay(ZkMigrationState.MIGRATION.toRecord.message)
|
||||||
|
delta1.replay(new TopicRecord().setTopicId(Uuid.randomUuid()).setName("topic-to-sync"))
|
||||||
|
|
||||||
|
val provenance1 = new MetadataProvenance(210, 11, 1)
|
||||||
|
image1 = delta1.apply(provenance1)
|
||||||
|
|
||||||
|
val manifest1 = LogDeltaManifest.newBuilder()
|
||||||
|
.provenance(provenance1)
|
||||||
|
.leaderAndEpoch(newLeader1)
|
||||||
|
.numBatches(1)
|
||||||
|
.elapsedNs(100)
|
||||||
|
.numBytes(42)
|
||||||
|
.build()
|
||||||
|
|
||||||
|
// Load an image into 3000 image and a leader event, this lets it become active and sync to ZK
|
||||||
|
driver1.onMetadataUpdate(delta1, image1, manifest1)
|
||||||
|
driver1.onControllerChange(newLeader1)
|
||||||
|
|
||||||
|
// Hold off on loading image to to 3001. This lets us artificially defer it from claiming leadership in ZK
|
||||||
|
driver2.onControllerChange(newLeader1)
|
||||||
|
|
||||||
|
// Wait for driver 1 to become leader in ZK
|
||||||
|
TestUtils.waitUntilTrue(() => zkClient.getControllerId match {
|
||||||
|
case Some(nodeId) => nodeId == 3000
|
||||||
|
case None => false
|
||||||
|
}, "waiting for 3000 to claim ZK leadership")
|
||||||
|
|
||||||
|
// Now 3001 becomes leader, and loads migration recovery state from ZK.
|
||||||
|
// Since an image hasn't been published to it yet, it will stay in WAIT_FOR_CONTROLLER_QUORUM
|
||||||
|
val newLeader2 = new LeaderAndEpoch(OptionalInt.of(3001), 3)
|
||||||
|
driver2.onControllerChange(newLeader2)
|
||||||
|
TestUtils.waitUntilTrue(
|
||||||
|
() => safeGet(driver2.migrationState()).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
|
||||||
|
"waiting for node 3001 to enter WAIT_FOR_CONTROLLER_QUORUM")
|
||||||
|
|
||||||
|
// Node 3000 still thinks that its the leader, do a delta update
|
||||||
|
val delta2 = new MetadataDelta(image1)
|
||||||
|
delta2.replay(new TopicRecord().setTopicId(Uuid.randomUuid()).setName("another-topic-to-sync"))
|
||||||
|
val provenance2 = new MetadataProvenance(211, 11, 1)
|
||||||
|
val image2 = delta2.apply(provenance2)
|
||||||
|
val manifest2 = LogDeltaManifest.newBuilder()
|
||||||
|
.provenance(provenance2)
|
||||||
|
.leaderAndEpoch(newLeader1)
|
||||||
|
.numBatches(1)
|
||||||
|
.elapsedNs(100)
|
||||||
|
.numBytes(42)
|
||||||
|
.build()
|
||||||
|
val migrationZkVersion = readMigrationZNode(zkMigrationClient).migrationZkVersion()
|
||||||
|
driver1.onMetadataUpdate(delta2, image2, manifest2)
|
||||||
|
|
||||||
|
// Wait for /migration znode update from 3000 SYNC_KRAFT_TO_ZK
|
||||||
|
TestUtils.waitUntilTrue(() => readMigrationZNode(zkMigrationClient).migrationZkVersion() > migrationZkVersion,
|
||||||
|
"waiting for /migration znode to change")
|
||||||
|
|
||||||
|
// Now unblock 3001 from claiming ZK. This will let it move to BECOME_CONTROLLER
|
||||||
|
val delta3 = new MetadataDelta(image1)
|
||||||
|
delta3.replay(new TopicRecord().setTopicId(Uuid.randomUuid()).setName("another-topic-to-sync"))
|
||||||
|
val provenance3 = new MetadataProvenance(211, 11, 1)
|
||||||
|
val image3 = delta3.apply(provenance3)
|
||||||
|
val manifest3 = LogDeltaManifest.newBuilder()
|
||||||
|
.provenance(provenance3)
|
||||||
|
.leaderAndEpoch(newLeader2)
|
||||||
|
.numBatches(1)
|
||||||
|
.elapsedNs(100)
|
||||||
|
.numBytes(42)
|
||||||
|
.build()
|
||||||
|
driver2.onMetadataUpdate(delta3, image3, manifest3)
|
||||||
|
|
||||||
|
// Now wait for 3001 to become leader in ZK
|
||||||
|
TestUtils.waitUntilTrue(() => zkClient.getControllerId match {
|
||||||
|
case Some(nodeId) => nodeId == 3001
|
||||||
|
case None => false
|
||||||
|
}, "waiting for 3001 to claim ZK leadership")
|
||||||
|
|
||||||
|
// Now, 3001 will reload the /migration state and should not see any errors
|
||||||
|
faultHandler2.checkAndClear(faults => assertTrue(faults.isEmpty))
|
||||||
|
|
||||||
|
// 3000 should not be able to make any more ZK updates now
|
||||||
|
driver1.onMetadataUpdate(delta3, image3, manifest3)
|
||||||
|
safeGet(faultHandler1.waitForError("Controller epoch zkVersion check fails"))
|
||||||
|
|
||||||
|
// 3000 finally processes new leader event
|
||||||
|
driver1.onControllerChange(newLeader2)
|
||||||
|
|
||||||
|
// 3001 should still not have any errors
|
||||||
|
faultHandler2.checkAndClear(faults => assertTrue(faults.isEmpty))
|
||||||
|
|
||||||
|
// Wait until new leader has sync'd to ZK
|
||||||
|
TestUtils.waitUntilTrue(
|
||||||
|
() => safeGet(driver2.migrationState()).equals(MigrationDriverState.DUAL_WRITE),
|
||||||
|
"waiting for driver to enter DUAL_WRITE"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Ensure we still dont have errors on the new leader
|
||||||
|
faultHandler2.checkAndClear(faults => assertTrue(faults.isEmpty))
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
driver1.close()
|
||||||
|
driver2.close()
|
||||||
|
Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
|
||||||
|
zookeeper.shutdown()
|
||||||
|
if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -636,12 +636,24 @@ public class KRaftMigrationDriver implements MetadataPublisher {
|
||||||
public void run() throws Exception {
|
public void run() throws Exception {
|
||||||
if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER, this)) {
|
if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER, this)) {
|
||||||
applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership);
|
applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership);
|
||||||
if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
|
if (migrationLeadershipState.zkControllerEpochZkVersion() == ZkMigrationLeadershipState.UNKNOWN_ZK_VERSION) {
|
||||||
log.info("Unable to claim leadership, will retry until we learn of a different KRaft leader");
|
log.info("Unable to claim leadership, will retry until we learn of a different KRaft leader");
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (!migrationLeadershipState.initialZkMigrationComplete()) {
|
if (!migrationLeadershipState.initialZkMigrationComplete()) {
|
||||||
transitionTo(MigrationDriverState.ZK_MIGRATION);
|
transitionTo(MigrationDriverState.ZK_MIGRATION);
|
||||||
} else {
|
} else {
|
||||||
|
// KAFKA-16171 after loading the migration state in KRaftLeaderEvent, the previous controller
|
||||||
|
// could have modified the /migration ZNode. Re-read it here after claiming the controller ZNode
|
||||||
|
applyMigrationOperation("Re-reading migration state", state -> {
|
||||||
|
ZkMigrationLeadershipState reloadedState =
|
||||||
|
zkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY);
|
||||||
|
return KRaftMigrationDriver.this.migrationLeadershipState
|
||||||
|
.withMigrationZkVersion(reloadedState.migrationZkVersion())
|
||||||
|
.withKRaftMetadataOffsetAndEpoch(
|
||||||
|
reloadedState.kraftMetadataOffset(),
|
||||||
|
reloadedState.kraftMetadataEpoch());
|
||||||
|
});
|
||||||
transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
|
transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,10 +26,15 @@ import java.util.Objects;
|
||||||
* that no migration has been started.
|
* that no migration has been started.
|
||||||
*/
|
*/
|
||||||
public class ZkMigrationLeadershipState {
|
public class ZkMigrationLeadershipState {
|
||||||
|
/**
|
||||||
|
* A Kafka-internal constant used to indicate that the znode version is unknown. See ZkVersion.UnknownVersion.
|
||||||
|
*/
|
||||||
|
public static final int UNKNOWN_ZK_VERSION = -2;
|
||||||
|
|
||||||
// Use -2 as sentinel for "unknown version" for ZK versions to avoid sending an actual -1 "any version"
|
// Use -2 as sentinel for "unknown version" for ZK versions to avoid sending an actual -1 "any version"
|
||||||
// when doing ZK writes
|
// when doing ZK writes
|
||||||
public static final ZkMigrationLeadershipState EMPTY = new ZkMigrationLeadershipState(-1, -1, -1, -1, -1, -2, -1, -2);
|
public static final ZkMigrationLeadershipState EMPTY =
|
||||||
|
new ZkMigrationLeadershipState(-1, -1, -1, -1, -1, -2, -1, UNKNOWN_ZK_VERSION);
|
||||||
|
|
||||||
private final int kraftControllerId;
|
private final int kraftControllerId;
|
||||||
|
|
||||||
|
@ -110,7 +115,7 @@ public class ZkMigrationLeadershipState {
|
||||||
return kraftMetadataOffset;
|
return kraftMetadataOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long kraftMetadataEpoch() {
|
public int kraftMetadataEpoch() {
|
||||||
return kraftMetadataEpoch;
|
return kraftMetadataEpoch;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -139,14 +139,18 @@ class CapturingMigrationClient implements MigrationClient {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
|
public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
|
||||||
this.state = state;
|
if (state.zkControllerEpochZkVersion() == ZkMigrationLeadershipState.UNKNOWN_ZK_VERSION) {
|
||||||
return state;
|
this.state = state.withZkController(0, 0);
|
||||||
|
} else {
|
||||||
|
this.state = state.withZkController(state.zkControllerEpoch() + 1, state.zkControllerEpochZkVersion() + 1);
|
||||||
|
}
|
||||||
|
return this.state;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state) {
|
public ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state) {
|
||||||
this.state = state;
|
this.state = state.withUnknownZkController();
|
||||||
return state;
|
return this.state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue