MINOR: Replace BrokerStates.scala with BrokerState.java (#10028)

Replace BrokerStates.scala with BrokerState.java, to make it easier to use from Java code if needed.  This also makes it easier to go from a numeric type to an enum.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2021-02-03 13:41:38 -08:00 committed by GitHub
parent 70404baffa
commit 772f2cfc82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 56 additions and 128 deletions

View File

@ -749,6 +749,7 @@ project(':core') {
dependencies { dependencies {
compile project(':clients') compile project(':clients')
compile project(':metadata')
compile project(':raft') compile project(':raft')
compile libs.jacksonDatabind compile libs.jacksonDatabind
compile libs.jacksonModuleScala compile libs.jacksonModuleScala

View File

@ -49,6 +49,7 @@
<allow pkg="kafka.security.authorizer"/> <allow pkg="kafka.security.authorizer"/>
<allow pkg="org.apache.kafka.server"/> <allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.clients"/> <allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.timeline" /> <allow pkg="org.apache.kafka.timeline" />
<subpackage name="cache"> <subpackage name="cache">

View File

@ -469,6 +469,7 @@
<allow class="javax.servlet.http.HttpServletResponse" /> <allow class="javax.servlet.http.HttpServletResponse" />
<allow class="javax.ws.rs.core.Response" /> <allow class="javax.ws.rs.core.Response" />
<allow pkg="com.fasterxml.jackson.core.type" /> <allow pkg="com.fasterxml.jackson.core.type" />
<allow pkg="org.apache.kafka.metadata" />
</subpackage> </subpackage>
</subpackage> </subpackage>

View File

@ -16,11 +16,9 @@
*/ */
package org.apache.kafka.connect.util.clusters; package org.apache.kafka.connect.util.clusters;
import kafka.server.BrokerState;
import kafka.server.KafkaConfig; import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$; import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer; import kafka.server.KafkaServer;
import kafka.server.RunningAsBroker;
import kafka.utils.CoreUtils; import kafka.utils.CoreUtils;
import kafka.utils.TestUtils; import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper; import kafka.zk.EmbeddedZookeeper;
@ -47,6 +45,7 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.metadata.BrokerState;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -241,13 +240,13 @@ public class EmbeddedKafkaCluster {
} }
/** /**
* Get the brokers that have a {@link RunningAsBroker} state. * Get the brokers that have a {@link BrokerState#RUNNING} state.
* *
* @return the list of {@link KafkaServer} instances that are running; * @return the list of {@link KafkaServer} instances that are running;
* never null but possibly empty * never null but possibly empty
*/ */
public Set<KafkaServer> runningBrokers() { public Set<KafkaServer> runningBrokers() {
return brokersInState(state -> state.currentState() == RunningAsBroker.state()); return brokersInState(state -> state == BrokerState.RUNNING);
} }
/** /**
@ -264,7 +263,7 @@ public class EmbeddedKafkaCluster {
protected boolean hasState(KafkaServer server, Predicate<BrokerState> desiredState) { protected boolean hasState(KafkaServer server, Predicate<BrokerState> desiredState) {
try { try {
return desiredState.test(server.brokerState()); return desiredState.test(server.brokerState().get());
} catch (Throwable e) { } catch (Throwable e) {
// Broker failed to respond. // Broker failed to respond.
return false; return false;

View File

@ -20,16 +20,17 @@ package kafka.log
import java.io._ import java.io._
import java.nio.file.Files import java.nio.file.Files
import java.util.concurrent._ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _} import kafka.server._
import kafka.utils._ import kafka.utils._
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException} import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
import org.apache.kafka.metadata.BrokerState
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
import scala.collection._ import scala.collection._
@ -60,7 +61,7 @@ class LogManager(logDirs: Seq[File],
val retentionCheckMs: Long, val retentionCheckMs: Long,
val maxPidExpirationMs: Int, val maxPidExpirationMs: Int,
scheduler: Scheduler, scheduler: Scheduler,
val brokerState: BrokerState, val brokerState: AtomicReference[BrokerState],
brokerTopicStats: BrokerTopicStats, brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel, logDirFailureChannel: LogDirFailureChannel,
time: Time) extends Logging with KafkaMetricsGroup { time: Time) extends Logging with KafkaMetricsGroup {
@ -326,7 +327,7 @@ class LogManager(logDirs: Seq[File],
} else { } else {
// log recovery itself is being performed by `Log` class during initialization // log recovery itself is being performed by `Log` class during initialization
info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found") info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
brokerState.newState(RecoveringFromUncleanShutdown) brokerState.set(BrokerState.RECOVERY)
} }
var recoveryPoints = Map[TopicPartition, Long]() var recoveryPoints = Map[TopicPartition, Long]()
@ -1182,7 +1183,7 @@ object LogManager {
def apply(config: KafkaConfig, def apply(config: KafkaConfig,
initialOfflineDirs: Seq[String], initialOfflineDirs: Seq[String],
zkClient: KafkaZkClient, zkClient: KafkaZkClient,
brokerState: BrokerState, brokerState: AtomicReference[BrokerState],
kafkaScheduler: KafkaScheduler, kafkaScheduler: KafkaScheduler,
time: Time, time: Time,
brokerTopicStats: BrokerTopicStats, brokerTopicStats: BrokerTopicStats,

View File

@ -1,78 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
/**
* Broker states are the possible state that a kafka broker can be in.
* A broker should be only in one state at a time.
* The expected state transition with the following defined states is:
*
* +-----------+
* |Not Running|
* +-----+-----+
* |
* v
* +-----+-----+
* |Starting +--+
* +-----+-----+ | +----+------------+
* | +>+RecoveringFrom |
* v |UncleanShutdown |
* +-------+-------+ +-------+---------+
* |RunningAsBroker| |
* +-------+-------+<-----------+
* |
* v
* +-----+------------+
* |PendingControlled |
* |Shutdown |
* +-----+------------+
* |
* v
* +-----+----------+
* |BrokerShutting |
* |Down |
* +-----+----------+
* |
* v
* +-----+-----+
* |Not Running|
* +-----------+
*
* Custom states is also allowed for cases where there are custom kafka states for different scenarios.
*/
sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
case class BrokerState() {
@volatile var currentState: Byte = NotRunning.state
def newState(newState: BrokerStates): Unit = {
this.newState(newState.state)
}
// Allowing undefined custom state
def newState(newState: Byte): Unit = {
currentState = newState
}
}

View File

@ -20,7 +20,7 @@ package kafka.server
import java.io.{File, IOException} import java.io.{File, IOException}
import java.net.{InetAddress, SocketTimeoutException} import java.net.{InetAddress, SocketTimeoutException}
import java.util.concurrent._ import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1} import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1}
import kafka.cluster.Broker import kafka.cluster.Broker
@ -46,6 +46,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.security.{JaasContext, JaasUtils} import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time} import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint, Node} import org.apache.kafka.common.{ClusterResource, Endpoint, Node}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.authorizer.Authorizer
import org.apache.zookeeper.client.ZKClientConfig import org.apache.zookeeper.client.ZKClientConfig
@ -102,7 +103,7 @@ class KafkaServer(
var kafkaYammerMetrics: KafkaYammerMetrics = null var kafkaYammerMetrics: KafkaYammerMetrics = null
var metrics: Metrics = null var metrics: Metrics = null
val brokerState: BrokerState = new BrokerState val brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING)
var dataPlaneRequestProcessor: KafkaApis = null var dataPlaneRequestProcessor: KafkaApis = null
var controlPlaneRequestProcessor: KafkaApis = null var controlPlaneRequestProcessor: KafkaApis = null
@ -166,7 +167,7 @@ class KafkaServer(
private[kafka] def featureChangeListener = _featureChangeListener private[kafka] def featureChangeListener = _featureChangeListener
newGauge("BrokerState", () => brokerState.currentState) newGauge("BrokerState", () => brokerState.get().value())
newGauge("ClusterId", () => clusterId) newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size) newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
@ -193,7 +194,7 @@ class KafkaServer(
val canStartup = isStartingUp.compareAndSet(false, true) val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) { if (canStartup) {
brokerState.newState(Starting) brokerState.set(BrokerState.STARTING)
/* setup zookeeper */ /* setup zookeeper */
initZkClient(time) initZkClient(time)
@ -380,7 +381,7 @@ class KafkaServer(
socketServer.startProcessingRequests(authorizerFutures) socketServer.startProcessingRequests(authorizerFutures)
brokerState.newState(RunningAsBroker) brokerState.set(BrokerState.RUNNING)
shutdownLatch = new CountDownLatch(1) shutdownLatch = new CountDownLatch(1)
startupComplete.set(true) startupComplete.set(true)
isStartingUp.set(false) isStartingUp.set(false)
@ -632,7 +633,7 @@ class KafkaServer(
// the shutdown. // the shutdown.
info("Starting controlled shutdown") info("Starting controlled shutdown")
brokerState.newState(PendingControlledShutdown) brokerState.set(BrokerState.PENDING_CONTROLLED_SHUTDOWN)
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue) val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
@ -657,7 +658,7 @@ class KafkaServer(
// `true` at the end of this method. // `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) { if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown(), this) CoreUtils.swallow(controlledShutdown(), this)
brokerState.newState(BrokerShuttingDown) brokerState.set(BrokerState.SHUTTING_DOWN)
if (dynamicConfigManager != null) if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown(), this) CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
@ -726,7 +727,7 @@ class KafkaServer(
// Clear all reconfigurable instances stored in DynamicBrokerConfig // Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear() config.dynamicConfig.clear()
brokerState.newState(NotRunning) brokerState.set(BrokerState.NOT_RUNNING)
startupComplete.set(false) startupComplete.set(false)
isShuttingDown.set(false) isShuttingDown.set(false)

View File

@ -20,6 +20,7 @@ package kafka.log
import java.io._ import java.io._
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.file.{Files, Paths} import java.nio.file.{Files, Paths}
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{Callable, Executors} import java.util.concurrent.{Callable, Executors}
import java.util.regex.Pattern import java.util.regex.Pattern
import java.util.{Collections, Optional, Properties} import java.util.{Collections, Optional, Properties}
@ -30,7 +31,7 @@ import kafka.log.Log.DeleteDirSuffix
import kafka.metrics.KafkaYammerMetrics import kafka.metrics.KafkaYammerMetrics
import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile} import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile}
import kafka.utils._ import kafka.utils._
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.errors._ import org.apache.kafka.common.errors._
@ -41,6 +42,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.BrokerState
import org.easymock.EasyMock import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -96,7 +98,8 @@ class LogTest {
new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], topicConfigs = Map(), new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], topicConfigs = Map(),
initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4, initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L, flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L,
retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time, brokerState = BrokerState(), retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time,
brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING),
brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size)) { brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size)) {
override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long], override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],

View File

@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket import java.net.Socket
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.Properties import java.util.Properties
import kafka.api.IntegrationTestHarness import kafka.api.IntegrationTestHarness
import kafka.network.SocketServer import kafka.network.SocketServer
import kafka.utils.NotNothing import kafka.utils.NotNothing
@ -28,6 +29,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, RequestTestUtils, ResponseHeader} import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, RequestTestUtils, ResponseHeader}
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.BrokerState
import scala.annotation.nowarn import scala.annotation.nowarn
import scala.collection.Seq import scala.collection.Seq
@ -51,8 +53,8 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
def anySocketServer: SocketServer = { def anySocketServer: SocketServer = {
servers.find { server => servers.find { server =>
val state = server.brokerState.currentState val state = server.brokerState.get()
state != NotRunning.state && state != BrokerShuttingDown.state state != BrokerState.NOT_RUNNING && state != BrokerState.SHUTTING_DOWN
}.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available")) }.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available"))
} }

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataRequestData import org.apache.kafka.common.message.MetadataRequestData
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse} import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.test.TestUtils.isValidClusterId import org.apache.kafka.test.TestUtils.isValidClusterId
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test} import org.junit.jupiter.api.{BeforeEach, Test}
@ -332,7 +333,7 @@ class MetadataRequestTest extends BaseRequestTest {
@Test @Test
def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = { def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = { def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
val activeBrokers = servers.filter(_.brokerState.currentState != NotRunning.state) val activeBrokers = servers.filter(_.brokerState.get() != BrokerState.NOT_RUNNING)
val expectedIsr = activeBrokers.map(_.config.brokerId).toSet val expectedIsr = activeBrokers.map(_.config.brokerId).toSet
// Assert that topic metadata at new brokers is updated correctly // Assert that topic metadata at new brokers is updated correctly
@ -378,7 +379,7 @@ class MetadataRequestTest extends BaseRequestTest {
val brokersInController = controllerMetadataResponse.get.brokers.asScala.toSeq.sortBy(_.id) val brokersInController = controllerMetadataResponse.get.brokers.asScala.toSeq.sortBy(_.id)
// Assert that metadata is propagated correctly // Assert that metadata is propagated correctly
servers.filter(_.brokerState.currentState != NotRunning.state).foreach { broker => servers.filter(_.brokerState.get() != BrokerState.NOT_RUNNING).foreach { broker =>
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build, val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
Some(brokerSocketServer(broker.config.brokerId))) Some(brokerSocketServer(broker.config.brokerId)))

View File

@ -39,6 +39,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer} import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.{BeforeEach, Test} import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
@ -171,10 +172,10 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
// goes wrong so that awaitShutdown doesn't hang // goes wrong so that awaitShutdown doesn't hang
case e: Exception => case e: Exception =>
assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e") assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e")
assertEquals(NotRunning.state, server.brokerState.currentState) assertEquals(BrokerState.NOT_RUNNING, server.brokerState.get())
} }
finally { finally {
if (server.brokerState.currentState != NotRunning.state) if (server.brokerState.get() != BrokerState.NOT_RUNNING)
server.shutdown() server.shutdown()
server.awaitShutdown() server.awaitShutdown()
} }

View File

@ -20,8 +20,8 @@ package kafka.server
import kafka.utils.TestUtils import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.KafkaException import org.apache.kafka.common.KafkaException
import org.apache.kafka.metadata.BrokerState
import org.apache.zookeeper.KeeperException.NodeExistsException import org.apache.zookeeper.KeeperException.NodeExistsException
import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.{AfterEach, Test}
@ -94,24 +94,15 @@ class ServerStartupTest extends ZooKeeperTestHarness {
@Test @Test
def testBrokerStateRunningAfterZK(): Unit = { def testBrokerStateRunningAfterZK(): Unit = {
val brokerId = 0 val brokerId = 0
val mockBrokerState: BrokerState = EasyMock.niceMock(classOf[BrokerState])
class BrokerStateInterceptor() extends BrokerState {
override def newState(newState: BrokerStates): Unit = {
val brokers = zkClient.getAllBrokersInCluster
assertEquals(1, brokers.size)
assertEquals(brokerId, brokers.head.id)
}
}
class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {}
val props = TestUtils.createBrokerConfig(brokerId, zkConnect) val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
server = new MockKafkaServer(KafkaConfig.fromProps(props)) server = new KafkaServer(KafkaConfig.fromProps(props))
EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once()
EasyMock.replay(mockBrokerState)
server.startup() server.startup()
TestUtils.waitUntilTrue(() => server.brokerState.get() == BrokerState.RUNNING,
"waiting for the broker state to become RUNNING")
val brokers = zkClient.getAllBrokersInCluster
assertEquals(1, brokers.size)
assertEquals(brokerId, brokers.head.id)
} }
} }

View File

@ -23,7 +23,7 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.{Files, StandardOpenOption} import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.time.Duration import java.time.Duration
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import java.util.{Arrays, Collections, Properties} import java.util.{Arrays, Collections, Properties}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit} import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
@ -60,6 +60,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.Utils._ import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.common.{KafkaFuture, TopicPartition} import org.apache.kafka.common.{KafkaFuture, TopicPartition}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer} import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils} import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.zookeeper.KeeperException.SessionExpiredException import org.apache.zookeeper.KeeperException.SessionExpiredException
@ -1076,7 +1077,7 @@ object TestUtils extends Logging {
maxPidExpirationMs = 60 * 60 * 1000, maxPidExpirationMs = 60 * 60 * 1000,
scheduler = time.scheduler, scheduler = time.scheduler,
time = time, time = time,
brokerState = BrokerState(), brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING),
brokerTopicStats = new BrokerTopicStats, brokerTopicStats = new BrokerTopicStats,
logDirFailureChannel = new LogDirFailureChannel(logDirs.size)) logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
} }

View File

@ -28,7 +28,6 @@ import kafka.log.LogAppendInfo;
import kafka.log.LogConfig; import kafka.log.LogConfig;
import kafka.log.LogManager; import kafka.log.LogManager;
import kafka.server.AlterIsrManager; import kafka.server.AlterIsrManager;
import kafka.server.BrokerState;
import kafka.server.BrokerTopicStats; import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions; import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState; import kafka.server.InitialFetchState;
@ -56,6 +55,7 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.BenchmarkMode;
@ -88,6 +88,7 @@ import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@State(Scope.Benchmark) @State(Scope.Benchmark)
@Fork(value = 1) @Fork(value = 1)
@ -132,7 +133,7 @@ public class ReplicaFetcherThreadBenchmark {
1000L, 1000L,
60000, 60000,
scheduler, scheduler,
new BrokerState(), new AtomicReference<>(BrokerState.NOT_RUNNING),
brokerTopicStats, brokerTopicStats,
logDirFailureChannel, logDirFailureChannel,
Time.SYSTEM); Time.SYSTEM);

View File

@ -26,7 +26,6 @@ import kafka.log.Defaults;
import kafka.log.LogConfig; import kafka.log.LogConfig;
import kafka.log.LogManager; import kafka.log.LogManager;
import kafka.server.AlterIsrManager; import kafka.server.AlterIsrManager;
import kafka.server.BrokerState;
import kafka.server.BrokerTopicStats; import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel; import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache; import kafka.server.MetadataCache;
@ -39,6 +38,7 @@ import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.BenchmarkMode;
@ -67,6 +67,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@State(Scope.Benchmark) @State(Scope.Benchmark)
@Fork(value = 1) @Fork(value = 1)
@ -108,7 +109,7 @@ public class PartitionMakeFollowerBenchmark {
1000L, 1000L,
60000, 60000,
scheduler, scheduler,
new BrokerState(), new AtomicReference<>(BrokerState.NOT_RUNNING),
brokerTopicStats, brokerTopicStats,
logDirFailureChannel, logDirFailureChannel,
Time.SYSTEM); Time.SYSTEM);

View File

@ -26,7 +26,6 @@ import kafka.log.Defaults;
import kafka.log.LogConfig; import kafka.log.LogConfig;
import kafka.log.LogManager; import kafka.log.LogManager;
import kafka.server.AlterIsrManager; import kafka.server.AlterIsrManager;
import kafka.server.BrokerState;
import kafka.server.BrokerTopicStats; import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel; import kafka.server.LogDirFailureChannel;
import kafka.server.LogOffsetMetadata; import kafka.server.LogOffsetMetadata;
@ -36,6 +35,7 @@ import kafka.utils.KafkaScheduler;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerState;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.BenchmarkMode;
@ -59,6 +59,7 @@ import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@State(Scope.Benchmark) @State(Scope.Benchmark)
@Fork(value = 1) @Fork(value = 1)
@ -93,7 +94,7 @@ public class UpdateFollowerFetchStateBenchmark {
1000L, 1000L,
60000, 60000,
scheduler, scheduler,
new BrokerState(), new AtomicReference<>(BrokerState.NOT_RUNNING),
brokerTopicStats, brokerTopicStats,
logDirFailureChannel, logDirFailureChannel,
Time.SYSTEM); Time.SYSTEM);