KAFKA-12342: Remove MetaLogShim and use RaftClient directly (#10705)

This patch removes the temporary shim layer that bridged the interface
differences between MetaLogManager and RaftClient. The metadata module now uses
RaftClient directly, which also means that the metadata Gradle module depends on
raft rather than the other way around. Finally, this patch consolidates the
handleResign and handleNewLeader APIs into a single handleLeaderChange API (a
sketch of the consolidated listener follows the commit metadata below).

Co-authored-by: Jason Gustafson <jason@confluent.io>
José Armando García Sancio 2021-05-20 15:39:46 -07:00 committed by GitHub
parent b56d9e4416
commit f50f13d781
40 changed files with 967 additions and 951 deletions
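
Below is a minimal, hypothetical sketch (not part of this patch) of a component
implementing RaftClient.Listener directly, which is what replaces the old
MetaLogListener once the shim is removed. The class name ExampleMetadataListener
and its method bodies are illustrative only; the real implementations are
BrokerMetadataListener and QuorumController's internal listener in the diff below.

import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotReader;

public class ExampleMetadataListener implements RaftClient.Listener<ApiMessageAndVersion> {
    private final int nodeId;

    public ExampleMetadataListener(int nodeId) {
        this.nodeId = nodeId;
    }

    @Override
    public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
        // Committed records now arrive as batches through a reader that the
        // listener owns and must close; this replaces handleCommits(lastOffset, records).
        try {
            while (reader.hasNext()) {
                Batch<ApiMessageAndVersion> batch = reader.next();
                for (ApiMessageAndVersion messageAndVersion : batch.records()) {
                    // apply messageAndVersion.message() to local state ...
                }
            }
        } finally {
            reader.close();
        }
    }

    @Override
    public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
        // Snapshot handling is component specific; the broker listener in this
        // patch simply closes the reader and reports that loading is unsupported.
        reader.close();
    }

    @Override
    public void handleLeaderChange(LeaderAndEpoch newLeader) {
        // One callback replaces handleNewLeader and handleResign: the listener
        // inspects the event to decide whether it gained or lost leadership.
        if (newLeader.isLeader(nodeId)) {
            // claim leadership for epoch newLeader.epoch() ...
        } else {
            // resign if this node previously believed it was the leader ...
        }
    }
}

A component would pass such a listener to RaftClient#register, as BrokerServer
does with raftManager.register(brokerMetadataListener) in the diff below.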

View File

@ -1069,6 +1069,7 @@ project(':metadata') {
dependencies {
implementation project(':server-common')
implementation project(':clients')
implementation project(':raft')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
implementation libs.metrics
@ -1077,6 +1078,7 @@ project(':metadata') {
testImplementation libs.hamcrest
testImplementation libs.slf4jlog4j
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
}
task processMessages(type:JavaExec) {
@ -1267,7 +1269,6 @@ project(':raft') {
dependencies {
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
implementation libs.slf4jApi
implementation libs.jacksonDatabind

View File

@ -217,12 +217,15 @@
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
</subpackage>
@ -234,6 +237,8 @@
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
@ -243,6 +248,8 @@
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
@ -292,7 +299,6 @@
<allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.metalog"/>
<allow pkg="org.apache.kafka.queue"/>
<allow pkg="org.apache.kafka.raft"/>
<allow pkg="org.apache.kafka.server.common" />
@ -418,7 +424,6 @@
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.test"/>

View File

@ -27,7 +27,6 @@ import kafka.server.{KafkaConfig, MetaProperties}
import kafka.utils.timer.SystemTimer
import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.ApiMessage
@ -35,8 +34,9 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest}
import org.apache.kafka.server.common.serialization.RecordSerde
import scala.jdk.CollectionConverters._
@ -100,6 +100,10 @@ trait RaftManager[T] {
epoch: Int,
records: Seq[T]
): Option[Long]
def leaderAndEpoch: LeaderAndEpoch
def client: RaftClient[T]
}
class KafkaRaftManager[T](
@ -125,10 +129,10 @@ class KafkaRaftManager[T](
private val dataDir = createDataDir()
private val metadataLog = buildMetadataLog()
private val netChannel = buildNetworkChannel()
private val raftClient = buildRaftClient()
private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
val client: KafkaRaftClient[T] = buildRaftClient()
private val raftIoThread = new RaftIoThread(client, threadNamePrefix)
def kafkaRaftClient: KafkaRaftClient[T] = raftClient
def kafkaRaftClient: KafkaRaftClient[T] = client
def startup(): Unit = {
// Update the voter endpoints (if valid) with what's in RaftConfig
@ -151,7 +155,7 @@ class KafkaRaftManager[T](
def shutdown(): Unit = {
raftIoThread.shutdown()
raftClient.close()
client.close()
scheduler.shutdown()
netChannel.close()
metadataLog.close()
@ -160,7 +164,7 @@ class KafkaRaftManager[T](
override def register(
listener: RaftClient.Listener[T]
): Unit = {
raftClient.register(listener)
client.register(listener)
}
override def scheduleAtomicAppend(
@ -183,9 +187,9 @@ class KafkaRaftManager[T](
isAtomic: Boolean
): Option[Long] = {
val offset = if (isAtomic) {
raftClient.scheduleAtomicAppend(epoch, records.asJava)
client.scheduleAtomicAppend(epoch, records.asJava)
} else {
raftClient.scheduleAppend(epoch, records.asJava)
client.scheduleAppend(epoch, records.asJava)
}
Option(offset).map(Long.unbox)
@ -202,7 +206,7 @@ class KafkaRaftManager[T](
createdTimeMs
)
raftClient.handle(inboundRequest)
client.handle(inboundRequest)
inboundRequest.completion.thenApply { response =>
response.data
@ -307,4 +311,7 @@ class KafkaRaftManager[T](
)
}
override def leaderAndEpoch: LeaderAndEpoch = {
client.leaderAndEpoch
}
}

View File

@ -18,9 +18,9 @@
package kafka.server
import java.util
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.net.InetAddress
import kafka.cluster.Broker.ServerInfo
@ -29,6 +29,7 @@ import kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinato
import kafka.log.LogManager
import kafka.metrics.KafkaYammerMetrics
import kafka.network.SocketServer
import kafka.raft.RaftManager
import kafka.security.CredentialProvider
import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
import kafka.utils.{CoreUtils, KafkaScheduler}
@ -43,10 +44,10 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion;
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@ -55,16 +56,16 @@ import scala.jdk.CollectionConverters._
* A Kafka broker that runs in KRaft (Kafka Raft) mode.
*/
class BrokerServer(
val config: KafkaConfig,
val metaProps: MetaProperties,
val metaLogManager: MetaLogManager,
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val initialOfflineDirs: Seq[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val supportedFeatures: util.Map[String, VersionRange]
) extends KafkaBroker {
val config: KafkaConfig,
val metaProps: MetaProperties,
val raftManager: RaftManager[ApiMessageAndVersion],
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val initialOfflineDirs: Seq[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val supportedFeatures: util.Map[String, VersionRange]
) extends KafkaBroker {
import kafka.server.Server._
@ -181,7 +182,7 @@ class BrokerServer(
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
clientToControllerChannelManager = BrokerToControllerChannelManager(
controllerNodeProvider,
@ -284,7 +285,7 @@ class BrokerServer(
metaProps.clusterId, networkListeners, supportedFeatures)
// Register a listener with the Raft layer to receive metadata event notifications
metaLogManager.register(brokerMetadataListener)
raftManager.register(brokerMetadataListener)
val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
var interBrokerListener: Endpoint = null

View File

@ -21,6 +21,7 @@ import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicReference
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.raft.RaftManager
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.Node
@ -31,9 +32,10 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.server.common.ApiMessageAndVersion;
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
trait ControllerNodeProvider {
@ -77,15 +79,14 @@ class MetadataCacheControllerNodeProvider(
}
object RaftControllerNodeProvider {
def apply(metaLogManager: MetaLogManager,
def apply(raftManager: RaftManager[ApiMessageAndVersion],
config: KafkaConfig,
controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
val controllerSaslMechanism = config.saslMechanismControllerProtocol
new RaftControllerNodeProvider(
metaLogManager,
raftManager,
controllerQuorumVoterNodes,
controllerListenerName,
controllerSecurityProtocol,
@ -98,7 +99,7 @@ object RaftControllerNodeProvider {
* Finds the controller node by checking the metadata log manager.
* This provider is used when we are using a Raft-based metadata quorum.
*/
class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
class RaftControllerNodeProvider(val raftManager: RaftManager[ApiMessageAndVersion],
controllerQuorumVoterNodes: Seq[Node],
val listenerName: ListenerName,
val securityProtocol: SecurityProtocol,
@ -107,14 +108,7 @@ class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap
override def get(): Option[Node] = {
val leader = metaLogManager.leader()
if (leader == null) {
None
} else if (leader.nodeId() < 0) {
None
} else {
idToNode.get(leader.nodeId())
}
raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode)
}
}

View File

@ -17,9 +17,10 @@
package kafka.server
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}
import kafka.cluster.Broker.ServerInfo
import kafka.log.LogConfig
import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
@ -37,7 +38,6 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics}
import org.apache.kafka.metadata.VersionRange
import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
@ -49,15 +49,14 @@ import scala.jdk.CollectionConverters._
* A Kafka controller that runs in KRaft (Kafka Raft) mode.
*/
class ControllerServer(
val metaProperties: MetaProperties,
val config: KafkaConfig,
val metaLogManager: MetaLogManager,
val raftManager: RaftManager[ApiMessageAndVersion],
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
) extends Logging with KafkaMetricsGroup {
val metaProperties: MetaProperties,
val config: KafkaConfig,
val raftManager: RaftManager[ApiMessageAndVersion],
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
) extends Logging with KafkaMetricsGroup {
import kafka.server.Server._
val lock = new ReentrantLock()
@ -148,7 +147,7 @@ class ControllerServer(
setTime(time).
setThreadNamePrefix(threadNamePrefixAsString).
setConfigDefs(configDefs).
setLogManager(metaLogManager).
setRaftClient(raftManager.client).
setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
setDefaultNumPartitions(config.numPartitions.intValue()).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),

View File

@ -24,10 +24,10 @@ import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
import kafka.raft.KafkaRaftManager
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde}
import org.apache.kafka.server.common.ApiMessageAndVersion
import scala.collection.Seq
@ -55,7 +55,7 @@ class KafkaRaftServer(
private val metrics = Server.initializeMetrics(
config,
time,
metaProps.clusterId.toString
metaProps.clusterId
)
private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
@ -73,13 +73,11 @@ class KafkaRaftServer(
controllerQuorumVotersFuture
)
private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId)
private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
Some(new BrokerServer(
config,
metaProps,
metaLogShim,
raftManager,
time,
metrics,
threadNamePrefix,
@ -95,7 +93,6 @@ class KafkaRaftServer(
Some(new ControllerServer(
metaProps,
config,
metaLogShim,
raftManager,
time,
metrics,

View File

@ -16,8 +16,8 @@
*/
package kafka.server.metadata
import java.util
import java.util.concurrent.TimeUnit
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.metrics.KafkaMetricsGroup
@ -27,9 +27,12 @@ import org.apache.kafka.common.metadata.MetadataRecordType._
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.metalog.{MetaLogLeader, MetaLogListener}
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotReader
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
object BrokerMetadataListener{
@ -37,16 +40,17 @@ object BrokerMetadataListener{
val MetadataBatchSizes = "MetadataBatchSizes"
}
class BrokerMetadataListener(brokerId: Int,
time: Time,
metadataCache: RaftMetadataCache,
configRepository: CachedConfigRepository,
groupCoordinator: GroupCoordinator,
replicaManager: RaftReplicaManager,
txnCoordinator: TransactionCoordinator,
threadNamePrefix: Option[String],
clientQuotaManager: ClientQuotaMetadataManager
) extends MetaLogListener with KafkaMetricsGroup {
class BrokerMetadataListener(
brokerId: Int,
time: Time,
metadataCache: RaftMetadataCache,
configRepository: CachedConfigRepository,
groupCoordinator: GroupCoordinator,
replicaManager: RaftReplicaManager,
txnCoordinator: TransactionCoordinator,
threadNamePrefix: Option[String],
clientQuotaManager: ClientQuotaMetadataManager
) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
private val logContext = new LogContext(s"[BrokerMetadataListener id=${brokerId}] ")
private val log = logContext.logger(classOf[BrokerMetadataListener])
logIdent = logContext.logPrefix()
@ -73,21 +77,42 @@ class BrokerMetadataListener(brokerId: Int,
/**
* Handle new metadata records.
*/
override def handleCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
eventQueue.append(new HandleCommitsEvent(lastOffset, records))
override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = {
eventQueue.append(new HandleCommitsEvent(reader))
}
// Visible for testing. It's useful to execute events synchronously
private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
new HandleCommitsEvent(lastOffset, records).run()
/**
* Handle metadata snapshots
*/
override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
// Loading snapshot on the broker is currently not supported.
reader.close();
throw new UnsupportedOperationException(s"Loading snapshot (${reader.snapshotId()}) is not supported")
}
class HandleCommitsEvent(lastOffset: Long,
records: util.List[ApiMessage])
extends EventQueue.FailureLoggingEvent(log) {
// Visible for testing. It's useful to execute events synchronously in order
// to make tests deterministic. This object is responsible for closing the reader.
private[metadata] def execCommits(batchReader: BatchReader[ApiMessageAndVersion]): Unit = {
new HandleCommitsEvent(batchReader).run()
}
class HandleCommitsEvent(
reader: BatchReader[ApiMessageAndVersion]
) extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
try {
apply(reader.next())
} finally {
reader.close()
}
}
private def apply(batch: Batch[ApiMessageAndVersion]): Unit = {
val records = batch.records
val lastOffset = batch.lastOffset
if (isDebugEnabled) {
debug(s"Metadata batch ${lastOffset}: handling ${records.size()} record(s).")
debug(s"Metadata batch $lastOffset: handling ${records.size()} record(s).")
}
val imageBuilder =
MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
@ -100,37 +125,37 @@ class BrokerMetadataListener(brokerId: Int,
trace("Metadata batch %d: processing [%d/%d]: %s.".format(lastOffset, index + 1,
records.size(), record.toString))
}
handleMessage(imageBuilder, record, lastOffset)
handleMessage(imageBuilder, record.message, lastOffset)
} catch {
case e: Exception => error(s"Unable to handle record ${index} in batch " +
s"ending at offset ${lastOffset}", e)
case e: Exception => error(s"Unable to handle record $index in batch " +
s"ending at offset $lastOffset", e)
}
index = index + 1
}
if (imageBuilder.hasChanges) {
val newImage = imageBuilder.build()
if (isTraceEnabled) {
trace(s"Metadata batch ${lastOffset}: creating new metadata image ${newImage}")
trace(s"Metadata batch $lastOffset: creating new metadata image ${newImage}")
} else if (isDebugEnabled) {
debug(s"Metadata batch ${lastOffset}: creating new metadata image")
debug(s"Metadata batch $lastOffset: creating new metadata image")
}
metadataCache.image(newImage)
} else if (isDebugEnabled) {
debug(s"Metadata batch ${lastOffset}: no new metadata image required.")
debug(s"Metadata batch $lastOffset: no new metadata image required.")
}
if (imageBuilder.hasPartitionChanges) {
if (isDebugEnabled) {
debug(s"Metadata batch ${lastOffset}: applying partition changes")
debug(s"Metadata batch $lastOffset: applying partition changes")
}
replicaManager.handleMetadataRecords(imageBuilder, lastOffset,
RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
} else if (isDebugEnabled) {
debug(s"Metadata batch ${lastOffset}: no partition changes found.")
debug(s"Metadata batch $lastOffset: no partition changes found.")
}
_highestMetadataOffset = lastOffset
val endNs = time.nanoseconds()
val deltaUs = TimeUnit.MICROSECONDS.convert(endNs - startNs, TimeUnit.NANOSECONDS)
debug(s"Metadata batch ${lastOffset}: advanced highest metadata offset in ${deltaUs} " +
debug(s"Metadata batch $lastOffset: advanced highest metadata offset in ${deltaUs} " +
"microseconds.")
batchProcessingTimeHist.update(deltaUs)
}
@ -234,21 +259,17 @@ class BrokerMetadataListener(brokerId: Int,
clientQuotaManager.handleQuotaRecord(record)
}
class HandleNewLeaderEvent(leader: MetaLogLeader)
class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
val imageBuilder =
MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
if (leader.nodeId() < 0) {
imageBuilder.controllerId(None)
} else {
imageBuilder.controllerId(Some(leader.nodeId()))
}
imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala)
metadataCache.image(imageBuilder.build())
}
}
override def handleNewLeader(leader: MetaLogLeader): Unit = {
override def handleLeaderChange(leader: LeaderAndEpoch): Unit = {
eventQueue.append(new HandleNewLeaderEvent(leader))
}

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT
import org.apache.kafka.common.protocol.ByteBufferAccessor
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.MetadataRecordSerde
import scala.jdk.CollectionConverters._
import scala.collection.mutable

View File

@ -19,6 +19,7 @@ package kafka.tools
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import joptsimple.OptionException
import kafka.network.SocketServer
import kafka.raft.{KafkaRaftManager, RaftManager}
@ -35,7 +36,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.{Batch, BatchReader, RaftClient, RaftConfig}
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.snapshot.SnapshotReader
@ -165,12 +166,12 @@ class TestRaftServer(
raftManager.register(this)
override def handleClaim(epoch: Int): Unit = {
eventQueue.offer(HandleClaim(epoch))
}
override def handleResign(epoch: Int): Unit = {
eventQueue.offer(HandleResign)
override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch): Unit = {
if (newLeaderAndEpoch.isLeader(config.nodeId)) {
eventQueue.offer(HandleClaim(newLeaderAndEpoch.epoch))
} else if (claimedEpoch.isDefined) {
eventQueue.offer(HandleResign)
}
}
override def handleCommit(reader: BatchReader[Array[Byte]]): Unit = {

View File

@ -36,11 +36,9 @@ import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metalog.MetaLogManager;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.raft.metadata.MetaLogRaftShim;
import org.apache.kafka.raft.metadata.MetadataRecordSerde;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -175,11 +173,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
ControllerServer controller = new ControllerServer(
nodes.controllerProperties(node.id()),
config,
metaLogShim,
raftManager,
Time.SYSTEM,
new Metrics(),
@ -228,11 +224,10 @@ public class KafkaClusterTestKit implements AutoCloseable {
KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
BrokerServer broker = new BrokerServer(
config,
nodes.brokerProperties(node.id()),
metaLogShim,
raftManager,
Time.SYSTEM,
new Metrics(),
Option.apply(threadNamePrefix),

View File

@ -25,9 +25,11 @@ import kafka.server.RaftReplicaManager
import kafka.utils.Implicits._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.metadata.{ConfigRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.raft.Batch
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers._
@ -39,6 +41,7 @@ import scala.jdk.CollectionConverters._
class BrokerMetadataListenerTest {
private val leaderEpoch = 5
private val brokerId = 1
private val time = new MockTime()
private val configRepository = new CachedConfigRepository
@ -82,11 +85,10 @@ class BrokerMetadataListenerTest {
): Unit = {
val deleteRecord = new RemoveTopicRecord()
.setTopicId(topicId)
lastMetadataOffset += 1
listener.execCommits(lastOffset = lastMetadataOffset, List[ApiMessage](
deleteRecord,
).asJava)
applyBatch(List[ApiMessageAndVersion](
new ApiMessageAndVersion(deleteRecord, 0.toShort),
))
assertFalse(metadataCache.contains(topic))
assertEquals(new Properties, configRepository.topicConfig(topic))
@ -108,6 +110,25 @@ class BrokerMetadataListenerTest {
assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet)
}
private def applyBatch(
records: List[ApiMessageAndVersion]
): Unit = {
val baseOffset = lastMetadataOffset + 1
lastMetadataOffset += records.size
listener.execCommits(
new MemoryBatchReader(
List(
Batch.of(
baseOffset,
leaderEpoch,
records.asJava
)
).asJava,
reader => ()
)
)
}
private def createAndAssert(
topicId: Uuid,
topic: String,
@ -115,11 +136,10 @@ class BrokerMetadataListenerTest {
numPartitions: Int,
numBrokers: Int
): Set[TopicPartition] = {
val records = new java.util.ArrayList[ApiMessage]
records.add(new TopicRecord()
val records = mutable.ListBuffer.empty[ApiMessageAndVersion]
records += new ApiMessageAndVersion(new TopicRecord()
.setName(topic)
.setTopicId(topicId)
)
.setTopicId(topicId), 0)
val localTopicPartitions = mutable.Set.empty[TopicPartition]
(0 until numPartitions).map { partitionId =>
@ -134,28 +154,25 @@ class BrokerMetadataListenerTest {
localTopicPartitions.add(new TopicPartition(topic, partitionId))
}
records.add(new PartitionRecord()
records += new ApiMessageAndVersion(new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(partitionId)
.setLeader(preferredLeaderId)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
.setReplicas(replicas)
.setIsr(replicas)
)
.setIsr(replicas), 0)
}
topicConfig.forKeyValue { (key, value) =>
records.add(new ConfigRecord()
records += new ApiMessageAndVersion(new ConfigRecord()
.setResourceName(topic)
.setResourceType(ConfigResource.Type.TOPIC.id())
.setName(key)
.setValue(value)
)
.setValue(value), 0)
}
lastMetadataOffset += records.size()
listener.execCommits(lastOffset = lastMetadataOffset, records)
applyBatch(records.toList)
assertTrue(metadataCache.contains(topic))
assertEquals(Some(numPartitions), metadataCache.numPartitions(topic))
assertEquals(topicConfig, configRepository.topicConfig(topic).asScala)

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRe
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

View File

@ -17,22 +17,6 @@
package org.apache.kafka.controller;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigDef;
@ -80,15 +64,36 @@ import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.metalog.MetaLogManager;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@ -120,13 +125,12 @@ public final class QuorumController implements Controller {
private String threadNamePrefix = null;
private LogContext logContext = null;
private Map<ConfigResource.Type, ConfigDef> configDefs = Collections.emptyMap();
private MetaLogManager logManager = null;
private RaftClient<ApiMessageAndVersion> raftClient = null;
private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
private short defaultReplicationFactor = 3;
private int defaultNumPartitions = 1;
private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
private Function<Long, SnapshotWriter> snapshotWriterBuilder;
private SnapshotReader snapshotReader;
private long sessionTimeoutNs = NANOSECONDS.convert(18, TimeUnit.SECONDS);
private ControllerMetrics controllerMetrics = null;
@ -154,8 +158,8 @@ public final class QuorumController implements Controller {
return this;
}
public Builder setLogManager(MetaLogManager logManager) {
this.logManager = logManager;
public Builder setRaftClient(RaftClient<ApiMessageAndVersion> logManager) {
this.raftClient = logManager;
return this;
}
@ -184,11 +188,6 @@ public final class QuorumController implements Controller {
return this;
}
public Builder setSnapshotReader(SnapshotReader snapshotReader) {
this.snapshotReader = snapshotReader;
return this;
}
public Builder setSessionTimeoutNs(long sessionTimeoutNs) {
this.sessionTimeoutNs = sessionTimeoutNs;
return this;
@ -201,8 +200,8 @@ public final class QuorumController implements Controller {
@SuppressWarnings("unchecked")
public QuorumController build() throws Exception {
if (logManager == null) {
throw new RuntimeException("You must set a metadata log manager.");
if (raftClient == null) {
throw new RuntimeException("You must set a raft client.");
}
if (threadNamePrefix == null) {
threadNamePrefix = String.format("Node%d_", nodeId);
@ -217,21 +216,16 @@ public final class QuorumController implements Controller {
if (snapshotWriterBuilder == null) {
snapshotWriterBuilder = new NoOpSnapshotWriterBuilder();
}
if (snapshotReader == null) {
snapshotReader = new EmptySnapshotReader(-1);
}
KafkaEventQueue queue = null;
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
return new QuorumController(logContext, nodeId, queue, time, configDefs,
logManager, supportedFeatures, defaultReplicationFactor,
raftClient, supportedFeatures, defaultReplicationFactor,
defaultNumPartitions, replicaPlacer, snapshotWriterBuilder,
snapshotReader, sessionTimeoutNs, controllerMetrics);
sessionTimeoutNs, controllerMetrics);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
throw e;
} finally {
Utils.closeQuietly(snapshotReader, "snapshotReader");
}
}
}
@ -240,12 +234,12 @@ public final class QuorumController implements Controller {
"The active controller appears to be node ";
private NotControllerException newNotControllerException() {
int latestController = logManager.leader().nodeId();
if (latestController < 0) {
return new NotControllerException("No controller appears to be active.");
} else {
OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
if (latestController.isPresent()) {
return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
latestController);
latestController.getAsInt());
} else {
return new NotControllerException("No controller appears to be active.");
}
}
@ -536,7 +530,7 @@ public final class QuorumController implements Controller {
public void run() throws Exception {
long now = time.nanoseconds();
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
long controllerEpoch = curClaimEpoch;
int controllerEpoch = curClaimEpoch;
if (controllerEpoch == -1) {
throw newNotControllerException();
}
@ -565,19 +559,19 @@ public final class QuorumController implements Controller {
} else {
// If the operation returned a batch of records, those records need to be
// written before we can return our result to the user. Here, we hand off
// the batch of records to the metadata log manager. They will be written
// out asynchronously.
// the batch of records to the raft client. They will be written out
// asynchronously.
final long offset;
if (result.isAtomic()) {
offset = logManager.scheduleAtomicWrite(controllerEpoch, result.records());
offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
} else {
offset = logManager.scheduleWrite(controllerEpoch, result.records());
offset = raftClient.scheduleAppend(controllerEpoch, result.records());
}
op.processBatchEndOffset(offset);
writeOffset = offset;
resultAndOffset = ControllerResultAndOffset.of(offset, result);
for (ApiMessageAndVersion message : result.records()) {
replay(message.message(), -1, offset);
replay(message.message(), Optional.empty(), offset);
}
snapshotRegistry.createSnapshot(offset);
log.debug("Read-write operation {} will be completed when the log " +
@ -623,50 +617,126 @@ public final class QuorumController implements Controller {
return event.future();
}
class QuorumMetaLogListener implements MetaLogListener {
class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> {
@Override
public void handleCommits(long offset, List<ApiMessage> messages) {
appendControlEvent("handleCommits[" + offset + "]", () -> {
if (curClaimEpoch == -1) {
// If the controller is a standby, replay the records that were
// created by the active controller.
if (log.isDebugEnabled()) {
if (log.isTraceEnabled()) {
log.trace("Replaying commits from the active node up to " +
"offset {}: {}.", offset, messages.stream().
map(m -> m.toString()).collect(Collectors.joining(", ")));
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> {
try {
boolean isActiveController = curClaimEpoch != -1;
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
long offset = batch.lastOffset();
List<ApiMessageAndVersion> messages = batch.records();
if (isActiveController) {
// If the controller is active, the records were already replayed,
// so we don't need to do it here.
log.debug("Completing purgatory items up to offset {}.", offset);
// Complete any events in the purgatory that were waiting for this offset.
purgatory.completeUpTo(offset);
// Delete all the in-memory snapshots that we no longer need.
// If we are writing a new snapshot, then we need to keep that around;
// otherwise, we should delete up to the current committed offset.
snapshotRegistry.deleteSnapshotsUpTo(
Math.min(offset, snapshotGeneratorManager.snapshotEpoch()));
} else {
log.debug("Replaying commits from the active node up to " +
"offset {}.", offset);
// If the controller is a standby, replay the records that were
// created by the active controller.
if (log.isDebugEnabled()) {
if (log.isTraceEnabled()) {
log.trace("Replaying commits from the active node up to " +
"offset {}: {}.", offset, messages.stream()
.map(ApiMessageAndVersion::toString)
.collect(Collectors.joining(", ")));
} else {
log.debug("Replaying commits from the active node up to " +
"offset {}.", offset);
}
}
for (ApiMessageAndVersion messageAndVersion : messages) {
replay(messageAndVersion.message(), Optional.empty(), offset);
}
}
lastCommittedOffset = offset;
}
for (ApiMessage message : messages) {
replay(message, -1, offset);
}
} else {
// If the controller is active, the records were already replayed,
// so we don't need to do it here.
log.debug("Completing purgatory items up to offset {}.", offset);
// Complete any events in the purgatory that were waiting for this offset.
purgatory.completeUpTo(offset);
// Delete all the in-memory snapshots that we no longer need.
// If we are writing a new snapshot, then we need to keep that around;
// otherwise, we should delete up to the current committed offset.
snapshotRegistry.deleteSnapshotsUpTo(
Math.min(offset, snapshotGeneratorManager.snapshotEpoch()));
} finally {
reader.close();
}
lastCommittedOffset = offset;
});
}
@Override
public void handleNewLeader(MetaLogLeader newLeader) {
if (newLeader.nodeId() == nodeId) {
final long newEpoch = newLeader.epoch();
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
appendControlEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
try {
boolean isActiveController = curClaimEpoch != -1;
if (isActiveController) {
throw new IllegalStateException(
String.format(
"Asked to load snasphot (%s) when it is the active controller (%s)",
reader.snapshotId(),
curClaimEpoch
)
);
}
if (lastCommittedOffset != -1) {
throw new IllegalStateException(
String.format(
"Asked to re-load snapshot (%s) after processing records up to %s",
reader.snapshotId(),
lastCommittedOffset
)
);
}
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
long offset = batch.lastOffset();
List<ApiMessageAndVersion> messages = batch.records();
if (log.isDebugEnabled()) {
if (log.isTraceEnabled()) {
log.trace(
"Replaying snapshot ({}) batch with last offset of {}: {}",
reader.snapshotId(),
offset,
messages
.stream()
.map(ApiMessageAndVersion::toString)
.collect(Collectors.joining(", "))
);
} else {
log.debug(
"Replaying snapshot ({}) batch with last offset of {}",
reader.snapshotId(),
offset
);
}
}
for (ApiMessageAndVersion messageAndVersion : messages) {
replay(messageAndVersion.message(), Optional.of(reader.snapshotId()), offset);
}
}
lastCommittedOffset = reader.snapshotId().offset - 1;
snapshotRegistry.createSnapshot(lastCommittedOffset);
} finally {
reader.close();
}
});
}
@Override
public void handleLeaderChange(LeaderAndEpoch newLeader) {
if (newLeader.isLeader(nodeId)) {
final int newEpoch = newLeader.epoch();
appendControlEvent("handleClaim[" + newEpoch + "]", () -> {
long curEpoch = curClaimEpoch;
int curEpoch = curClaimEpoch;
if (curEpoch != -1) {
throw new RuntimeException("Tried to claim controller epoch " +
newEpoch + ", but we never renounced controller epoch " +
@ -678,19 +748,14 @@ public final class QuorumController implements Controller {
writeOffset = lastCommittedOffset;
clusterControl.activate();
});
}
}
@Override
public void handleRenounce(long oldEpoch) {
appendControlEvent("handleRenounce[" + oldEpoch + "]", () -> {
if (curClaimEpoch == oldEpoch) {
} else if (curClaimEpoch != -1) {
appendControlEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
"log event. Reverting to last committed offset {}.", curClaimEpoch,
lastCommittedOffset);
"log event. Reverting to last committed offset {}.", curClaimEpoch,
lastCommittedOffset);
renounce();
}
});
});
}
}
@Override
@ -753,7 +818,7 @@ public final class QuorumController implements Controller {
}
@SuppressWarnings("unchecked")
private void replay(ApiMessage message, long snapshotEpoch, long offset) {
private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
try {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
@ -794,12 +859,12 @@ public final class QuorumController implements Controller {
throw new RuntimeException("Unhandled record type " + type);
}
} catch (Exception e) {
if (snapshotEpoch < 0) {
log.error("Error replaying record {} at offset {}.",
message.toString(), offset, e);
if (snapshotId.isPresent()) {
log.error("Error replaying record {} from snapshot {} at last offset {}.",
message.toString(), snapshotId.get(), offset, e);
} else {
log.error("Error replaying record {} from snapshot {} at index {}.",
message.toString(), snapshotEpoch, offset, e);
log.error("Error replaying record {} at last offset {}.",
message.toString(), offset, e);
}
}
}
@ -878,7 +943,7 @@ public final class QuorumController implements Controller {
/**
* The interface that we use to mutate the Raft log.
*/
private final MetaLogManager logManager;
private final RaftClient<ApiMessageAndVersion> raftClient;
/**
* The interface that receives callbacks from the Raft log. These callbacks are
@ -891,7 +956,7 @@ public final class QuorumController implements Controller {
* Otherwise, this is -1. This variable must be modified only from the controller
* thread, but it can be read from other threads.
*/
private volatile long curClaimEpoch;
private volatile int curClaimEpoch;
/**
* The last offset we have committed, or -1 if we have not committed any offsets.
@ -908,15 +973,14 @@ public final class QuorumController implements Controller {
KafkaEventQueue queue,
Time time,
Map<ConfigResource.Type, ConfigDef> configDefs,
MetaLogManager logManager,
RaftClient<ApiMessageAndVersion> raftClient,
Map<String, VersionRange> supportedFeatures,
short defaultReplicationFactor,
int defaultNumPartitions,
ReplicaPlacer replicaPlacer,
Function<Long, SnapshotWriter> snapshotWriterBuilder,
SnapshotReader snapshotReader,
long sessionTimeoutNs,
ControllerMetrics controllerMetrics) throws Exception {
ControllerMetrics controllerMetrics) {
this.logContext = logContext;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
@ -935,21 +999,14 @@ public final class QuorumController implements Controller {
this.replicationControl = new ReplicationControlManager(snapshotRegistry,
logContext, defaultReplicationFactor, defaultNumPartitions,
configurationControl, clusterControl, controllerMetrics);
this.logManager = logManager;
this.raftClient = raftClient;
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1L;
this.lastCommittedOffset = snapshotReader.epoch();
this.curClaimEpoch = -1;
this.lastCommittedOffset = -1L;
this.writeOffset = -1L;
while (snapshotReader.hasNext()) {
List<ApiMessage> batch = snapshotReader.next();
long index = 0;
for (ApiMessage message : batch) {
replay(message, snapshotReader.epoch(), index++);
}
}
snapshotRegistry.createSnapshot(lastCommittedOffset);
this.logManager.register(metaLogListener);
this.raftClient.register(metaLogListener);
}
@Override

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.metadata;
package org.apache.kafka.metadata;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.protocol.ApiMessage;

View File

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import java.util.Objects;
/**
* The current leader of the MetaLog.
*/
public class MetaLogLeader {
private final int nodeId;
private final long epoch;
public MetaLogLeader(int nodeId, long epoch) {
this.nodeId = nodeId;
this.epoch = epoch;
}
public int nodeId() {
return nodeId;
}
public long epoch() {
return epoch;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof MetaLogLeader)) return false;
MetaLogLeader other = (MetaLogLeader) o;
return other.nodeId == nodeId && other.epoch == epoch;
}
@Override
public int hashCode() {
return Objects.hash(nodeId, epoch);
}
@Override
public String toString() {
return "MetaLogLeader(nodeId=" + nodeId + ", epoch=" + epoch + ")";
}
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import org.apache.kafka.common.protocol.ApiMessage;
import java.util.List;
/**
* Listeners receive notifications from the MetaLogManager.
*/
public interface MetaLogListener {
/**
* Called when the MetaLogManager commits some messages.
*
* @param lastOffset The last offset found in all the given messages.
* @param messages The messages.
*/
void handleCommits(long lastOffset, List<ApiMessage> messages);
/**
* Called when a new leader is elected.
*
* @param leader The new leader id and epoch.
*/
default void handleNewLeader(MetaLogLeader leader) {}
/**
* Called when the MetaLogManager has renounced the leadership.
*
* @param epoch The controller epoch that has ended.
*/
default void handleRenounce(long epoch) {}
/**
* Called when the MetaLogManager has finished shutting down, and wants to tell its
* listener that it is safe to shut down as well.
*/
default void beginShutdown() {}
}

View File

@ -1,96 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.List;
/**
* The MetaLogManager handles storing metadata and electing leaders.
*/
public interface MetaLogManager {
/**
* Start this meta log manager.
* The manager must be ready to accept incoming calls after this function returns.
* It is an error to initialize a MetaLogManager more than once.
*/
void initialize() throws Exception;
/**
* Register the listener. The manager must be initialized already.
* The listener must be ready to accept incoming calls immediately.
*
* @param listener The listener to register.
*/
void register(MetaLogListener listener) throws Exception;
/**
* Schedule a write to the log.
*
* The write will be scheduled to happen at some time in the future. There is no
* error return or exception thrown if the write fails. Instead, the listener may
* regard the write as successful if and only if the MetaLogManager reaches the given
* offset before renouncing its leadership. The listener should determine this by
* monitoring the committed offsets.
*
* @param epoch the controller epoch
* @param batch the batch of messages to write
*
* @return the offset of the last message in the batch
* @throws IllegalArgumentException if buffer allocatio failed and the client should backoff
*/
long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);
/**
* Schedule a atomic write to the log.
*
* The write will be scheduled to happen at some time in the future. All of the messages in batch
* will be appended atomically in one batch. The listener may regard the write as successful
* if and only if the MetaLogManager reaches the given offset before renouncing its leadership.
* The listener should determine this by monitoring the committed offsets.
*
* @param epoch the controller epoch
* @param batch the batch of messages to write
*
* @return the offset of the last message in the batch
* @throws IllegalArgumentException if buffer allocatio failed and the client should backoff
*/
long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch);
/**
* Renounce the leadership.
*
* @param epoch The epoch. If this does not match the current epoch, this
* call will be ignored.
*/
void renounce(long epoch);
/**
* Returns the current leader. The active node may change immediately after this
* function is called, of course.
*/
MetaLogLeader leader();
/**
* Returns the node id.
*/
int nodeId();
}

View File

@ -17,13 +17,20 @@
package org.apache.kafka.controller;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.nio.ByteBuffer;
class MockSnapshotWriter implements SnapshotWriter {
private final long epoch;
@ -80,15 +87,24 @@ class MockSnapshotWriter implements SnapshotWriter {
return batches;
}
public MockSnapshotReader toReader() {
List<List<ApiMessage>> readerBatches = new ArrayList<>();
for (List<ApiMessageAndVersion> batch : batches) {
List<ApiMessage> readerBatch = new ArrayList<>();
for (ApiMessageAndVersion messageAndVersion : batch) {
readerBatch.add(messageAndVersion.message());
}
readerBatches.add(readerBatch);
public MockRawSnapshotReader toReader() {
OffsetAndEpoch snapshotId = new OffsetAndEpoch(epoch, 0);
AtomicReference<ByteBuffer> buffer = new AtomicReference<>();
int maxBufferSize = 1024;
try (org.apache.kafka.snapshot.SnapshotWriter<ApiMessageAndVersion> snapshotWriter =
new org.apache.kafka.snapshot.SnapshotWriter<>(
new MockRawSnapshotWriter(snapshotId, buffer::set),
maxBufferSize,
MemoryPool.NONE,
new MockTime(),
CompressionType.NONE,
new MetadataRecordSerde()
)
) {
batches.forEach(snapshotWriter::append);
snapshotWriter.freeze();
}
return new MockSnapshotReader(epoch, readerBatches.iterator());
return new MockRawSnapshotReader(snapshotId, buffer.get());
}
}

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -33,8 +34,8 @@ import java.util.function.Function;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
@ -53,18 +54,18 @@ import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
@ -91,7 +92,7 @@ public class QuorumControllerTest {
*/
@Test
public void testCreateAndClose() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, __ -> { })) {
}
@ -103,7 +104,7 @@ public class QuorumControllerTest {
*/
@Test
public void testConfigurationOperations() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
testConfigurationOperations(controlEnv.activeController());
@ -134,7 +135,7 @@ public class QuorumControllerTest {
*/
@Test
public void testDelayedConfigurationOperations() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
@ -160,7 +161,7 @@ public class QuorumControllerTest {
@Test
public void testUnregisterBroker() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
ListenerCollection listeners = new ListenerCollection();
@ -229,7 +230,7 @@ public class QuorumControllerTest {
MockSnapshotWriter writer = null;
Map<Integer, Long> brokerEpochs = new HashMap<>();
Uuid fooId;
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3)) {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS).
setSnapshotWriterBuilder(snapshotWriterBuilder))) {
@ -272,11 +273,11 @@ public class QuorumControllerTest {
writer.waitForCompletion();
checkSnapshotContents(fooId, brokerEpochs, writer.batches().iterator());
}
}
final MockSnapshotReader reader = writer.toReader();
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.of(writer.toReader()))) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS).
setSnapshotReader(reader).
setSnapshotWriterBuilder(snapshotWriterBuilder))) {
QuorumController active = controlEnv.activeController();
long snapshotEpoch = active.beginWritingSnapshot().get();
@ -347,7 +348,7 @@ public class QuorumControllerTest {
*/
@Test
public void testTimeouts() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
QuorumController controller = controlEnv.activeController();
@ -403,7 +404,7 @@ public class QuorumControllerTest {
*/
@Test
public void testEarlyControllerResults() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
QuorumController controller = controlEnv.activeController();

View File

@ -41,7 +41,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
try {
for (int i = 0; i < numControllers; i++) {
QuorumController.Builder builder = new QuorumController.Builder(i);
builder.setLogManager(logEnv.logManagers().get(i));
builder.setRaftClient(logEnv.logManagers().get(i));
builderConsumer.accept(builder);
this.controllers.add(builder.build());
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.metadata;
package org.apache.kafka.metadata;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.SerializationException;
@ -69,4 +69,4 @@ class MetadataRecordSerdeTest {
() -> serde.read(new ByteBufferAccessor(buffer), 16));
}
}
}

View File

@ -17,22 +17,34 @@
package org.apache.kafka.metalog;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -43,15 +55,15 @@ import java.util.stream.Collectors;
/**
* The LocalLogManager is a test implementation that relies on the contents of memory.
*/
public final class LocalLogManager implements MetaLogManager, AutoCloseable {
public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, AutoCloseable {
interface LocalBatch {
int size();
}
static class LeaderChangeBatch implements LocalBatch {
private final MetaLogLeader newLeader;
private final LeaderAndEpoch newLeader;
LeaderChangeBatch(MetaLogLeader newLeader) {
LeaderChangeBatch(LeaderAndEpoch newLeader) {
this.newLeader = newLeader;
}
@ -80,9 +92,11 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
}
static class LocalRecordBatch implements LocalBatch {
private final List<ApiMessage> records;
private final long leaderEpoch;
private final List<ApiMessageAndVersion> records;
LocalRecordBatch(List<ApiMessage> records) {
LocalRecordBatch(long leaderEpoch, List<ApiMessageAndVersion> records) {
this.leaderEpoch = leaderEpoch;
this.records = records;
}
@ -126,16 +140,27 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
/**
* The current leader.
*/
private MetaLogLeader leader = new MetaLogLeader(-1, -1);
private LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
/**
* The start offset of the last batch that was created, or -1 if no batches have
* been created.
*/
private long prevOffset = -1;
private long prevOffset;
private final Optional<RawSnapshotReader> snapshot;
public SharedLogData(Optional<RawSnapshotReader> snapshot) {
this.snapshot = snapshot;
if (snapshot.isPresent()) {
prevOffset = snapshot.get().snapshotId().offset - 1;
} else {
prevOffset = -1;
}
}
synchronized void registerLogManager(LocalLogManager logManager) {
if (logManagers.put(logManager.nodeId(), logManager) != null) {
if (logManagers.put(logManager.nodeId, logManager) != null) {
throw new RuntimeException("Can't have multiple LocalLogManagers " +
"with id " + logManager.nodeId());
}
@ -143,7 +168,7 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
}
synchronized void unregisterLogManager(LocalLogManager logManager) {
if (!logManagers.remove(logManager.nodeId(), logManager)) {
if (!logManagers.remove(logManager.nodeId, logManager)) {
throw new RuntimeException("Log manager " + logManager.nodeId() +
" was not found.");
}
@ -155,9 +180,9 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
"match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
return Long.MAX_VALUE;
}
if (nodeId != leader.nodeId()) {
if (!leader.isLeader(nodeId)) {
log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
"match the current leader id of {}.", nodeId, epoch, leader.nodeId());
"match the current leader id of {}.", nodeId, epoch, leader.leaderId());
return Long.MAX_VALUE;
}
log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
@ -181,7 +206,7 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
}
synchronized void electLeaderIfNeeded() {
if (leader.nodeId() != -1 || logManagers.isEmpty()) {
if (leader.leaderId().isPresent() || logManagers.isEmpty()) {
return;
}
int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
@ -190,7 +215,7 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
for (int i = 0; i <= nextLeaderIndex; i++) {
nextLeaderNode = iter.next();
}
MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(nextLeaderNode), leader.epoch() + 1);
log.info("Elected new leader: {}.", newLeader);
append(new LeaderChangeBatch(newLeader));
}
@ -202,13 +227,26 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
}
return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
}
/**
* Optionally return the snapshot reader if the given offset is less than the offset of the first batch.
*/
Optional<RawSnapshotReader> maybeNextSnapshot(long offset) {
return snapshot.flatMap(reader -> {
if (offset < reader.snapshotId().offset) {
return Optional.of(reader);
}
return Optional.empty();
});
}
}
private static class MetaLogListenerData {
private long offset = -1;
private final MetaLogListener listener;
private final RaftClient.Listener<ApiMessageAndVersion> listener;
MetaLogListenerData(MetaLogListener listener) {
MetaLogListenerData(RaftClient.Listener<ApiMessageAndVersion> listener) {
this.listener = listener;
}
}
@ -254,7 +292,7 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
/**
* The current leader, as seen by this log manager.
*/
private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);
private volatile LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
public LocalLogManager(LogContext logContext,
int nodeId,
@ -274,6 +312,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
int numEntriesFound = 0;
for (MetaLogListenerData listenerData : listeners) {
while (true) {
// Load the snapshot if needed
Optional<RawSnapshotReader> snapshot = shared.maybeNextSnapshot(listenerData.offset);
if (snapshot.isPresent()) {
log.trace("Node {}: handling snapshot with id {}.", nodeId, snapshot.get().snapshotId());
listenerData.listener.handleSnapshot(
SnapshotReader.of(
snapshot.get(),
new MetadataRecordSerde(),
BufferSupplier.create(),
Integer.MAX_VALUE
)
);
listenerData.offset = snapshot.get().snapshotId().offset - 1;
}
Entry<Long, LocalBatch> entry = shared.nextBatch(listenerData.offset);
if (entry == null) {
log.trace("Node {}: reached the end of the log after finding " +
@ -291,7 +344,7 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
log.trace("Node {}: handling LeaderChange to {}.",
nodeId, batch.newLeader);
listenerData.listener.handleNewLeader(batch.newLeader);
listenerData.listener.handleLeaderChange(batch.newLeader);
if (batch.newLeader.epoch() > leader.epoch()) {
leader = batch.newLeader;
}
@ -299,7 +352,18 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
log.trace("Node {}: handling LocalRecordBatch with offset {}.",
nodeId, entryOffset);
listenerData.listener.handleCommits(entryOffset, batch.records);
listenerData.listener.handleCommit(
new MemoryBatchReader<>(
Collections.singletonList(
Batch.of(
entryOffset - batch.records.size() + 1,
Math.toIntExact(batch.leaderEpoch),
batch.records
)
),
reader -> { }
)
);
}
numEntriesFound++;
listenerData.offset = entryOffset;
@ -317,7 +381,7 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
try {
if (initialized && !shutdown) {
log.debug("Node {}: beginning shutdown.", nodeId);
renounce(leader.epoch());
resign(leader.epoch());
for (MetaLogListenerData listenerData : listeners) {
listenerData.listener.beginShutdown();
}
@ -331,14 +395,38 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
}
@Override
public void close() throws InterruptedException {
public void close() {
log.debug("Node {}: closing.", nodeId);
beginShutdown();
eventQueue.close();
try {
eventQueue.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
/**
* Shut down the log manager.
*
* Even though the API suggests a non-blocking shutdown, this method always returns a completed
* future. This means that shutdown is a blocking operation.
*/
@Override
public CompletableFuture<Void> shutdown(int timeoutMs) {
CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
try {
close();
shutdownFuture.complete(null);
} catch (Throwable t) {
shutdownFuture.completeExceptionally(t);
}
return shutdownFuture;
}
@Override
public void initialize() throws Exception {
public void initialize() {
eventQueue.append(() -> {
log.debug("initialized local log manager for node " + nodeId);
initialized = true;
@ -346,7 +434,7 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
}
@Override
public void register(MetaLogListener listener) throws Exception {
public void register(RaftClient.Listener<ApiMessageAndVersion> listener) {
CompletableFuture<Void> future = new CompletableFuture<>();
eventQueue.append(() -> {
if (shutdown) {
@ -366,47 +454,54 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
"LocalLogManager was not initialized."));
}
});
future.get();
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
@Override
public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
return scheduleAtomicWrite(epoch, batch);
public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
return scheduleAtomicAppend(epoch, batch);
}
@Override
public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
public Long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> batch) {
return shared.tryAppend(
nodeId,
leader.epoch(),
new LocalRecordBatch(
batch
.stream()
.map(ApiMessageAndVersion::message)
.collect(Collectors.toList())
)
new LocalRecordBatch(leader.epoch(), batch)
);
}
@Override
public void renounce(long epoch) {
MetaLogLeader curLeader = leader;
MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
public void resign(int epoch) {
LeaderAndEpoch curLeader = leader;
LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), curLeader.epoch() + 1);
shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
}
@Override
public MetaLogLeader leader() {
public SnapshotWriter<ApiMessageAndVersion> createSnapshot(OffsetAndEpoch snapshotId) {
throw new UnsupportedOperationException();
}
@Override
public LeaderAndEpoch leaderAndEpoch() {
return leader;
}
@Override
public int nodeId() {
return nodeId;
public OptionalInt nodeId() {
return OptionalInt.of(nodeId);
}
public List<MetaLogListener> listeners() {
final CompletableFuture<List<MetaLogListener>> future = new CompletableFuture<>();
public List<RaftClient.Listener<ApiMessageAndVersion>> listeners() {
final CompletableFuture<List<RaftClient.Listener<ApiMessageAndVersion>>> future = new CompletableFuture<>();
eventQueue.append(() -> {
future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
});

View File

@ -18,15 +18,16 @@
package org.apache.kafka.metalog;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
@ -37,7 +38,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40)
public class LocalLogManagerTest {
private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class);
/**
* Test creating a LocalLogManager and closing it.
@ -45,7 +45,7 @@ public class LocalLogManagerTest {
@Test
public void testCreateAndClose() throws Exception {
try (LocalLogManagerTestEnv env =
LocalLogManagerTestEnv.createWithMockListeners(1)) {
LocalLogManagerTestEnv.createWithMockListeners(1, Optional.empty())) {
env.close();
assertEquals(null, env.firstError.get());
}
@ -57,8 +57,8 @@ public class LocalLogManagerTest {
@Test
public void testClaimsLeadership() throws Exception {
try (LocalLogManagerTestEnv env =
LocalLogManagerTestEnv.createWithMockListeners(1)) {
assertEquals(new MetaLogLeader(0, 0), env.waitForLeader());
LocalLogManagerTestEnv.createWithMockListeners(1, Optional.empty())) {
assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1), env.waitForLeader());
env.close();
assertEquals(null, env.firstError.get());
}
@ -70,12 +70,16 @@ public class LocalLogManagerTest {
@Test
public void testPassLeadership() throws Exception {
try (LocalLogManagerTestEnv env =
LocalLogManagerTestEnv.createWithMockListeners(3)) {
MetaLogLeader first = env.waitForLeader();
MetaLogLeader cur = first;
LocalLogManagerTestEnv.createWithMockListeners(3, Optional.empty())) {
LeaderAndEpoch first = env.waitForLeader();
LeaderAndEpoch cur = first;
do {
env.logManagers().get(cur.nodeId()).renounce(cur.epoch());
MetaLogLeader next = env.waitForLeader();
int currentLeaderId = cur.leaderId().orElseThrow(() ->
new AssertionError("Current leader is undefined")
);
env.logManagers().get(currentLeaderId).resign(cur.epoch());
LeaderAndEpoch next = env.waitForLeader();
while (next.epoch() == cur.epoch()) {
Thread.sleep(1);
next = env.waitForLeader();
@ -84,7 +88,7 @@ public class LocalLogManagerTest {
assertEquals(expectedNextEpoch, next.epoch(), "Expected next epoch to be " + expectedNextEpoch +
", but found " + next);
cur = next;
} while (cur.nodeId() == first.nodeId());
} while (cur.leaderId().equals(first.leaderId()));
env.close();
assertEquals(null, env.firstError.get());
}
@ -120,15 +124,19 @@ public class LocalLogManagerTest {
@Test
public void testCommits() throws Exception {
try (LocalLogManagerTestEnv env =
LocalLogManagerTestEnv.createWithMockListeners(3)) {
MetaLogLeader leaderInfo = env.waitForLeader();
LocalLogManager activeLogManager = env.logManagers().get(leaderInfo.nodeId());
long epoch = activeLogManager.leader().epoch();
LocalLogManagerTestEnv.createWithMockListeners(3, Optional.empty())) {
LeaderAndEpoch leaderInfo = env.waitForLeader();
int leaderId = leaderInfo.leaderId().orElseThrow(() ->
new AssertionError("Current leader is undefined")
);
LocalLogManager activeLogManager = env.logManagers().get(leaderId);
int epoch = activeLogManager.leaderAndEpoch().epoch();
List<ApiMessageAndVersion> messages = Arrays.asList(
new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(2), (short) 0));
assertEquals(3, activeLogManager.scheduleWrite(epoch, messages));
assertEquals(3, activeLogManager.scheduleAppend(epoch, messages));
for (LocalLogManager logManager : env.logManagers()) {
waitForLastCommittedOffset(3, logManager);
}

View File

@ -20,6 +20,8 @@ package org.apache.kafka.metalog;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -28,6 +30,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
public class LocalLogManagerTestEnv implements AutoCloseable {
@ -55,11 +58,14 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
*/
private final List<LocalLogManager> logManagers;
public static LocalLogManagerTestEnv createWithMockListeners(int numManagers) throws Exception {
LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers);
public static LocalLogManagerTestEnv createWithMockListeners(
int numManagers,
Optional<RawSnapshotReader> snapshot
) throws Exception {
LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers, snapshot);
try {
for (LocalLogManager logManager : testEnv.logManagers) {
logManager.register(new MockMetaLogManagerListener());
logManager.register(new MockMetaLogManagerListener(logManager.nodeId().getAsInt()));
}
} catch (Exception e) {
testEnv.close();
@ -68,9 +74,9 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
return testEnv;
}
public LocalLogManagerTestEnv(int numManagers) throws Exception {
public LocalLogManagerTestEnv(int numManagers, Optional<RawSnapshotReader> snapshot) throws Exception {
dir = TestUtils.tempDirectory();
shared = new SharedLogData();
shared = new SharedLogData(snapshot);
List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
try {
for (int nodeId = 0; nodeId < numManagers; nodeId++) {
@ -100,16 +106,17 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
return dir;
}
MetaLogLeader waitForLeader() throws InterruptedException {
AtomicReference<MetaLogLeader> value = new AtomicReference<>(null);
LeaderAndEpoch waitForLeader() throws InterruptedException {
AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null);
TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
MetaLogLeader result = null;
LeaderAndEpoch result = null;
for (LocalLogManager logManager : logManagers) {
MetaLogLeader leader = logManager.leader();
if (leader.nodeId() == logManager.nodeId()) {
LeaderAndEpoch leader = logManager.leaderAndEpoch();
int nodeId = logManager.nodeId().getAsInt();
if (leader.isLeader(nodeId)) {
if (result != null) {
throw new RuntimeException("node " + leader.nodeId() +
" thinks it's the leader, but so does " + result.nodeId());
throw new RuntimeException("node " + nodeId +
" thinks it's the leader, but so does " + result.leaderId());
}
result = leader;
}

View File

@ -18,46 +18,90 @@
package org.apache.kafka.metalog;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
public class MockMetaLogManagerListener implements MetaLogListener {
public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessageAndVersion> {
public static final String COMMIT = "COMMIT";
public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET";
public static final String NEW_LEADER = "NEW_LEADER";
public static final String RENOUNCE = "RENOUNCE";
public static final String SHUTDOWN = "SHUTDOWN";
public static final String SNAPSHOT = "SNAPSHOT";
private final int nodeId;
private final List<String> serializedEvents = new ArrayList<>();
private LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0);
public MockMetaLogManagerListener(int nodeId) {
this.nodeId = nodeId;
}
@Override
public synchronized void handleCommits(long lastCommittedOffset, List<ApiMessage> messages) {
for (ApiMessage message : messages) {
public synchronized void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
try {
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
long lastCommittedOffset = batch.lastOffset();
for (ApiMessageAndVersion messageAndVersion : batch.records()) {
ApiMessage message = messageAndVersion.message();
StringBuilder bld = new StringBuilder();
bld.append(COMMIT).append(" ").append(message.toString());
serializedEvents.add(bld.toString());
}
StringBuilder bld = new StringBuilder();
bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
serializedEvents.add(bld.toString());
}
} finally {
reader.close();
}
}
@Override
public synchronized void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
long lastCommittedOffset = reader.snapshotId().offset - 1;
try {
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
for (ApiMessageAndVersion messageAndVersion : batch.records()) {
ApiMessage message = messageAndVersion.message();
StringBuilder bld = new StringBuilder();
bld.append(SNAPSHOT).append(" ").append(message.toString());
serializedEvents.add(bld.toString());
}
StringBuilder bld = new StringBuilder();
bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
serializedEvents.add(bld.toString());
}
} finally {
reader.close();
}
}
@Override
public synchronized void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch) {
LeaderAndEpoch oldLeaderAndEpoch = this.leaderAndEpoch;
this.leaderAndEpoch = newLeaderAndEpoch;
if (newLeaderAndEpoch.isLeader(nodeId)) {
StringBuilder bld = new StringBuilder();
bld.append(COMMIT).append(" ").append(message.toString());
bld.append(NEW_LEADER).append(" ").
append(nodeId).append(" ").append(newLeaderAndEpoch.epoch());
serializedEvents.add(bld.toString());
}
StringBuilder bld = new StringBuilder();
bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
serializedEvents.add(bld.toString());
}
@Override
public void handleNewLeader(MetaLogLeader leader) {
StringBuilder bld = new StringBuilder();
bld.append(NEW_LEADER).append(" ").
append(leader.nodeId()).append(" ").append(leader.epoch());
synchronized (this) {
serializedEvents.add(bld.toString());
}
}
@Override
public void handleRenounce(long epoch) {
StringBuilder bld = new StringBuilder();
bld.append(RENOUNCE).append(" ").append(epoch);
synchronized (this) {
} else if (oldLeaderAndEpoch.isLeader(nodeId)) {
StringBuilder bld = new StringBuilder();
bld.append(RENOUNCE).append(" ").append(newLeaderAndEpoch.epoch());
serializedEvents.add(bld.toString());
}
}

View File

@ -38,8 +38,6 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
@ -51,10 +49,12 @@ import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@ -358,17 +358,15 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
private void maybeFireHandleClaim(LeaderState<T> state) {
int leaderEpoch = state.epoch();
long epochStartOffset = state.epochStartOffset();
private void maybeFireLeaderChange(LeaderState<T> state) {
for (ListenerContext listenerContext : listenerContexts) {
listenerContext.maybeFireHandleClaim(leaderEpoch, epochStartOffset);
listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch(), state.epochStartOffset());
}
}
private void fireHandleResign(int epoch) {
private void maybeFireLeaderChange() {
for (ListenerContext listenerContext : listenerContexts) {
listenerContext.fireHandleResign(epoch);
listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch());
}
}
@ -409,6 +407,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return quorum.leaderAndEpoch();
}
@Override
public OptionalInt nodeId() {
return quorum.localId();
}
private OffsetAndEpoch endOffset() {
return new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch());
}
@ -432,6 +435,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
);
LeaderState<T> state = quorum.transitionToLeader(endOffset, accumulator);
maybeFireLeaderChange(state);
log.initializeLeaderEpoch(quorum.epoch());
@ -467,33 +471,28 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
private void maybeResignLeadership() {
if (quorum.isLeader()) {
fireHandleResign(quorum.epoch());
}
}
private void transitionToCandidate(long currentTimeMs) throws IOException {
maybeResignLeadership();
quorum.transitionToCandidate();
maybeFireLeaderChange();
onBecomeCandidate(currentTimeMs);
}
private void transitionToUnattached(int epoch) throws IOException {
maybeResignLeadership();
quorum.transitionToUnattached(epoch);
maybeFireLeaderChange();
resetConnections();
}
private void transitionToResigned(List<Integer> preferredSuccessors) {
fetchPurgatory.completeAllExceptionally(Errors.BROKER_NOT_AVAILABLE.exception("The broker is shutting down"));
quorum.transitionToResigned(preferredSuccessors);
maybeFireLeaderChange();
resetConnections();
}
private void transitionToVoted(int candidateId, int epoch) throws IOException {
maybeResignLeadership();
quorum.transitionToVoted(epoch, candidateId);
maybeFireLeaderChange();
resetConnections();
}
@ -517,8 +516,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
int leaderId,
long currentTimeMs
) throws IOException {
maybeResignLeadership();
quorum.transitionToFollower(epoch, leaderId);
maybeFireLeaderChange();
onBecomeFollower(currentTimeMs);
}
@ -1922,7 +1921,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private long pollLeader(long currentTimeMs) {
LeaderState<T> state = quorum.leaderStateOrThrow();
maybeFireHandleClaim(state);
maybeFireLeaderChange(state);
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
@ -2260,6 +2259,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return shutdownComplete;
}
@Override
public void resign(int epoch) {
throw new UnsupportedOperationException();
}
@Override
public SnapshotWriter<T> createSnapshot(OffsetAndEpoch snapshotId) {
return new SnapshotWriter<>(
@ -2328,7 +2332,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private final class ListenerContext implements CloseListener<BatchReader<T>> {
private final RaftClient.Listener<T> listener;
// This field is used only by the Raft IO thread
private int claimedEpoch = 0;
private LeaderAndEpoch lastFiredLeaderChange = new LeaderAndEpoch(OptionalInt.empty(), 0);
// These fields are visible to both the Raft IO thread and the listener
// and are protected through synchronization on this `ListenerContext` instance
@ -2420,19 +2424,33 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
listener.handleCommit(reader);
}
void maybeFireHandleClaim(int epoch, long epochStartOffset) {
// We can fire `handleClaim` as soon as the listener has caught
// up to the start of the leader epoch. This guarantees that the
// state machine has seen the full committed state before it becomes
// leader and begins writing to the log.
if (epoch > claimedEpoch && nextOffset() >= epochStartOffset) {
claimedEpoch = epoch;
listener.handleClaim(epoch);
void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
if (shouldFireLeaderChange(leaderAndEpoch)) {
lastFiredLeaderChange = leaderAndEpoch;
listener.handleLeaderChange(leaderAndEpoch);
}
}
void fireHandleResign(int epoch) {
listener.handleResign(epoch);
private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
if (leaderAndEpoch.equals(lastFiredLeaderChange)) {
return false;
} else if (leaderAndEpoch.epoch() > lastFiredLeaderChange.epoch()) {
return true;
} else {
return leaderAndEpoch.leaderId().isPresent() &&
!lastFiredLeaderChange.leaderId().isPresent();
}
}
void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long epochStartOffset) {
// If this node is becoming the leader, then we can fire `handleClaim` as soon
// as the listener has caught up to the start of the leader epoch. This guarantees
// that the state machine has seen the full committed state before it becomes
// leader and begins writing to the log.
if (shouldFireLeaderChange(leaderAndEpoch) && nextOffset() >= epochStartOffset) {
lastFiredLeaderChange = leaderAndEpoch;
listener.handleLeaderChange(leaderAndEpoch);
}
}
public synchronized void onClose(BatchReader<T> reader) {

View File

@ -20,14 +20,26 @@ import java.util.Objects;
import java.util.OptionalInt;
public class LeaderAndEpoch {
public final OptionalInt leaderId;
public final int epoch;
private final OptionalInt leaderId;
private final int epoch;
public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
this.leaderId = Objects.requireNonNull(leaderId);
this.epoch = epoch;
}
public OptionalInt leaderId() {
return leaderId;
}
public int epoch() {
return epoch;
}
public boolean isLeader(int nodeId) {
return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -41,4 +53,12 @@ public class LeaderAndEpoch {
public int hashCode() {
return Objects.hash(leaderId, epoch);
}
@Override
public String toString() {
return "LeaderAndEpoch(" +
"leaderId=" + leaderId +
", epoch=" + epoch +
')';
}
}
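
A usage note (not part of this patch; the class name below is illustrative): callers now go through the accessors and the isLeader helper rather than the former public fields.

import java.util.OptionalInt;

import org.apache.kafka.raft.LeaderAndEpoch;

public class LeaderAndEpochExample {
    public static void main(String[] args) {
        // Epoch bumped but no leader elected yet: leaderId() is empty.
        LeaderAndEpoch unknown = new LeaderAndEpoch(OptionalInt.empty(), 5);
        // Leader known for the same epoch.
        LeaderAndEpoch elected = new LeaderAndEpoch(OptionalInt.of(1), 5);

        System.out.println(unknown.leaderId().isPresent()); // false
        System.out.println(elected.isLeader(1));            // true
        System.out.println(elected.epoch());                // 5
    }
}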

View File

@ -223,6 +223,10 @@ public class QuorumState {
return localId.orElseThrow(() -> new IllegalStateException("Required local id is not present"));
}
public OptionalInt localId() {
return localId;
}
public int epoch() {
return state.epoch();
}

View File

@ -20,6 +20,7 @@ import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
public interface RaftClient<T> extends AutoCloseable {
@ -56,24 +57,29 @@ public interface RaftClient<T> extends AutoCloseable {
void handleSnapshot(SnapshotReader<T> reader);
/**
* Invoked after this node has become a leader. This is only called after
* all commits up to the start of the leader's epoch have been sent to
* {@link #handleCommit(BatchReader)}.
* Called on any change to leadership. This includes both when a leader is elected and
* when a leader steps down or fails.
*
* After becoming a leader, the client is eligible to write to the log
* using {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)}.
* If this node is the leader, then the notification of leadership will be delayed until
* the implementation of this interface has caught up to the high-watermark through calls to
* {@link #handleSnapshot(SnapshotReader)} and {@link #handleCommit(BatchReader)}.
*
* @param epoch the claimed leader epoch
* If this node is not the leader, then this method will be called as soon as possible. In
* this case the leader may or may not be known for the current epoch.
*
* Subsequent calls to this method will expose a monotonically increasing epoch. For a
* given epoch the leader may be unknown ({@code leader.leaderId} is {@code OptionalInt#empty})
* or known ({@code leader.leaderId} is {@code OptionalInt#of}). Once a leader is known for
* a given epoch, it remains the leader for that epoch. In other words, implementations of
* this interface should expect this method to be called at most twice for each epoch: once
* when the epoch changes but the leader is not yet known, and once when the leader becomes
* known for the current epoch.
*
* @param leader the current leader and epoch
*/
default void handleClaim(int epoch) {}
default void handleLeaderChange(LeaderAndEpoch leader) {}
/**
* Invoked after a leader has stepped down. This callback may or may not
* fire before the next leader has been elected.
*
* @param epoch the epoch that the leader is resigning from
*/
default void handleResign(int epoch) {}
default void beginShutdown() {}
}
/**
@ -94,6 +100,14 @@ public interface RaftClient<T> extends AutoCloseable {
*/
LeaderAndEpoch leaderAndEpoch();
/**
* Get local nodeId if one is defined. This may be absent when the client is used
* as an anonymous observer, as in the case of the metadata shell.
*
* @return optional node id
*/
OptionalInt nodeId();
/**
* Append a list of records to the log. The write will be scheduled for some time
* in the future. There is no guarantee that appended records will be written to
@ -155,6 +169,16 @@ public interface RaftClient<T> extends AutoCloseable {
*/
CompletableFuture<Void> shutdown(int timeoutMs);
/**
* Resign the leadership. The leader will give up its leadership in the current epoch,
* and a new election will be held. Note that nothing prevents this leader from getting
* reelected.
*
* @param epoch the epoch to resign from. If this does not match the current epoch, this
* call will be ignored.
*/
void resign(int epoch);
/**
* Create a writable snapshot file for a given offset and epoch.
*

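To make the consolidated callback concrete, here is a minimal sketch of a listener written against the new interface; the ExampleListener name and its bookkeeping are hypothetical and only illustrate the contract described in the javadoc above.

import java.util.OptionalInt;

import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotReader;

// Hypothetical listener: the single handleLeaderChange callback replaces the
// old handleClaim/handleResign pair.
public class ExampleListener implements RaftClient.Listener<ApiMessageAndVersion> {
    private final int nodeId;
    private OptionalInt claimedEpoch = OptionalInt.empty();

    public ExampleListener(int nodeId) {
        this.nodeId = nodeId;
    }

    @Override
    public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
        // Apply committed batches here; always close the reader when done.
        reader.close();
    }

    @Override
    public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
        // Rebuild state from the snapshot here; always close the reader when done.
        reader.close();
    }

    @Override
    public void handleLeaderChange(LeaderAndEpoch leader) {
        if (leader.isLeader(nodeId)) {
            // Fired only after this listener has caught up to the start of the epoch.
            claimedEpoch = OptionalInt.of(leader.epoch());
        } else {
            // Another node leads, or the leader for this epoch is not yet known.
            claimedEpoch = OptionalInt.empty();
        }
    }
}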
View File

@ -140,17 +140,16 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
}
@Override
public synchronized void handleClaim(int epoch) {
log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}",
committed, epoch);
uncommitted = committed;
claimedEpoch = OptionalInt.of(epoch);
}
@Override
public synchronized void handleResign(int epoch) {
log.debug("Counter uncommitted value reset after resigning leadership");
this.uncommitted = -1;
this.claimedEpoch = OptionalInt.empty();
public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) {
if (newLeader.isLeader(nodeId)) {
log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}",
committed, newLeader);
uncommitted = committed;
claimedEpoch = OptionalInt.of(newLeader.epoch());
} else {
log.debug("Counter uncommitted value reset after resigning leadership");
uncommitted = -1;
claimedEpoch = OptionalInt.empty();
}
}
}

View File

@ -1,152 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.metadata;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.metalog.MetaLogManager;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.snapshot.SnapshotReader;
import java.util.List;
import java.util.stream.Collectors;
/**
* For now, we rely on a shim to translate from `RaftClient` to `MetaLogManager`.
* Once we check in to trunk, we can drop `RaftClient` and implement `MetaLogManager`
* directly.
*/
public class MetaLogRaftShim implements MetaLogManager {
private final RaftClient<ApiMessageAndVersion> client;
private final int nodeId;
public MetaLogRaftShim(RaftClient<ApiMessageAndVersion> client, int nodeId) {
this.client = client;
this.nodeId = nodeId;
}
@Override
public void initialize() {
// NO-OP - The RaftClient is initialized externally
}
@Override
public void register(MetaLogListener listener) {
client.register(new ListenerShim(listener));
}
@Override
public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
return write(epoch, batch, true);
}
@Override
public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
return write(epoch, batch, false);
}
private long write(long epoch, List<ApiMessageAndVersion> batch, boolean isAtomic) {
final Long result;
if (isAtomic) {
result = client.scheduleAtomicAppend((int) epoch, batch);
} else {
result = client.scheduleAppend((int) epoch, batch);
}
if (result == null) {
throw new IllegalArgumentException(
String.format(
"Unable to alloate a buffer for the schedule write operation: epoch %s, batch %s)",
epoch,
batch
)
);
} else {
return result;
}
}
@Override
public void renounce(long epoch) {
throw new UnsupportedOperationException();
}
@Override
public MetaLogLeader leader() {
LeaderAndEpoch leaderAndEpoch = client.leaderAndEpoch();
return new MetaLogLeader(leaderAndEpoch.leaderId.orElse(-1), leaderAndEpoch.epoch);
}
@Override
public int nodeId() {
return nodeId;
}
private class ListenerShim implements RaftClient.Listener<ApiMessageAndVersion> {
private final MetaLogListener listener;
private ListenerShim(MetaLogListener listener) {
this.listener = listener;
}
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
try {
// TODO: The `BatchReader` might need to read from disk if this is
// not a leader. We want to move this IO to the state machine so that
// it does not block Raft replication
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
List<ApiMessage> records = batch.records().stream()
.map(ApiMessageAndVersion::message)
.collect(Collectors.toList());
listener.handleCommits(batch.lastOffset(), records);
}
} finally {
reader.close();
}
}
@Override
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
reader.close();
}
@Override
public void handleClaim(int epoch) {
listener.handleNewLeader(new MetaLogLeader(nodeId, epoch));
}
@Override
public void handleResign(int epoch) {
listener.handleRenounce(epoch);
}
@Override
public String toString() {
return "ListenerShim(" +
"listener=" + listener +
')';
}
}
}

View File

@ -145,7 +145,7 @@ final public class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch);
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.client.poll();

View File

@ -2336,7 +2336,7 @@ public class KafkaRaftClientTest {
assertEquals(9L, context.listener.claimedEpochStartOffset(epoch));
// Register a second listener and allow it to catch up to the high watermark
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.pollUntil(() -> OptionalInt.of(epoch).equals(secondListener.currentClaimedEpoch()));
assertEquals(OptionalLong.of(8L), secondListener.lastCommitOffset());
@ -2427,7 +2427,7 @@ public class KafkaRaftClientTest {
assertEquals(OptionalLong.of(10L), context.client.highWatermark());
// Register another listener and verify that it catches up while we remain 'voted'
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.client.poll();
context.assertVotedCandidate(candidateEpoch, otherNodeId);
@ -2477,7 +2477,7 @@ public class KafkaRaftClientTest {
context.assertVotedCandidate(candidateEpoch, localId);
// Register another listener and verify that it catches up
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.client.poll();
context.assertVotedCandidate(candidateEpoch, localId);

View File

@ -27,12 +27,11 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.record.UnalignedMemoryRecords;
import org.apache.kafka.common.record.UnalignedRecords;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -407,7 +406,9 @@ public class MockLog implements ReplicatedLog {
@Override
public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) {
return new MockRawSnapshotWriter(snapshotId);
return new MockRawSnapshotWriter(snapshotId, buffer -> {
snapshots.putIfAbsent(snapshotId, new MockRawSnapshotReader(snapshotId, buffer));
});
}
@Override
@ -615,99 +616,4 @@ public class MockLog implements ReplicatedLog {
this.startOffset = startOffset;
}
}
final class MockRawSnapshotWriter implements RawSnapshotWriter {
private final OffsetAndEpoch snapshotId;
private ByteBufferOutputStream data;
private boolean frozen;
public MockRawSnapshotWriter(OffsetAndEpoch snapshotId) {
this.snapshotId = snapshotId;
this.data = new ByteBufferOutputStream(0);
this.frozen = false;
}
@Override
public OffsetAndEpoch snapshotId() {
return snapshotId;
}
@Override
public long sizeInBytes() {
if (frozen) {
throw new RuntimeException("Snapshot is already frozen " + snapshotId);
}
return data.position();
}
@Override
public void append(UnalignedMemoryRecords records) {
if (frozen) {
throw new RuntimeException("Snapshot is already frozen " + snapshotId);
}
data.write(records.buffer());
}
@Override
public void append(MemoryRecords records) {
if (frozen) {
throw new RuntimeException("Snapshot is already frozen " + snapshotId);
}
data.write(records.buffer());
}
@Override
public boolean isFrozen() {
return frozen;
}
@Override
public void freeze() {
if (frozen) {
throw new RuntimeException("Snapshot is already frozen " + snapshotId);
}
frozen = true;
ByteBuffer buffer = data.buffer();
buffer.flip();
snapshots.putIfAbsent(snapshotId, new MockRawSnapshotReader(snapshotId, buffer));
}
@Override
public void close() {}
}
final static class MockRawSnapshotReader implements RawSnapshotReader {
private final OffsetAndEpoch snapshotId;
private final MemoryRecords data;
MockRawSnapshotReader(OffsetAndEpoch snapshotId, ByteBuffer data) {
this.snapshotId = snapshotId;
this.data = MemoryRecords.readableRecords(data);
}
@Override
public OffsetAndEpoch snapshotId() {
return snapshotId;
}
@Override
public long sizeInBytes() {
return data.sizeInBytes();
}
@Override
public UnalignedRecords slice(long position, int size) {
ByteBuffer buffer = data.buffer();
buffer.position(Math.toIntExact(position));
buffer.limit(Math.min(buffer.limit(), Math.toIntExact(position + size)));
return new UnalignedMemoryRecords(buffer.slice());
}
@Override
public Records records() {
return data;
}
}
}

View File

@ -223,7 +223,7 @@ public final class RaftClientTestContext {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel(voters);
LogContext logContext = new LogContext();
MockListener listener = new MockListener();
MockListener listener = new MockListener(localId);
Map<Integer, RaftConfig.AddressSpec> voterAddressMap = voters.stream()
.collect(Collectors.toMap(id -> id, RaftClientTestContext::mockAddress));
RaftConfig raftConfig = new RaftConfig(voterAddressMap, requestTimeoutMs, RETRY_BACKOFF_MS, electionTimeoutMs,
@ -365,11 +365,11 @@ public final class RaftClientTestContext {
}
OptionalInt currentLeader() {
return currentLeaderAndEpoch().leaderId;
return currentLeaderAndEpoch().leaderId();
}
int currentEpoch() {
return currentLeaderAndEpoch().epoch;
return currentLeaderAndEpoch().epoch();
}
LeaderAndEpoch currentLeaderAndEpoch() {
@ -1057,9 +1057,14 @@ public final class RaftClientTestContext {
private final List<BatchReader<String>> savedBatches = new ArrayList<>();
private final Map<Integer, Long> claimedEpochStartOffsets = new HashMap<>();
private OptionalInt currentClaimedEpoch = OptionalInt.empty();
private final OptionalInt localId;
private Optional<SnapshotReader<String>> snapshot = Optional.empty();
private boolean readCommit = true;
MockListener(OptionalInt localId) {
this.localId = localId;
}
int numCommittedBatches() {
return commits.size();
}
@ -1141,19 +1146,18 @@ public final class RaftClientTestContext {
}
@Override
public void handleClaim(int epoch) {
public void handleLeaderChange(LeaderAndEpoch leader) {
// We record the next expected offset as the claimed epoch's start
// offset. This is useful to verify that the `handleClaim` callback
// was not received early.
long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
lastCommitOffset().getAsLong() + 1 : 0L;
this.currentClaimedEpoch = OptionalInt.of(epoch);
this.claimedEpochStartOffsets.put(epoch, claimedEpochStartOffset);
}
@Override
public void handleResign(int epoch) {
this.currentClaimedEpoch = OptionalInt.empty();
if (localId.isPresent() && leader.isLeader(localId.getAsInt())) {
long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
lastCommitOffset().getAsLong() + 1 : 0L;
this.currentClaimedEpoch = OptionalInt.of(leader.epoch());
this.claimedEpochStartOffsets.put(leader.epoch(), claimedEpochStartOffset);
} else {
this.currentClaimedEpoch = OptionalInt.empty();
}
}
@Override

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.nio.ByteBuffer;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.UnalignedMemoryRecords;
import org.apache.kafka.common.record.UnalignedRecords;
import org.apache.kafka.raft.OffsetAndEpoch;
public final class MockRawSnapshotReader implements RawSnapshotReader {
private final OffsetAndEpoch snapshotId;
private final MemoryRecords data;
public MockRawSnapshotReader(OffsetAndEpoch snapshotId, ByteBuffer data) {
this.snapshotId = snapshotId;
this.data = MemoryRecords.readableRecords(data);
}
@Override
public OffsetAndEpoch snapshotId() {
return snapshotId;
}
@Override
public long sizeInBytes() {
return data.sizeInBytes();
}
@Override
public UnalignedRecords slice(long position, int size) {
ByteBuffer buffer = data.buffer();
buffer.position(Math.toIntExact(position));
buffer.limit(Math.min(buffer.limit(), Math.toIntExact(position + size)));
return new UnalignedMemoryRecords(buffer.slice());
}
@Override
public Records records() {
return data;
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.snapshot;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.UnalignedMemoryRecords;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.raft.OffsetAndEpoch;
public final class MockRawSnapshotWriter implements RawSnapshotWriter {
private final ByteBufferOutputStream data = new ByteBufferOutputStream(0);
private final OffsetAndEpoch snapshotId;
private final Consumer<ByteBuffer> frozenHandler;
private boolean frozen = false;
public MockRawSnapshotWriter(
OffsetAndEpoch snapshotId,
Consumer<ByteBuffer> frozenHandler
) {
this.snapshotId = snapshotId;
this.frozenHandler = frozenHandler;
}
@Override
public OffsetAndEpoch snapshotId() {
return snapshotId;
}
@Override
public long sizeInBytes() {
ensureNotFrozen();
return data.position();
}
@Override
public void append(UnalignedMemoryRecords records) {
ensureNotFrozen();
data.write(records.buffer());
}
@Override
public void append(MemoryRecords records) {
ensureNotFrozen();
data.write(records.buffer());
}
@Override
public boolean isFrozen() {
return frozen;
}
@Override
public void freeze() {
ensureNotFrozen();
frozen = true;
ByteBuffer buffer = data.buffer();
buffer.flip();
frozenHandler.accept(buffer);
}
@Override
public void close() {}
private void ensureNotFrozen() {
if (frozen) {
throw new IllegalStateException("Snapshot is already frozen " + snapshotId);
}
}
}
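
A minimal sketch of how the two mocks are meant to be combined, assuming the MemoryRecords and SimpleRecord helpers from the clients module; the MockSnapshotRoundTrip class and the sample record are illustrative only.

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;

public class MockSnapshotRoundTrip {
    public static void main(String[] args) {
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(10, 1);
        // freeze() hands the finished buffer to this consumer.
        AtomicReference<ByteBuffer> frozen = new AtomicReference<>();

        MockRawSnapshotWriter writer = new MockRawSnapshotWriter(snapshotId, frozen::set);
        writer.append(MemoryRecords.withRecords(
            CompressionType.NONE,
            new SimpleRecord("record-1".getBytes())));
        writer.freeze();
        writer.close();

        // The frozen buffer can now back a reader with the same snapshot id.
        MockRawSnapshotReader reader = new MockRawSnapshotReader(snapshotId, frozen.get());
        System.out.println("snapshot " + reader.snapshotId() + " has " + reader.sizeInBytes() + " bytes");
    }
}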

View File

@ -36,21 +36,19 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.apache.kafka.shell.MetadataNode.FileNode;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@ -79,13 +77,15 @@ public final class MetadataNodeManager implements AutoCloseable {
}
}
class LogListener implements MetaLogListener, RaftClient.Listener<ApiMessageAndVersion> {
class LogListener implements RaftClient.Listener<ApiMessageAndVersion> {
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
try {
// TODO: handle lastOffset
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
log.debug("handleCommits " + batch.records() + " at offset " + batch.lastOffset());
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
dir.create("offset").setContents(String.valueOf(batch.lastOffset()));
for (ApiMessageAndVersion messageAndVersion : batch.records()) {
handleMessage(messageAndVersion.message());
}
@ -95,18 +95,6 @@ public final class MetadataNodeManager implements AutoCloseable {
}
}
@Override
public void handleCommits(long lastOffset, List<ApiMessage> messages) {
appendEvent("handleCommits", () -> {
log.debug("handleCommits " + messages + " at offset " + lastOffset);
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
dir.create("offset").setContents(String.valueOf(lastOffset));
for (ApiMessage message : messages) {
handleMessage(message);
}
}, null);
}
@Override
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
try {
@ -122,7 +110,7 @@ public final class MetadataNodeManager implements AutoCloseable {
}
@Override
public void handleNewLeader(MetaLogLeader leader) {
public void handleLeaderChange(LeaderAndEpoch leader) {
appendEvent("handleNewLeader", () -> {
log.debug("handleNewLeader " + leader);
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
@ -130,21 +118,9 @@ public final class MetadataNodeManager implements AutoCloseable {
}, null);
}
@Override
public void handleClaim(int epoch) {
// This shouldn't happen because we should never be the leader.
log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")");
}
@Override
public void handleRenounce(long epoch) {
// This shouldn't happen because we should never be the leader.
log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")");
}
@Override
public void beginShutdown() {
log.debug("MetaLogListener sent beginShutdown");
log.debug("Metadata log listener sent beginShutdown");
}
}
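
With the shim removed, this listener only has the RaftClient.Listener surface to implement: the old handleNewLeader, handleClaim and handleRenounce callbacks collapse into a single handleLeaderChange, whose LeaderAndEpoch carries the leader id as an OptionalInt, so "no current leader" needs no separate method. A minimal standalone listener sketch follows; the class name and the System.out logging are illustrative, not part of this change.

import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotReader;

public class PrintingMetadataListener implements RaftClient.Listener<ApiMessageAndVersion> {

    @Override
    public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
        // The listener owns the reader and is responsible for closing it.
        try {
            while (reader.hasNext()) {
                Batch<ApiMessageAndVersion> batch = reader.next();
                System.out.println("committed " + batch.records().size()
                    + " record(s), last offset " + batch.lastOffset());
            }
        } finally {
            reader.close();
        }
    }

    @Override
    public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
        try {
            System.out.println("asked to load snapshot " + reader.snapshotId());
        } finally {
            reader.close();
        }
    }

    @Override
    public void handleLeaderChange(LeaderAndEpoch leader) {
        // One callback covers gaining, losing, and changing the leader.
        System.out.println("leader changed: " + leader);
    }
}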

View File

@ -18,7 +18,6 @@
package org.apache.kafka.shell;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
@ -26,19 +25,23 @@ import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.raft.metadata.MetadataRecordSerde;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
@ -49,14 +52,14 @@ public final class SnapshotFileReader implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(SnapshotFileReader.class);
private final String snapshotPath;
private final MetaLogListener listener;
private final RaftClient.Listener<ApiMessageAndVersion> listener;
private final KafkaEventQueue queue;
private final CompletableFuture<Void> caughtUpFuture;
private FileRecords fileRecords;
private Iterator<FileChannelRecordBatch> batchIterator;
private final MetadataRecordSerde serde = new MetadataRecordSerde();
public SnapshotFileReader(String snapshotPath, MetaLogListener listener) {
public SnapshotFileReader(String snapshotPath, RaftClient.Listener<ApiMessageAndVersion> listener) {
this.snapshotPath = snapshotPath;
this.listener = listener;
this.queue = new KafkaEventQueue(Time.SYSTEM,
@ -101,7 +104,7 @@ public final class SnapshotFileReader implements AutoCloseable {
private void scheduleHandleNextBatch() {
queue.append(new EventQueue.Event() {
@Override
public void run() throws Exception {
public void run() {
handleNextBatch();
}
@ -123,8 +126,10 @@ public final class SnapshotFileReader implements AutoCloseable {
case LEADER_CHANGE:
LeaderChangeMessage message = new LeaderChangeMessage();
message.read(new ByteBufferAccessor(record.value()), (short) 0);
listener.handleNewLeader(new MetaLogLeader(message.leaderId(),
batch.partitionLeaderEpoch()));
listener.handleLeaderChange(new LeaderAndEpoch(
OptionalInt.of(message.leaderId()),
batch.partitionLeaderEpoch()
));
break;
default:
log.error("Ignoring control record with type {} at offset {}",
@ -137,18 +142,28 @@ public final class SnapshotFileReader implements AutoCloseable {
}
private void handleMetadataBatch(FileChannelRecordBatch batch) {
List<ApiMessage> messages = new ArrayList<>();
for (Iterator<Record> iter = batch.iterator(); iter.hasNext(); ) {
Record record = iter.next();
List<ApiMessageAndVersion> messages = new ArrayList<>();
for (Record record : batch) {
ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
try {
ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize());
messages.add(messageAndVersion.message());
messages.add(messageAndVersion);
} catch (Throwable e) {
log.error("unable to read metadata record at offset {}", record.offset(), e);
}
}
listener.handleCommits(batch.lastOffset(), messages);
listener.handleCommit(
new MemoryBatchReader<>(
Collections.singletonList(
Batch.of(
batch.baseOffset(),
batch.partitionLeaderEpoch(),
messages
)
),
reader -> { }
)
);
}
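
The hand-off above, wrapping already-decoded messages in a Batch, placing that batch in a MemoryBatchReader, and feeding it to handleCommit, also works in isolation, which can be handy in tests. A small sketch under the assumption that a String-typed listener is acceptable for the demonstration; the record values, base offset, and epoch are made up.

import java.util.Arrays;
import java.util.Collections;

import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.snapshot.SnapshotReader;

public class MemoryBatchReaderSketch {
    public static void main(String[] args) {
        // A throwaway listener that just counts what it is handed.
        RaftClient.Listener<String> listener = new RaftClient.Listener<String>() {
            @Override
            public void handleCommit(BatchReader<String> reader) {
                int count = 0;
                try {
                    while (reader.hasNext()) {
                        count += reader.next().records().size();
                    }
                } finally {
                    reader.close();
                }
                System.out.println("replayed " + count + " record(s)");
            }

            @Override
            public void handleSnapshot(SnapshotReader<String> reader) {
                reader.close();   // not exercised in this sketch
            }
        };

        // One in-memory batch at base offset 0, epoch 1, holding two decoded records.
        BatchReader<String> inMemory = new MemoryBatchReader<>(
            Collections.singletonList(Batch.of(0L, 1, Arrays.asList("record-a", "record-b"))),
            reader -> { }   // close listener: nothing to release for an in-memory reader
        );
        listener.handleCommit(inMemory);
    }
}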
public void beginShutdown(String reason) {