diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ba2d72a9e0c..83a1f488f41 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -2248,6 +2248,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}") require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " + s"'${KafkaConfig.InterBrokerProtocolVersionProp}' to 3.4 or higher") + require(logDirs.size == 1, "Cannot enable ZooKeeper migration when multiple log directories (aka JBOD) are in use.") } else { // controller listener names must be empty when not in KRaft mode require(controllerListenerNames.isEmpty, diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index f0ac47bad8d..42af3976524 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1677,6 +1677,18 @@ class KafkaConfigTest { KafkaConfig.fromProps(props) } + @Test + def testMigrationCannotBeEnabledWithJBOD(): Unit = { + val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort, logDirCount = 2) + props.setProperty(KafkaConfig.MigrationEnabledProp, "true") + props.setProperty(KafkaConfig.QuorumVotersProp, "3000@localhost:9093") + props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") + + assertEquals( + "requirement failed: Cannot enable ZooKeeper migration when multiple log directories (aka JBOD) are in use.", + assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage) + } + @Test def testMigrationEnabledKRaftMode(): Unit = { val props = new Properties()