MINOR: Fix highest offset when loading KRaft metadata snapshots (#11386)

When loading a snapshot, the broker's BrokerMetadataListener was using each batch's append time,
offset and epoch. These are not the same as the append time, offset and epoch of the log itself.
This PR fixes the listener to instead use the lastContainedLogTimestamp, lastContainedLogOffset
and lastContainedLogEpoch from the SnapshotReader.
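
In sketch form (hedged; the method names are the ones visible in the diff below, and the
surrounding class is elided): batches inside a snapshot carry their own append times, offsets
and epochs, so the position in the metadata log has to come from the SnapshotReader itself.

    // Sketch only: derive the log position from the reader, never from the batches.
    def snapshotLogPosition(reader: SnapshotReader[ApiMessageAndVersion]): (Long, Long, Int) = {
      val lastTimestamp = reader.lastContainedLogTimestamp // not batch.appendTimestamp()
      val lastOffset    = reader.lastContainedLogOffset    // not batch.lastOffset()
      val lastEpoch     = reader.lastContainedLogEpoch     // not batch.epoch()
      (lastTimestamp, lastOffset, lastEpoch)
    }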

This PR also refactors the MetadataImage and MetadataDelta to include an offset and epoch, and
swaps the order of the arguments to ReplicaManager.applyDelta so that it is consistent with
MetadataPublisher.publish.
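
The resulting API shape, roughly (a sketch based on the signatures in the diff, not a complete
program; publisher, replicaManager, delta and newImage are assumed to be in scope):

    // The image now records how far into the log it reaches.
    val position: OffsetAndEpoch = newImage.highestOffsetAndEpoch()
    // Both consumers of a delta now take (delta, image), in that order.
    publisher.publish(delta, newImage)
    replicaManager.applyDelta(delta.topicsDelta(), newImage)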

Reviewers: Colin P. McCabe <cmccabe@apache.org>
Author: José Armando García Sancio
Date: 2021-10-12 17:19:03 -07:00
Commit: da58d75c43 (parent: 65b01a0464)
17 changed files with 235 additions and 150 deletions

View File

@@ -237,6 +237,7 @@
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.server.common" />
@@ -249,6 +250,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.raft" />

View File

@@ -331,7 +331,7 @@ class BrokerServer(
setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
setSecurityProtocol(ep.securityProtocol.id))
}
lifecycleManager.start(() => metadataListener.highestMetadataOffset(),
lifecycleManager.start(() => metadataListener.highestMetadataOffset,
BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
"heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
metaProps.clusterId, networkListeners, supportedFeatures)

View File

@@ -2104,10 +2104,10 @@ class ReplicaManager(val config: KafkaConfig,
/**
* Apply a KRaft topic change delta.
*
* @param newImage The new metadata image.
* @param delta The delta to apply.
* @param newImage The new metadata image.
*/
def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
// Before taking the lock, compute the local changes
val localChanges = delta.localChanges(config.nodeId)

View File

@@ -58,12 +58,7 @@ class BrokerMetadataListener(
/**
* The highest metadata offset that we've seen. Written only from the event queue thread.
*/
@volatile var _highestMetadataOffset = -1L
/**
* The highest metadata log epoch that we've seen. Written only from the event queue thread.
*/
private var _highestEpoch = -1
@volatile var _highestOffset = -1L
/**
* The highest metadata log time that we've seen. Written only from the event queue thread.
@@ -101,7 +96,7 @@ class BrokerMetadataListener(
/**
* Returns the highest metadata-offset. Thread-safe.
*/
def highestMetadataOffset(): Long = _highestMetadataOffset
def highestMetadataOffset: Long = _highestOffset
/**
* Handle new metadata records.
@@ -113,7 +108,7 @@ class BrokerMetadataListener(
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
val results = try {
val loadResults = loadBatches(_delta, reader)
val loadResults = loadBatches(_delta, reader, None, None, None)
if (isDebugEnabled) {
debug(s"Loaded new commits: ${loadResults}")
}
@@ -121,15 +116,12 @@ class BrokerMetadataListener(
} finally {
reader.close()
}
_publisher.foreach(publish(_, results.highestMetadataOffset))
_publisher.foreach(publish)
snapshotter.foreach { snapshotter =>
_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
if (shouldSnapshot()) {
if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset,
_highestEpoch,
_highestTimestamp,
_delta.apply())) {
if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
_bytesSinceLastSnapshot = 0L
}
}
@@ -150,48 +142,74 @@ class BrokerMetadataListener(
class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
val results = try {
try {
info(s"Loading snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}.")
_delta = new MetadataDelta(_image) // Discard any previous deltas.
val loadResults = loadBatches(_delta, reader)
val loadResults = loadBatches(
_delta,
reader,
Some(reader.lastContainedLogTimestamp),
Some(reader.lastContainedLogOffset),
Some(reader.lastContainedLogEpoch)
)
_delta.finishSnapshot()
info(s"Loaded snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}: " +
s"${loadResults}")
loadResults
} finally {
reader.close()
}
_publisher.foreach(publish(_, results.highestMetadataOffset))
_publisher.foreach(publish)
}
}
case class BatchLoadResults(numBatches: Int,
numRecords: Int,
elapsedUs: Long,
numBytes: Long,
highestMetadataOffset: Long) {
case class BatchLoadResults(numBatches: Int, numRecords: Int, elapsedUs: Long, numBytes: Long) {
override def toString(): String = {
s"${numBatches} batch(es) with ${numRecords} record(s) in ${numBytes} bytes " +
s"ending at offset ${highestMetadataOffset} in ${elapsedUs} microseconds"
}
}
private def loadBatches(delta: MetadataDelta,
iterator: util.Iterator[Batch[ApiMessageAndVersion]]): BatchLoadResults = {
/**
* Load and replay the batches to the metadata delta.
*
* When loading and replaying a snapshot, the lastAppendTimestamp, lastCommittedOffset and
* lastCommittedEpoch parameters should be provided. In a snapshot, the append timestamp,
* offset and epoch reported by the batches are independent of the ones reported by the
* metadata log.
*
* @param delta metadata delta on which to replay the records
* @param iterator sequence of metadata record batches to replay
* @param lastAppendTimestamp optional append timestamp to use instead of the batch timestamps
* @param lastCommittedOffset optional offset to use instead of the batch offsets
* @param lastCommittedEpoch optional epoch to use instead of the batch epochs
*/
private def loadBatches(
delta: MetadataDelta,
iterator: util.Iterator[Batch[ApiMessageAndVersion]],
lastAppendTimestamp: Option[Long],
lastCommittedOffset: Option[Long],
lastCommittedEpoch: Option[Int]
): BatchLoadResults = {
val startTimeNs = time.nanoseconds()
var numBatches = 0
var numRecords = 0
var batch: Batch[ApiMessageAndVersion] = null
var numBytes = 0L
while (iterator.hasNext()) {
batch = iterator.next()
val batch = iterator.next()
val epoch = lastCommittedEpoch.getOrElse(batch.epoch())
_highestTimestamp = lastAppendTimestamp.getOrElse(batch.appendTimestamp())
var index = 0
batch.records().forEach { messageAndVersion =>
if (isTraceEnabled) {
trace("Metadata batch %d: processing [%d/%d]: %s.".format(batch.lastOffset, index + 1,
batch.records().size(), messageAndVersion.message().toString()))
}
delta.replay(messageAndVersion.message())
_highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + index)
delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
numRecords += 1
index += 1
}
@@ -199,18 +217,11 @@ class BrokerMetadataListener(
metadataBatchSizeHist.update(batch.records().size())
numBatches = numBatches + 1
}
val newHighestMetadataOffset = if (batch == null) {
_highestMetadataOffset
} else {
_highestMetadataOffset = batch.lastOffset()
_highestEpoch = batch.epoch()
_highestTimestamp = batch.appendTimestamp()
batch.lastOffset()
}
val endTimeNs = time.nanoseconds()
val elapsedUs = TimeUnit.MICROSECONDS.convert(endTimeNs - startTimeNs, TimeUnit.NANOSECONDS)
batchProcessingTimeHist.update(elapsedUs)
BatchLoadResults(numBatches, numRecords, elapsedUs, numBytes, newHighestMetadataOffset)
BatchLoadResults(numBatches, numRecords, elapsedUs, numBytes)
}
def startPublishing(publisher: MetadataPublisher): CompletableFuture[Void] = {
@@ -225,9 +236,9 @@ class BrokerMetadataListener(
override def run(): Unit = {
_publisher = Some(publisher)
log.info(s"Starting to publish metadata events at offset ${_highestMetadataOffset}.")
log.info(s"Starting to publish metadata events at offset ${highestMetadataOffset}.")
try {
publish(publisher, _highestMetadataOffset)
publish(publisher)
future.complete(null)
} catch {
case e: Throwable =>
@@ -237,12 +248,11 @@ class BrokerMetadataListener(
}
}
private def publish(publisher: MetadataPublisher,
newHighestMetadataOffset: Long): Unit = {
private def publish(publisher: MetadataPublisher): Unit = {
val delta = _delta
_image = _delta.apply()
_delta = new MetadataDelta(_image)
publisher.publish(newHighestMetadataOffset, delta, _image)
publisher.publish(delta, _image)
}
override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {

View File

@@ -113,34 +113,34 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
*/
var _firstPublish = true
override def publish(newHighestMetadataOffset: Long,
delta: MetadataDelta,
newImage: MetadataImage): Unit = {
override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
try {
trace(s"Publishing delta $delta with highest offset $newHighestMetadataOffset")
trace(s"Publishing delta $delta with highest offset $highestOffsetAndEpoch")
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
if (_firstPublish) {
info(s"Publishing initial metadata at offset ${newHighestMetadataOffset}.")
info(s"Publishing initial metadata at offset $highestOffsetAndEpoch.")
// If this is the first metadata update we are applying, initialize the managers
// first (but after setting up the metadata cache).
initializeManagers()
} else if (isDebugEnabled) {
debug(s"Publishing metadata at offset ${newHighestMetadataOffset}.")
debug(s"Publishing metadata at offset $highestOffsetAndEpoch.")
}
// Apply feature deltas.
Option(delta.featuresDelta()).foreach { featuresDelta =>
featureCache.update(featuresDelta, newHighestMetadataOffset)
featureCache.update(featuresDelta, highestOffsetAndEpoch.offset)
}
// Apply topic deltas.
Option(delta.topicsDelta()).foreach { topicsDelta =>
// Notify the replica manager about changes to topics.
replicaManager.applyDelta(newImage, topicsDelta)
replicaManager.applyDelta(topicsDelta, newImage)
// Handle the case where the old consumer offsets topic was deleted.
if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
@@ -233,7 +233,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
finishInitializingReplicaManager(newImage)
}
} catch {
case t: Throwable => error(s"Error publishing broker metadata at ${newHighestMetadataOffset}", t)
case t: Throwable => error(s"Error publishing broker metadata at $highestOffsetAndEpoch", t)
throw t
} finally {
_firstPublish = false

View File

@@ -52,18 +52,19 @@ class BrokerMetadataSnapshotter(
*/
val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
override def maybeStartSnapshot(committedOffset: Long,
committedEpoch: Int,
lastContainedLogTime: Long,
image: MetadataImage): Boolean = synchronized {
override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized {
if (_currentSnapshotOffset == -1L) {
val writer = writerBuilder.build(committedOffset, committedEpoch, lastContainedLogTime)
_currentSnapshotOffset = committedOffset
info(s"Creating a new snapshot at offset ${committedOffset}...")
val writer = writerBuilder.build(
image.highestOffsetAndEpoch().offset,
image.highestOffsetAndEpoch().epoch,
lastContainedLogTime
)
_currentSnapshotOffset = image.highestOffsetAndEpoch().offset
info(s"Creating a new snapshot at offset ${_currentSnapshotOffset}...")
eventQueue.append(new CreateSnapshotEvent(image, writer))
true
} else {
warn(s"Declining to create a new snapshot at offset ${committedOffset} because " +
warn(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
false
}
@@ -89,7 +90,7 @@ class BrokerMetadataSnapshotter(
override def handleException(e: Throwable): Unit = {
e match {
case _: RejectedExecutionException =>
info("Not processing CreateSnapshotEvent because the event queue is closed.")
case _ => error("Unexpected error handling CreateSnapshotEvent", e)
}

View File

@@ -25,12 +25,9 @@ trait MetadataPublisher {
/**
* Publish a new metadata image.
*
* @param newHighestMetadataOffset The highest metadata offset contained within the image.
* @param delta The delta between the old image and the new one.
* @param newImage The new image, which is the result of applying the
* delta to the previous image.
* @param delta The delta between the old image and the new one.
* @param newImage The new image, which is the result of applying the
* delta to the previous image.
*/
def publish(newHighestMetadataOffset: Long,
delta: MetadataDelta,
newImage: MetadataImage): Unit
def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
}
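
A minimal implementer, as a usage sketch (the class name is hypothetical; it mirrors the
MockMetadataPublisher in the tests further down):

    class LastImagePublisher extends MetadataPublisher {
      @volatile var latest: MetadataImage = MetadataImage.EMPTY

      override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
        // The highest offset, formerly a separate parameter, is read off the image.
        println(s"New metadata image at ${newImage.highestOffsetAndEpoch()}")
        latest = newImage
      }
    }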

View File

@@ -26,15 +26,10 @@ trait MetadataSnapshotter {
/**
* If there is no other snapshot being written out, start writing out a snapshot.
*
* @param committedOffset The highest metadata log offset of the snapshot.
* @param committedEpoch The highest metadata log epoch of the snapshot.
* @param lastContainedLogTime The highest time contained in the snapshot.
* @param image The metadata image to write out.
*
* @return True if we will write out a new snapshot; false otherwise.
*/
def maybeStartSnapshot(committedOffset: Long,
committedEpoch: Int,
lastContainedLogTime: Long,
image: MetadataImage): Boolean
def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean
}
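
Call-site sketch (illustrative; snapshotter, lastContainedLogTime and image are assumed to be
in scope): the snapshot's offset and epoch are now implied by the image rather than passed in.

    // Previously: maybeStartSnapshot(committedOffset, committedEpoch, lastContainedLogTime, image)
    if (snapshotter.maybeStartSnapshot(lastContainedLogTime, image)) {
      // A snapshot ending at image.highestOffsetAndEpoch() is being written.
    }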

View File

@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.UpdateMetadataRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -60,7 +61,9 @@ object MetadataCacheTest {
// a partial list of partitions. Therefore, base our delta off a partial image that
// contains no brokers, but which contains the previous partitions.
val image = c.currentImage()
val partialImage = new MetadataImage(image.features(), ClusterImage.EMPTY,
val partialImage = new MetadataImage(
new RaftOffsetAndEpoch(100, 10),
image.features(), ClusterImage.EMPTY,
image.topics(), image.configs(), image.clientQuotas())
val delta = new MetadataDelta(partialImage)
@@ -89,7 +92,7 @@
setFenced(fenced)
}
request.liveBrokers().iterator().asScala.foreach { brokerInfo =>
delta.replay(toRecord(brokerInfo))
delta.replay(100, 10, toRecord(brokerInfo))
}
def toRecords(topic: UpdateMetadataTopicState): Seq[ApiMessage] = {
@@ -114,7 +117,7 @@
results
}
request.topicStates().forEach { topic =>
toRecords(topic).foreach(delta.replay)
toRecords(topic).foreach(delta.replay(100, 10, _))
}
c.setImage(delta.apply())
}

View File

@@ -341,14 +341,14 @@ class ReplicaManagerConcurrencyTest {
val delta = new MetadataDelta(latestImage)
topic.initialize(delta)
latestImage = delta.apply()
replicaManager.applyDelta(latestImage, delta.topicsDelta)
replicaManager.applyDelta(delta.topicsDelta, latestImage)
case AlterIsrEvent(future, topicPartition, leaderAndIsr) =>
val delta = new MetadataDelta(latestImage)
val updatedLeaderAndIsr = topic.alterIsr(topicPartition, leaderAndIsr, delta)
latestImage = delta.apply()
future.complete(updatedLeaderAndIsr)
replicaManager.applyDelta(latestImage, delta.topicsDelta)
replicaManager.applyDelta(delta.topicsDelta, latestImage)
case ShutdownEvent =>
}

View File

@@ -52,6 +52,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, TopicsDelta, TopicsImage}
import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -2933,7 +2934,7 @@ class ReplicaManagerTest {
// Make the local replica the leader
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
@@ -2959,7 +2960,7 @@
// Change the local replica to follower
val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Append on a follower should fail
val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
@@ -2991,7 +2992,7 @@
// Make the local replica the follower
val followerTopicsDelta = topicsCreateDelta(localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
@@ -3008,7 +3009,7 @@
// Change the local replica to leader
val leaderTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Send a produce request and advance the highwatermark
val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
@@ -3047,7 +3048,7 @@
// Make the local replica the follower
val followerTopicsDelta = topicsCreateDelta(localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
@@ -3058,7 +3059,7 @@
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
// Apply the same delta again
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check that the state stays the same
val HostedPartition.Online(noChangePartition) = replicaManager.getPartition(topicPartition)
@@ -3085,7 +3086,7 @@
// Make the local replica the follower
val followerTopicsDelta = topicsCreateDelta(localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
@@ -3098,7 +3099,7 @@
// Apply changes that remove replica
val notReplicaTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), otherId, true)
val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@@ -3122,7 +3123,7 @@
// Make the local replica the follower
val followerTopicsDelta = topicsCreateDelta(localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
@@ -3135,7 +3136,7 @@
// Apply changes that remove topic and replica
val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@@ -3159,7 +3160,7 @@
// Make the local replica the follower
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
@@ -3172,7 +3173,7 @@
// Apply changes that remove replica
val notReplicaTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), otherId, true)
val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@@ -3196,7 +3197,7 @@
// Make the local replica the follower
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
@@ -3209,7 +3210,7 @@
// Apply changes that remove topic and replica
val removeTopicsDelta = topicsDeleteDelta(leaderMetadataImage.topics())
val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@@ -3234,7 +3235,7 @@
// Make the local replica the leader
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
@@ -3250,7 +3251,7 @@
// Change the local replica to follower
val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check that the produce failed because it changed to follower before replicating
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error)
@@ -3272,7 +3273,7 @@
// Make the local replica the leader
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
@@ -3296,7 +3297,7 @@
// Change the local replica to follower
val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check that the produce failed because it changed to follower before replicating
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchCallback.assertFired.error)
@@ -3326,7 +3327,7 @@
// Make the local replica the leader
val topicsDelta = topicsCreateDelta(localId, isStartIdLeader)
val leaderMetadataImage = imageFromTopics(topicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, topicsDelta)
replicaManager.applyDelta(topicsDelta, leaderMetadataImage)
assertEquals(HostedPartition.Offline, replicaManager.getPartition(topicPartition))
} finally {
@@ -3356,7 +3357,7 @@
// Make the local replica the follower
var followerTopicsDelta = topicsCreateDelta(localId, false)
var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
@@ -3398,7 +3399,7 @@
// Apply changes that bumps the leader epoch.
followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, false)
followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
assertFalse(followerPartition.isLeader)
assertEquals(1, followerPartition.getLeaderEpoch)
@@ -3465,6 +3466,7 @@
private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = {
new MetadataImage(
new RaftOffsetAndEpoch(100, 10),
FeaturesImage.EMPTY,
ClusterImageTest.IMAGE1,
topicsImage,

View File

@@ -21,9 +21,9 @@ import java.util
import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Optional}
import org.apache.kafka.common.{Endpoint, Uuid}
import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Endpoint, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -54,7 +54,7 @@ class BrokerMetadataListenerTest {
setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 0.toShort))))
val imageRecords = listener.getImageRecords().get()
assertEquals(0, imageRecords.size())
assertEquals(100L, listener.highestMetadataOffset())
assertEquals(100L, listener.highestMetadataOffset)
listener.handleCommit(RecordTestUtils.mockBatchReader(200L,
util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(1).
@@ -63,10 +63,8 @@
setRack(null).
setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 0.toShort))))
listener.startPublishing(new MetadataPublisher {
override def publish(newHighestMetadataOffset: Long,
delta: MetadataDelta,
newImage: MetadataImage): Unit = {
assertEquals(200L, newHighestMetadataOffset)
override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
assertEquals(200L, newImage.highestOffsetAndEpoch().offset)
assertEquals(new BrokerRegistration(0, 100L,
Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg"), Collections.emptyList[Endpoint](),
Collections.emptyMap[String, VersionRange](), Optional.empty[String](), false),
@@ -90,20 +88,17 @@
var prevCommittedEpoch = -1
var prevLastContainedLogTime = -1L
override def maybeStartSnapshot(committedOffset: Long,
committedEpoch: Int,
lastContainedLogTime: Long,
newImage: MetadataImage): Boolean = {
override def maybeStartSnapshot(lastContainedLogTime: Long, newImage: MetadataImage): Boolean = {
try {
if (activeSnapshotOffset == -1L) {
assertTrue(prevCommittedOffset <= committedOffset)
assertTrue(prevCommittedEpoch <= committedEpoch)
assertTrue(prevCommittedOffset <= newImage.highestOffsetAndEpoch().offset)
assertTrue(prevCommittedEpoch <= newImage.highestOffsetAndEpoch().epoch)
assertTrue(prevLastContainedLogTime <= lastContainedLogTime)
prevCommittedOffset = committedOffset
prevCommittedEpoch = committedEpoch
prevCommittedOffset = newImage.highestOffsetAndEpoch().offset
prevCommittedEpoch = newImage.highestOffsetAndEpoch().epoch
prevLastContainedLogTime = lastContainedLogTime
image = newImage
activeSnapshotOffset = committedOffset
activeSnapshotOffset = newImage.highestOffsetAndEpoch().offset
true
} else {
false
@@ -117,9 +112,7 @@
class MockMetadataPublisher extends MetadataPublisher {
var image = MetadataImage.EMPTY
override def publish(newHighestMetadataOffset: Long,
delta: MetadataDelta,
newImage: MetadataImage): Unit = {
override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
image = newImage
}
}
@@ -152,10 +145,10 @@
registerBrokers(listener, brokerIds, endOffset = 100L)
createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
listener.getImageRecords().get()
assertEquals(200L, listener.highestMetadataOffset())
assertEquals(200L, listener.highestMetadataOffset)
generateManyRecords(listener, endOffset = 1000L)
assertEquals(1000L, listener.highestMetadataOffset())
assertEquals(1000L, listener.highestMetadataOffset)
} finally {
listener.close()
}
@@ -171,7 +164,7 @@
registerBrokers(listener, brokerIds, endOffset = 100L)
createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
listener.getImageRecords().get()
assertEquals(200L, listener.highestMetadataOffset())
assertEquals(200L, listener.highestMetadataOffset)
// Check that we generate at least one snapshot once we see enough records.
assertEquals(-1L, snapshotter.prevCommittedOffset)

View File

@@ -51,7 +51,11 @@
lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion] = {
val offsetAndEpoch = new OffsetAndEpoch(committedOffset, committedEpoch)
SnapshotWriter.createWithHeader(
() => Optional.of(new MockRawSnapshotWriter(offsetAndEpoch, consumeSnapshotBuffer)),
() => {
Optional.of(
new MockRawSnapshotWriter(offsetAndEpoch, consumeSnapshotBuffer(committedOffset, committedEpoch))
)
},
1024,
MemoryPool.NONE,
Time.SYSTEM,
@@ -61,7 +65,7 @@
).get();
}
def consumeSnapshotBuffer(buffer: ByteBuffer): Unit = {
def consumeSnapshotBuffer(committedOffset: Long, committedEpoch: Int)(buffer: ByteBuffer): Unit = {
val delta = new MetadataDelta(MetadataImage.EMPTY)
val memoryRecords = MemoryRecords.readableRecords(buffer)
val batchIterator = memoryRecords.batchIterator()
@@ -72,7 +76,7 @@
val recordBuffer = record.value().duplicate()
val messageAndVersion = MetadataRecordSerde.INSTANCE.read(
new ByteBufferAccessor(recordBuffer), recordBuffer.remaining())
delta.replay(messageAndVersion.message())
delta.replay(committedOffset, committedEpoch, messageAndVersion.message())
})
}
}
@@ -92,8 +96,8 @@
try {
val blockingEvent = new BlockingEvent()
snapshotter.eventQueue.append(blockingEvent)
assertTrue(snapshotter.maybeStartSnapshot(123L, 12, 10000L, MetadataImageTest.IMAGE1))
assertFalse(snapshotter.maybeStartSnapshot(124L, 12, 11000L, MetadataImageTest.IMAGE2))
assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1))
assertFalse(snapshotter.maybeStartSnapshot(11000L, MetadataImageTest.IMAGE2))
blockingEvent.latch.countDown()
assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
} finally {

View File

@@ -32,6 +32,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.Iterator;
@@ -46,6 +47,10 @@ import java.util.List;
public final class MetadataDelta {
private final MetadataImage image;
private long highestOffset;
private int highestEpoch;
private FeaturesDelta featuresDelta = null;
private ClusterDelta clusterDelta = null;
@@ -58,6 +63,8 @@
public MetadataDelta(MetadataImage image) {
this.image = image;
this.highestOffset = image.highestOffsetAndEpoch().offset;
this.highestEpoch = image.highestOffsetAndEpoch().epoch;
}
public MetadataImage image() {
@@ -84,16 +91,19 @@
return clientQuotasDelta;
}
public void read(Iterator<List<ApiMessageAndVersion>> reader) {
public void read(long highestOffset, int highestEpoch, Iterator<List<ApiMessageAndVersion>> reader) {
while (reader.hasNext()) {
List<ApiMessageAndVersion> batch = reader.next();
for (ApiMessageAndVersion messageAndVersion : batch) {
replay(messageAndVersion.message());
replay(highestOffset, highestEpoch, messageAndVersion.message());
}
}
}
public void replay(ApiMessage record) {
public void replay(long offset, int epoch, ApiMessage record) {
highestOffset = offset;
highestEpoch = epoch;
MetadataRecordType type = MetadataRecordType.fromId(record.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
@@ -253,14 +263,22 @@
} else {
newClientQuotas = clientQuotasDelta.apply();
}
return new MetadataImage(newFeatures, newCluster, newTopics, newConfigs,
newClientQuotas);
return new MetadataImage(
new OffsetAndEpoch(highestOffset, highestEpoch),
newFeatures,
newCluster,
newTopics,
newConfigs,
newClientQuotas
);
}
@Override
public String toString() {
return "MetadataDelta(" +
"featuresDelta=" + featuresDelta +
"highestOffset=" + highestOffset +
", highestEpoch=" + highestEpoch +
", featuresDelta=" + featuresDelta +
", clusterDelta=" + clusterDelta +
", topicsDelta=" + topicsDelta +
", configsDelta=" + configsDelta +

View File

@@ -17,6 +17,7 @@
package org.apache.kafka.image;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.List;
@@ -31,12 +32,15 @@ import java.util.function.Consumer;
*/
public final class MetadataImage {
public final static MetadataImage EMPTY = new MetadataImage(
new OffsetAndEpoch(0, 0),
FeaturesImage.EMPTY,
ClusterImage.EMPTY,
TopicsImage.EMPTY,
ConfigurationsImage.EMPTY,
ClientQuotasImage.EMPTY);
private final OffsetAndEpoch highestOffsetAndEpoch;
private final FeaturesImage features;
private final ClusterImage cluster;
@@ -47,11 +51,15 @@ public final class MetadataImage {
private final ClientQuotasImage clientQuotas;
public MetadataImage(FeaturesImage features,
ClusterImage cluster,
TopicsImage topics,
ConfigurationsImage configs,
ClientQuotasImage clientQuotas) {
public MetadataImage(
OffsetAndEpoch highestOffsetAndEpoch,
FeaturesImage features,
ClusterImage cluster,
TopicsImage topics,
ConfigurationsImage configs,
ClientQuotasImage clientQuotas
) {
this.highestOffsetAndEpoch = highestOffsetAndEpoch;
this.features = features;
this.cluster = cluster;
this.topics = topics;
@@ -67,6 +75,10 @@
clientQuotas.isEmpty();
}
public OffsetAndEpoch highestOffsetAndEpoch() {
return highestOffsetAndEpoch;
}
public FeaturesImage features() {
return features;
}
@@ -99,7 +111,8 @@
public boolean equals(Object o) {
if (!(o instanceof MetadataImage)) return false;
MetadataImage other = (MetadataImage) o;
return features.equals(other.features) &&
return highestOffsetAndEpoch.equals(other.highestOffsetAndEpoch) &&
features.equals(other.features) &&
cluster.equals(other.cluster) &&
topics.equals(other.topics) &&
configs.equals(other.configs) &&
@@ -108,12 +121,13 @@
@Override
public int hashCode() {
return Objects.hash(features, cluster, topics, configs, clientQuotas);
return Objects.hash(highestOffsetAndEpoch, features, cluster, topics, configs, clientQuotas);
}
@Override
public String toString() {
return "MetadataImage(features=" + features +
return "MetadataImage(highestOffsetAndEpoch=" + highestOffsetAndEpoch +
", features=" + features +
", cluster=" + cluster +
", topics=" + topics +
", configs=" + configs +

View File

@@ -18,6 +18,7 @@
package org.apache.kafka.image;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -33,20 +34,24 @@ public class MetadataImageTest {
public final static MetadataImage IMAGE2;
static {
IMAGE1 = new MetadataImage(FeaturesImageTest.IMAGE1,
IMAGE1 = new MetadataImage(
new OffsetAndEpoch(100, 4),
FeaturesImageTest.IMAGE1,
ClusterImageTest.IMAGE1,
TopicsImageTest.IMAGE1,
ConfigurationsImageTest.IMAGE1,
ClientQuotasImageTest.IMAGE1);
DELTA1 = new MetadataDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, FeaturesImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, ClusterImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, TopicsImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, ConfigurationsImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, ClientQuotasImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, 200, 5, FeaturesImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, 200, 5, ClusterImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, 200, 5, TopicsImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, 200, 5, ConfigurationsImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, 200, 5, ClientQuotasImageTest.DELTA1_RECORDS);
IMAGE2 = new MetadataImage(FeaturesImageTest.IMAGE2,
IMAGE2 = new MetadataImage(
new OffsetAndEpoch(200, 5),
FeaturesImageTest.IMAGE2,
ClusterImageTest.IMAGE2,
TopicsImageTest.IMAGE2,
ConfigurationsImageTest.IMAGE2,
@@ -77,7 +82,8 @@
MockSnapshotConsumer writer = new MockSnapshotConsumer();
image.write(writer);
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
RecordTestUtils.replayAllBatches(delta, writer.batches());
RecordTestUtils.replayAllBatches(
delta, image.highestOffsetAndEpoch().offset, image.highestOffsetAndEpoch().epoch, writer.batches());
MetadataImage nextImage = delta.apply();
assertEquals(image, nextImage);
}

View File

@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.internals.MemoryBatchReader;
@@ -48,7 +49,7 @@ public class RecordTestUtils {
* Replay a list of records.
*
* @param target The object to invoke the replay function on.
* @param recordsAndVersions A list of batches of records.
* @param recordsAndVersions A list of records.
*/
public static void replayAll(Object target,
List<ApiMessageAndVersion> recordsAndVersions) {
@@ -67,6 +68,26 @@
}
}
/**
* Replay a list of records to the metadata delta.
*
* @param delta the metadata delta on which to replay the records
* @param highestOffset highest offset from the list of records
* @param highestEpoch highest epoch from the list of records
* @param recordsAndVersions list of records
*/
public static void replayAll(
MetadataDelta delta,
long highestOffset,
int highestEpoch,
List<ApiMessageAndVersion> recordsAndVersions
) {
for (ApiMessageAndVersion recordAndVersion : recordsAndVersions) {
ApiMessage record = recordAndVersion.message();
delta.replay(highestOffset, highestEpoch, record);
}
}
/**
* Replay a list of record batches.
*
@@ -80,6 +101,25 @@
}
}
/**
* Replay a list of record batches to the metadata delta.
*
* @param delta the metadata delta on which to replay the records
* @param highestOffset highest offset from the list of record batches
* @param highestEpoch highest epoch from the list of record batches
* @param batches list of batches of records
*/
public static void replayAllBatches(
MetadataDelta delta,
long highestOffset,
int highestEpoch,
List<List<ApiMessageAndVersion>> batches
) {
for (List<ApiMessageAndVersion> batch : batches) {
replayAll(delta, highestOffset, highestEpoch, batch);
}
}
/**
* Materialize the output of an iterator into a set.
*