diff --git a/build.gradle b/build.gradle
index e67f5a3b2e0..5b628b7ca86 100644
--- a/build.gradle
+++ b/build.gradle
@@ -749,6 +749,7 @@ project(':core') {
dependencies {
compile project(':clients')
+ compile project(':metadata')
compile project(':raft')
compile libs.jacksonDatabind
compile libs.jacksonModuleScala
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 15a7940f6eb..5b9b4185ff0 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -49,6 +49,7 @@
+
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index cd8b4c894d7..ad6e885d498 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -469,6 +469,7 @@
+
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index 8b213f798d4..6f287d060d3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -16,11 +16,9 @@
*/
package org.apache.kafka.connect.util.clusters;
-import kafka.server.BrokerState;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
-import kafka.server.RunningAsBroker;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
@@ -47,6 +45,7 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.metadata.BrokerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -241,13 +240,13 @@ public class EmbeddedKafkaCluster {
}
/**
- * Get the brokers that have a {@link RunningAsBroker} state.
+ * Get the brokers that have a {@link BrokerState#RUNNING} state.
*
* @return the list of {@link KafkaServer} instances that are running;
* never null but possibly empty
*/
public Set runningBrokers() {
- return brokersInState(state -> state.currentState() == RunningAsBroker.state());
+ return brokersInState(state -> state == BrokerState.RUNNING);
}
/**
@@ -264,7 +263,7 @@ public class EmbeddedKafkaCluster {
protected boolean hasState(KafkaServer server, Predicate desiredState) {
try {
- return desiredState.test(server.brokerState());
+ return desiredState.test(server.brokerState().get());
} catch (Throwable e) {
// Broker failed to respond.
return false;
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index f5f77e03492..761ba69bfac 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -20,16 +20,17 @@ package kafka.log
import java.io._
import java.nio.file.Files
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
+import kafka.server._
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
+import org.apache.kafka.metadata.BrokerState
import scala.jdk.CollectionConverters._
import scala.collection._
@@ -60,7 +61,7 @@ class LogManager(logDirs: Seq[File],
val retentionCheckMs: Long,
val maxPidExpirationMs: Int,
scheduler: Scheduler,
- val brokerState: BrokerState,
+ val brokerState: AtomicReference[BrokerState],
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel,
time: Time) extends Logging with KafkaMetricsGroup {
@@ -326,7 +327,7 @@ class LogManager(logDirs: Seq[File],
} else {
// log recovery itself is being performed by `Log` class during initialization
info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
- brokerState.newState(RecoveringFromUncleanShutdown)
+ brokerState.set(BrokerState.RECOVERY)
}
var recoveryPoints = Map[TopicPartition, Long]()
@@ -1182,7 +1183,7 @@ object LogManager {
def apply(config: KafkaConfig,
initialOfflineDirs: Seq[String],
zkClient: KafkaZkClient,
- brokerState: BrokerState,
+ brokerState: AtomicReference[BrokerState],
kafkaScheduler: KafkaScheduler,
time: Time,
brokerTopicStats: BrokerTopicStats,
diff --git a/core/src/main/scala/kafka/server/BrokerStates.scala b/core/src/main/scala/kafka/server/BrokerStates.scala
deleted file mode 100644
index f53ed833c22..00000000000
--- a/core/src/main/scala/kafka/server/BrokerStates.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-/**
- * Broker states are the possible state that a kafka broker can be in.
- * A broker should be only in one state at a time.
- * The expected state transition with the following defined states is:
- *
- * +-----------+
- * |Not Running|
- * +-----+-----+
- * |
- * v
- * +-----+-----+
- * |Starting +--+
- * +-----+-----+ | +----+------------+
- * | +>+RecoveringFrom |
- * v |UncleanShutdown |
- * +-------+-------+ +-------+---------+
- * |RunningAsBroker| |
- * +-------+-------+<-----------+
- * |
- * v
- * +-----+------------+
- * |PendingControlled |
- * |Shutdown |
- * +-----+------------+
- * |
- * v
- * +-----+----------+
- * |BrokerShutting |
- * |Down |
- * +-----+----------+
- * |
- * v
- * +-----+-----+
- * |Not Running|
- * +-----------+
- *
- * Custom states is also allowed for cases where there are custom kafka states for different scenarios.
- */
-sealed trait BrokerStates { def state: Byte }
-case object NotRunning extends BrokerStates { val state: Byte = 0 }
-case object Starting extends BrokerStates { val state: Byte = 1 }
-case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
-case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
-case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
-case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
-
-
-case class BrokerState() {
- @volatile var currentState: Byte = NotRunning.state
-
- def newState(newState: BrokerStates): Unit = {
- this.newState(newState.state)
- }
-
- // Allowing undefined custom state
- def newState(newState: Byte): Unit = {
- currentState = newState
- }
-}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 88abe460bb7..9eca82ee482 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -20,7 +20,7 @@ package kafka.server
import java.io.{File, IOException}
import java.net.{InetAddress, SocketTimeoutException}
import java.util.concurrent._
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1}
import kafka.cluster.Broker
@@ -46,6 +46,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint, Node}
+import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.zookeeper.client.ZKClientConfig
@@ -102,7 +103,7 @@ class KafkaServer(
var kafkaYammerMetrics: KafkaYammerMetrics = null
var metrics: Metrics = null
- val brokerState: BrokerState = new BrokerState
+ val brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING)
var dataPlaneRequestProcessor: KafkaApis = null
var controlPlaneRequestProcessor: KafkaApis = null
@@ -166,7 +167,7 @@ class KafkaServer(
private[kafka] def featureChangeListener = _featureChangeListener
- newGauge("BrokerState", () => brokerState.currentState)
+ newGauge("BrokerState", () => brokerState.get().value())
newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
@@ -193,7 +194,7 @@ class KafkaServer(
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
- brokerState.newState(Starting)
+ brokerState.set(BrokerState.STARTING)
/* setup zookeeper */
initZkClient(time)
@@ -380,7 +381,7 @@ class KafkaServer(
socketServer.startProcessingRequests(authorizerFutures)
- brokerState.newState(RunningAsBroker)
+ brokerState.set(BrokerState.RUNNING)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
@@ -632,7 +633,7 @@ class KafkaServer(
// the shutdown.
info("Starting controlled shutdown")
- brokerState.newState(PendingControlledShutdown)
+ brokerState.set(BrokerState.PENDING_CONTROLLED_SHUTDOWN)
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
@@ -657,7 +658,7 @@ class KafkaServer(
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown(), this)
- brokerState.newState(BrokerShuttingDown)
+ brokerState.set(BrokerState.SHUTTING_DOWN)
if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
@@ -726,7 +727,7 @@ class KafkaServer(
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()
- brokerState.newState(NotRunning)
+ brokerState.set(BrokerState.NOT_RUNNING)
startupComplete.set(false)
isShuttingDown.set(false)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index ee107dc768c..e9694103d79 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -20,6 +20,7 @@ package kafka.log
import java.io._
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
+import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{Callable, Executors}
import java.util.regex.Pattern
import java.util.{Collections, Optional, Properties}
@@ -30,7 +31,7 @@ import kafka.log.Log.DeleteDirSuffix
import kafka.metrics.KafkaYammerMetrics
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
-import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile}
import kafka.utils._
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.errors._
@@ -41,6 +42,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.metadata.BrokerState
import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -96,7 +98,8 @@ class LogTest {
new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], topicConfigs = Map(),
initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L,
- retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time, brokerState = BrokerState(),
+ retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time,
+ brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING),
brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size)) {
override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 13cd74e5a3b..720dbaf2792 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.nio.ByteBuffer
import java.util.Properties
+
import kafka.api.IntegrationTestHarness
import kafka.network.SocketServer
import kafka.utils.NotNothing
@@ -28,6 +29,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, RequestTestUtils, ResponseHeader}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.metadata.BrokerState
import scala.annotation.nowarn
import scala.collection.Seq
@@ -51,8 +53,8 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
def anySocketServer: SocketServer = {
servers.find { server =>
- val state = server.brokerState.currentState
- state != NotRunning.state && state != BrokerShuttingDown.state
+ val state = server.brokerState.get()
+ state != BrokerState.NOT_RUNNING && state != BrokerState.SHUTTING_DOWN
}.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available"))
}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 1b4508f0e11..d04866ebe77 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.test.TestUtils.isValidClusterId
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -332,7 +333,7 @@ class MetadataRequestTest extends BaseRequestTest {
@Test
def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
- val activeBrokers = servers.filter(_.brokerState.currentState != NotRunning.state)
+ val activeBrokers = servers.filter(_.brokerState.get() != BrokerState.NOT_RUNNING)
val expectedIsr = activeBrokers.map(_.config.brokerId).toSet
// Assert that topic metadata at new brokers is updated correctly
@@ -378,7 +379,7 @@ class MetadataRequestTest extends BaseRequestTest {
val brokersInController = controllerMetadataResponse.get.brokers.asScala.toSeq.sortBy(_.id)
// Assert that metadata is propagated correctly
- servers.filter(_.brokerState.currentState != NotRunning.state).foreach { broker =>
+ servers.filter(_.brokerState.get() != BrokerState.NOT_RUNNING).foreach { broker =>
TestUtils.waitUntilTrue(() => {
val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
Some(brokerSocketServer(broker.config.brokerId)))
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index a93949eaded..0239465b648 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
@@ -171,10 +172,10 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
// goes wrong so that awaitShutdown doesn't hang
case e: Exception =>
assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e")
- assertEquals(NotRunning.state, server.brokerState.currentState)
+ assertEquals(BrokerState.NOT_RUNNING, server.brokerState.get())
}
finally {
- if (server.brokerState.currentState != NotRunning.state)
+ if (server.brokerState.get() != BrokerState.NOT_RUNNING)
server.shutdown()
server.awaitShutdown()
}
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 24130312f60..cc5706e40c9 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -20,8 +20,8 @@ package kafka.server
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.KafkaException
+import org.apache.kafka.metadata.BrokerState
import org.apache.zookeeper.KeeperException.NodeExistsException
-import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
@@ -94,24 +94,15 @@ class ServerStartupTest extends ZooKeeperTestHarness {
@Test
def testBrokerStateRunningAfterZK(): Unit = {
val brokerId = 0
- val mockBrokerState: BrokerState = EasyMock.niceMock(classOf[BrokerState])
-
- class BrokerStateInterceptor() extends BrokerState {
- override def newState(newState: BrokerStates): Unit = {
- val brokers = zkClient.getAllBrokersInCluster
- assertEquals(1, brokers.size)
- assertEquals(brokerId, brokers.head.id)
- }
- }
-
- class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {}
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
- server = new MockKafkaServer(KafkaConfig.fromProps(props))
-
- EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once()
- EasyMock.replay(mockBrokerState)
+ server = new KafkaServer(KafkaConfig.fromProps(props))
server.startup()
+ TestUtils.waitUntilTrue(() => server.brokerState.get() == BrokerState.RUNNING,
+ "waiting for the broker state to become RUNNING")
+ val brokers = zkClient.getAllBrokersInCluster
+ assertEquals(1, brokers.size)
+ assertEquals(brokerId, brokers.head.id)
}
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 998df4632a6..190a59e99cc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,7 +23,7 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import java.util.{Arrays, Collections, Properties}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
@@ -60,6 +60,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.common.{KafkaFuture, TopicPartition}
+import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.zookeeper.KeeperException.SessionExpiredException
@@ -1076,7 +1077,7 @@ object TestUtils extends Logging {
maxPidExpirationMs = 60 * 60 * 1000,
scheduler = time.scheduler,
time = time,
- brokerState = BrokerState(),
+ brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING),
brokerTopicStats = new BrokerTopicStats,
logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 26fb960855b..a3960546882 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -28,7 +28,6 @@ import kafka.log.LogAppendInfo;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterIsrManager;
-import kafka.server.BrokerState;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
@@ -56,6 +55,7 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -88,6 +88,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
@State(Scope.Benchmark)
@Fork(value = 1)
@@ -132,7 +133,7 @@ public class ReplicaFetcherThreadBenchmark {
1000L,
60000,
scheduler,
- new BrokerState(),
+ new AtomicReference<>(BrokerState.NOT_RUNNING),
brokerTopicStats,
logDirFailureChannel,
Time.SYSTEM);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 9598390b35c..dcff8c4eb21 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -26,7 +26,6 @@ import kafka.log.Defaults;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterIsrManager;
-import kafka.server.BrokerState;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
@@ -39,6 +38,7 @@ import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -67,6 +67,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
@State(Scope.Benchmark)
@Fork(value = 1)
@@ -108,7 +109,7 @@ public class PartitionMakeFollowerBenchmark {
1000L,
60000,
scheduler,
- new BrokerState(),
+ new AtomicReference<>(BrokerState.NOT_RUNNING),
brokerTopicStats,
logDirFailureChannel,
Time.SYSTEM);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index a58bd7dd8cb..cb237f4ffe1 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -26,7 +26,6 @@ import kafka.log.Defaults;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterIsrManager;
-import kafka.server.BrokerState;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import kafka.server.LogOffsetMetadata;
@@ -36,6 +35,7 @@ import kafka.utils.KafkaScheduler;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.BrokerState;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -59,6 +59,7 @@ import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
@State(Scope.Benchmark)
@Fork(value = 1)
@@ -93,7 +94,7 @@ public class UpdateFollowerFetchStateBenchmark {
1000L,
60000,
scheduler,
- new BrokerState(),
+ new AtomicReference<>(BrokerState.NOT_RUNNING),
brokerTopicStats,
logDirFailureChannel,
Time.SYSTEM);