KAFKA-19480: Recreate /migration when it has null value (#20627)

When using the zookeeper-security-migration
tool without the '--enable.path.check' option, the script not only
updates the ACLs for the existing znodes, but also creates any
non-existing ones (with the ACL options specified) using null values
based on the list defined in
`ZkData.SecureRootPaths`.
This is especially problematic for the /migration znode as the current
logic only checks for the existence of the znode and later the migration
process will hang when it tries to parse the null value over and over
again.

In summary, the migration cannot be completed if the
zookeeper-security-migration script was run previously, and the only
workaround is to manually remove the /migration znode in such cases. I
propose a simple fix to circumvent the manual step by recreating the
/migration znode if it contains a null value.

---------

Co-authored-by: Gergely Harmadas <harmadasg@gmail.com>
This commit is contained in:
Viktor Somogyi-Vass 2025-10-07 12:12:41 +02:00 committed by GitHub
parent c29c130fd9
commit d13c1f652d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 74 additions and 15 deletions

View File

@ -21,7 +21,7 @@ import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
import kafka.server.KafkaConfig
import kafka.utils.{Exit, Logging, ToolsUtils}
import kafka.utils.Implicits._
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
import kafka.zk.{ControllerZNode, KafkaZkClient, MigrationZNode, ZkData, ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.config.ZkConfigs
@ -260,12 +260,18 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
}
private def run(enablePathCheck: Boolean): Unit = {
def skipSetAcl(path: String): Boolean = {
val isControllerPath = path == ControllerZNode.path
val isMigrationPath = path == MigrationZNode.path
(isControllerPath || isMigrationPath) && !zkClient.pathExists(path)
}
try {
setAclIndividually("/")
checkPathExistenceAndMaybeExit(enablePathCheck)
for (path <- ZkData.SecureRootPaths) {
debug("Going to set ACL for %s".format(path))
if (path == ControllerZNode.path && !zkClient.pathExists(path)) {
if (skipSetAcl(path)) {
debug("Ignoring to set ACL for %s, because it doesn't exist".format(path))
} else {
zkClient.makeSurePersistentPathExists(path)

View File

@ -1734,20 +1734,32 @@ class KafkaZkClient private[zk] (
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK =>
MigrationZNode.decode(getDataResponse.data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
Option(getDataResponse.data) match {
case Some(data) =>
MigrationZNode.decode(data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
case None =>
info("Migration znode exists with null data, recreating initial migration state")
createInitialMigrationState(initialState, removeFirst = true)
}
case Code.NONODE =>
createInitialMigrationState(initialState)
case _ => throw getDataResponse.resultException.get
}
}
private def createInitialMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
val createRequest = CreateRequest(
private def createInitialMigrationState(initialState: ZkMigrationLeadershipState, removeFirst: Boolean = false): ZkMigrationLeadershipState = {
val createOp = CreateOp(
MigrationZNode.path,
MigrationZNode.encode(initialState),
defaultAcls(MigrationZNode.path),
CreateMode.PERSISTENT)
val response = retryRequestUntilConnected(createRequest)
val deleteOp = DeleteOp(MigrationZNode.path, ZkVersion.MatchAnyVersion)
val multi = if (removeFirst) {
MultiRequest(Seq(deleteOp, createOp))
} else {
MultiRequest(Seq(createOp))
}
val response = retryRequestUntilConnected(multi)
response.maybeThrow()
initialState.withMigrationZkVersion(0)
}

View File

@ -38,6 +38,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.common.MetadataVersion
import org.apache.zookeeper.client.ZKClientConfig
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@ -147,11 +149,12 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
* Tests the migration tool when making an unsecure
* cluster secure.
*/
@Test
def testZkMigration(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testZkMigration(includeAllZnodes: Boolean): Unit = {
val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
try {
testMigration(zkConnect, unsecureZkClient, zkClient)
testMigration(zkConnect, unsecureZkClient, zkClient, includeAllZnodes)
} finally {
unsecureZkClient.close()
}
@ -161,11 +164,12 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
* Tests the migration tool when making a secure
* cluster unsecure.
*/
@Test
def testZkAntiMigration(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testZkAntiMigration(includeAllZnodes: Boolean): Unit = {
val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
try {
testMigration(zkConnect, zkClient, unsecureZkClient)
testMigration(zkConnect, zkClient, unsecureZkClient, includeAllZnodes)
} finally {
unsecureZkClient.close()
}
@ -218,9 +222,17 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
* Exercises the migration tool. It is used in these test cases:
* testZkMigration, testZkAntiMigration, testChroot.
*/
private def testMigration(zkUrl: String, firstZk: KafkaZkClient, secondZk: KafkaZkClient): Unit = {
private def testMigration(
zkUrl: String,
firstZk: KafkaZkClient,
secondZk: KafkaZkClient,
includeAllZnodes: Boolean = true): Unit = {
info(s"zkConnect string: $zkUrl")
for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
// Optionally do not create controller and migration znodes
val predicate: String => Boolean = if (includeAllZnodes) _ => true else skipCreateZnodes
val paths = (ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths).filter(predicate)
for (path <- paths) {
info(s"Creating $path")
firstZk.makeSurePersistentPathExists(path)
// Create a child for each znode to exercise the recurrent
@ -241,7 +253,7 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
}
ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
info("Done with migration")
for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
for (path <- paths) {
val sensitive = ZkData.sensitivePath(path)
val listParent = secondZk.getAcl(path)
assertTrue(isAclCorrect(listParent, secondZk.secure, sensitive), path)
@ -257,6 +269,18 @@ class ZkAuthorizationTest extends QuorumTestHarness with Logging {
ZkData.sensitivePath(ExtendedAclZNode.path)), "/kafka-acl-extended")
assertTrue(isAclCorrect(firstZk.getAcl("/feature"), secondZk.secure,
ZkData.sensitivePath(FeatureZNode.path)), "ACL mismatch for /feature path")
if (!includeAllZnodes) {
// Check controller and migration znodes should not be created
assertFalse(firstZk.pathExists(ControllerZNode.path))
assertFalse(firstZk.pathExists(MigrationZNode.path))
}
}
/**
 * Filter predicate used by testMigration when includeAllZnodes is false.
 * Despite its name, it returns true for the paths that are kept (and therefore created),
 * i.e. every path other than the controller and migration znodes.
 */
private def skipCreateZnodes(path: String): Boolean =
  !(path == ControllerZNode.path || path == MigrationZNode.path)
/**

View File

@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness {
} finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
}
@Test
def testMigrationZnodeWithNullValue(): Unit = {
  val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
  val initialState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)

  // Pre-create the /migration znode with a null payload before asking for the migration state.
  val nullDataRequest = CreateRequest(
    MigrationZNode.path,
    null,
    zkClient.defaultAcls(MigrationZNode.path),
    CreateMode.PERSISTENT)
  zkClient.retryRequestUntilConnected(nullDataRequest)

  // The returned state is expected to report ZK version 0.
  val recreatedState = zkClient.getOrCreateMigrationState(initialState)
  assertEquals(0, recreatedState.migrationZkVersion())
}
@Test
def testFailToUpdateMigrationZNode(): Unit = {
val (controllerEpoch, stat) = zkClient.getControllerEpoch.get

View File

@ -4064,6 +4064,8 @@ inter.broker.listener.name=PLAINTEXT
<p>The new standalone controller in the example configuration above should be formatted using the <code>kafka-storage format --standalone</code> command.</p>
<p>Note: The migration can stall if the <a href="#zk_authz_migration">ZooKeeper Security Migration Tool</a> was previously executed (fixed from 3.9.2, see <a href="https://issues.apache.org/jira/browse/KAFKA-19480">KAFKA-19480</a> for more details). As a workaround, the malformed "/migration" node can be removed from ZooKeeper by running <code>delete /migration</code> with the <code>zookeeper-shell.sh</code> CLI tool.</p>
<p><em>Note: The KRaft cluster <code>node.id</code> values must be different from any existing ZK broker <code>broker.id</code>.
In KRaft-mode, the brokers and controllers share the same Node ID namespace.</em></p>