diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 89d3484cbca..88538d0b8cf 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -237,6 +237,7 @@
+
@@ -249,6 +250,7 @@
+
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 59e267db0c3..87477a11a60 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -331,7 +331,7 @@ class BrokerServer(
setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
setSecurityProtocol(ep.securityProtocol.id))
}
- lifecycleManager.start(() => metadataListener.highestMetadataOffset(),
+ lifecycleManager.start(() => metadataListener.highestMetadataOffset,
BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
"heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
metaProps.clusterId, networkListeners, supportedFeatures)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0ef0f3aa529..57a20d8ba89 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2104,10 +2104,10 @@ class ReplicaManager(val config: KafkaConfig,
/**
* Apply a KRaft topic change delta.
*
- * @param newImage The new metadata image.
* @param delta The delta to apply.
+ * @param newImage The new metadata image.
*/
- def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
+ def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
// Before taking the lock, compute the local changes
val localChanges = delta.localChanges(config.nodeId)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index c4c027a82a0..702d227a30a 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -58,12 +58,7 @@ class BrokerMetadataListener(
/**
* The highest metadata offset that we've seen. Written only from the event queue thread.
*/
- @volatile var _highestMetadataOffset = -1L
-
- /**
- * The highest metadata log epoch that we've seen. Written only from the event queue thread.
- */
- private var _highestEpoch = -1
+ @volatile var _highestOffset = -1L
/**
* The highest metadata log time that we've seen. Written only from the event queue thread.
@@ -101,7 +96,7 @@ class BrokerMetadataListener(
/**
* Returns the highest metadata-offset. Thread-safe.
*/
- def highestMetadataOffset(): Long = _highestMetadataOffset
+ def highestMetadataOffset: Long = _highestOffset
/**
* Handle new metadata records.
@@ -113,7 +108,7 @@ class BrokerMetadataListener(
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
val results = try {
- val loadResults = loadBatches(_delta, reader)
+ val loadResults = loadBatches(_delta, reader, None, None, None)
if (isDebugEnabled) {
debug(s"Loaded new commits: ${loadResults}")
}
@@ -121,15 +116,12 @@ class BrokerMetadataListener(
} finally {
reader.close()
}
- _publisher.foreach(publish(_, results.highestMetadataOffset))
+ _publisher.foreach(publish)
snapshotter.foreach { snapshotter =>
_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
if (shouldSnapshot()) {
- if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset,
- _highestEpoch,
- _highestTimestamp,
- _delta.apply())) {
+ if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
_bytesSinceLastSnapshot = 0L
}
}
@@ -150,48 +142,74 @@ class BrokerMetadataListener(
class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
- val results = try {
+ try {
info(s"Loading snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}.")
_delta = new MetadataDelta(_image) // Discard any previous deltas.
- val loadResults = loadBatches(_delta, reader)
+ val loadResults = loadBatches(
+ _delta,
+ reader,
+ Some(reader.lastContainedLogTimestamp),
+ Some(reader.lastContainedLogOffset),
+ Some(reader.lastContainedLogEpoch)
+ )
_delta.finishSnapshot()
info(s"Loaded snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}: " +
s"${loadResults}")
- loadResults
} finally {
reader.close()
}
- _publisher.foreach(publish(_, results.highestMetadataOffset))
+ _publisher.foreach(publish)
}
}
- case class BatchLoadResults(numBatches: Int,
- numRecords: Int,
- elapsedUs: Long,
- numBytes: Long,
- highestMetadataOffset: Long) {
+ case class BatchLoadResults(numBatches: Int, numRecords: Int, elapsedUs: Long, numBytes: Long) {
override def toString(): String = {
s"${numBatches} batch(es) with ${numRecords} record(s) in ${numBytes} bytes " +
s"ending at offset ${highestMetadataOffset} in ${elapsedUs} microseconds"
}
}
- private def loadBatches(delta: MetadataDelta,
- iterator: util.Iterator[Batch[ApiMessageAndVersion]]): BatchLoadResults = {
+ /**
+ * Load and replay the batches to the metadata delta.
+ *
+ * When loading and replay a snapshot the appendTimestamp and snapshotId parameter should be provided.
+ * In a snapshot the append timestamp, offset and epoch reported by the batch is independent of the ones
+ * reported by the metadata log.
+ *
+ * @param delta metadata delta on which to replay the records
+ * @param iterator sequence of metadata record bacthes to replay
+ * @param lastAppendTimestamp optional append timestamp to use instead of the batches timestamp
+ * @param lastCommittedOffset optional offset to use instead of the batches offset
+ * @param lastCommittedEpoch optional epoch to use instead of the batches epoch
+ */
+ private def loadBatches(
+ delta: MetadataDelta,
+ iterator: util.Iterator[Batch[ApiMessageAndVersion]],
+ lastAppendTimestamp: Option[Long],
+ lastCommittedOffset: Option[Long],
+ lastCommittedEpoch: Option[Int]
+ ): BatchLoadResults = {
val startTimeNs = time.nanoseconds()
var numBatches = 0
var numRecords = 0
- var batch: Batch[ApiMessageAndVersion] = null
var numBytes = 0L
+
while (iterator.hasNext()) {
- batch = iterator.next()
+ val batch = iterator.next()
+
+ val epoch = lastCommittedEpoch.getOrElse(batch.epoch())
+ _highestTimestamp = lastAppendTimestamp.getOrElse(batch.appendTimestamp())
+
var index = 0
batch.records().forEach { messageAndVersion =>
if (isTraceEnabled) {
trace("Metadata batch %d: processing [%d/%d]: %s.".format(batch.lastOffset, index + 1,
batch.records().size(), messageAndVersion.message().toString()))
}
- delta.replay(messageAndVersion.message())
+
+ _highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + index)
+
+ delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
numRecords += 1
index += 1
}
@@ -199,18 +217,11 @@ class BrokerMetadataListener(
metadataBatchSizeHist.update(batch.records().size())
numBatches = numBatches + 1
}
- val newHighestMetadataOffset = if (batch == null) {
- _highestMetadataOffset
- } else {
- _highestMetadataOffset = batch.lastOffset()
- _highestEpoch = batch.epoch()
- _highestTimestamp = batch.appendTimestamp()
- batch.lastOffset()
- }
+
val endTimeNs = time.nanoseconds()
val elapsedUs = TimeUnit.MICROSECONDS.convert(endTimeNs - startTimeNs, TimeUnit.NANOSECONDS)
batchProcessingTimeHist.update(elapsedUs)
- BatchLoadResults(numBatches, numRecords, elapsedUs, numBytes, newHighestMetadataOffset)
+ BatchLoadResults(numBatches, numRecords, elapsedUs, numBytes)
}
def startPublishing(publisher: MetadataPublisher): CompletableFuture[Void] = {
@@ -225,9 +236,9 @@ class BrokerMetadataListener(
override def run(): Unit = {
_publisher = Some(publisher)
- log.info(s"Starting to publish metadata events at offset ${_highestMetadataOffset}.")
+ log.info(s"Starting to publish metadata events at offset ${highestMetadataOffset}.")
try {
- publish(publisher, _highestMetadataOffset)
+ publish(publisher)
future.complete(null)
} catch {
case e: Throwable =>
@@ -237,12 +248,11 @@ class BrokerMetadataListener(
}
}
- private def publish(publisher: MetadataPublisher,
- newHighestMetadataOffset: Long): Unit = {
+ private def publish(publisher: MetadataPublisher): Unit = {
val delta = _delta
_image = _delta.apply()
_delta = new MetadataDelta(_image)
- publisher.publish(newHighestMetadataOffset, delta, _image)
+ publisher.publish(delta, _image)
}
override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index cedef47fa5d..8cdcb3d7f57 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -113,34 +113,34 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
*/
var _firstPublish = true
- override def publish(newHighestMetadataOffset: Long,
- delta: MetadataDelta,
- newImage: MetadataImage): Unit = {
+ override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+ val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
+
try {
- trace(s"Publishing delta $delta with highest offset $newHighestMetadataOffset")
+ trace(s"Publishing delta $delta with highest offset $highestOffsetAndEpoch")
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
if (_firstPublish) {
- info(s"Publishing initial metadata at offset ${newHighestMetadataOffset}.")
+ info(s"Publishing initial metadata at offset $highestOffsetAndEpoch.")
// If this is the first metadata update we are applying, initialize the managers
// first (but after setting up the metadata cache).
initializeManagers()
} else if (isDebugEnabled) {
- debug(s"Publishing metadata at offset ${newHighestMetadataOffset}.")
+ debug(s"Publishing metadata at offset $highestOffsetAndEpoch.")
}
// Apply feature deltas.
Option(delta.featuresDelta()).foreach { featuresDelta =>
- featureCache.update(featuresDelta, newHighestMetadataOffset)
+ featureCache.update(featuresDelta, highestOffsetAndEpoch.offset)
}
// Apply topic deltas.
Option(delta.topicsDelta()).foreach { topicsDelta =>
// Notify the replica manager about changes to topics.
- replicaManager.applyDelta(newImage, topicsDelta)
+ replicaManager.applyDelta(topicsDelta, newImage)
// Handle the case where the old consumer offsets topic was deleted.
if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
@@ -233,7 +233,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
finishInitializingReplicaManager(newImage)
}
} catch {
- case t: Throwable => error(s"Error publishing broker metadata at ${newHighestMetadataOffset}", t)
+ case t: Throwable => error(s"Error publishing broker metadata at $highestOffsetAndEpoch", t)
throw t
} finally {
_firstPublish = false
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
index db41e0cc9ab..fb5bfbbd81c 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -52,18 +52,19 @@ class BrokerMetadataSnapshotter(
*/
val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
- override def maybeStartSnapshot(committedOffset: Long,
- committedEpoch: Int,
- lastContainedLogTime: Long,
- image: MetadataImage): Boolean = synchronized {
+ override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized {
if (_currentSnapshotOffset == -1L) {
- val writer = writerBuilder.build(committedOffset, committedEpoch, lastContainedLogTime)
- _currentSnapshotOffset = committedOffset
- info(s"Creating a new snapshot at offset ${committedOffset}...")
+ val writer = writerBuilder.build(
+ image.highestOffsetAndEpoch().offset,
+ image.highestOffsetAndEpoch().epoch,
+ lastContainedLogTime
+ )
+ _currentSnapshotOffset = image.highestOffsetAndEpoch().offset
+ info(s"Creating a new snapshot at offset ${_currentSnapshotOffset}...")
eventQueue.append(new CreateSnapshotEvent(image, writer))
true
} else {
- warn(s"Declining to create a new snapshot at offset ${committedOffset} because " +
+ warn(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
false
}
@@ -89,7 +90,7 @@ class BrokerMetadataSnapshotter(
override def handleException(e: Throwable): Unit = {
e match {
- case _: RejectedExecutionException =>
+ case _: RejectedExecutionException =>
info("Not processing CreateSnapshotEvent because the event queue is closed.")
case _ => error("Unexpected error handling CreateSnapshotEvent", e)
}
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
index 2962c947615..104d164d9c5 100644
--- a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
@@ -25,12 +25,9 @@ trait MetadataPublisher {
/**
* Publish a new metadata image.
*
- * @param newHighestMetadataOffset The highest metadata offset contained within the image.
- * @param delta The delta between the old image and the new one.
- * @param newImage The new image, which is the result of applying the
- * delta to the previous image.
+ * @param delta The delta between the old image and the new one.
+ * @param newImage The new image, which is the result of applying the
+ * delta to the previous image.
*/
- def publish(newHighestMetadataOffset: Long,
- delta: MetadataDelta,
- newImage: MetadataImage): Unit
+ def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
}
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
index 9b377a66cc2..c9d72923c88 100644
--- a/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
@@ -26,15 +26,10 @@ trait MetadataSnapshotter {
/**
* If there is no other snapshot being written out, start writing out a snapshot.
*
- * @param committedOffset The highest metadata log offset of the snapshot.
- * @param committedEpoch The highest metadata log epoch of the snapshot.
* @param lastContainedLogTime The highest time contained in the snapshot.
* @param image The metadata image to write out.
*
* @return True if we will write out a new snapshot; false otherwise.
*/
- def maybeStartSnapshot(committedOffset: Long,
- committedEpoch: Int,
- lastContainedLogTime: Long,
- image: MetadataImage): Boolean
+ def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean
}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 67ea9e95260..02ce54d8e2e 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.UpdateMetadataRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -60,7 +61,9 @@ object MetadataCacheTest {
// a partial list of partitions. Therefore, base our delta off a partial image that
// contains no brokers, but which contains the previous partitions.
val image = c.currentImage()
- val partialImage = new MetadataImage(image.features(), ClusterImage.EMPTY,
+ val partialImage = new MetadataImage(
+ new RaftOffsetAndEpoch(100, 10),
+ image.features(), ClusterImage.EMPTY,
image.topics(), image.configs(), image.clientQuotas())
val delta = new MetadataDelta(partialImage)
@@ -89,7 +92,7 @@ object MetadataCacheTest {
setFenced(fenced)
}
request.liveBrokers().iterator().asScala.foreach { brokerInfo =>
- delta.replay(toRecord(brokerInfo))
+ delta.replay(100, 10, toRecord(brokerInfo))
}
def toRecords(topic: UpdateMetadataTopicState): Seq[ApiMessage] = {
@@ -114,7 +117,7 @@ object MetadataCacheTest {
results
}
request.topicStates().forEach { topic =>
- toRecords(topic).foreach(delta.replay)
+ toRecords(topic).foreach(delta.replay(100, 10, _))
}
c.setImage(delta.apply())
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 1e35ff88d53..714920d34b0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -341,14 +341,14 @@ class ReplicaManagerConcurrencyTest {
val delta = new MetadataDelta(latestImage)
topic.initialize(delta)
latestImage = delta.apply()
- replicaManager.applyDelta(latestImage, delta.topicsDelta)
+ replicaManager.applyDelta(delta.topicsDelta, latestImage)
case AlterIsrEvent(future, topicPartition, leaderAndIsr) =>
val delta = new MetadataDelta(latestImage)
val updatedLeaderAndIsr = topic.alterIsr(topicPartition, leaderAndIsr, delta)
latestImage = delta.apply()
future.complete(updatedLeaderAndIsr)
- replicaManager.applyDelta(latestImage, delta.topicsDelta)
+ replicaManager.applyDelta(delta.topicsDelta, latestImage)
case ShutdownEvent =>
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 8819c16fe72..046e8fbeea7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -52,6 +52,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, TopicsDelta, TopicsImage}
+import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -2933,7 +2934,7 @@ class ReplicaManagerTest {
// Make the local replica the leader
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
- replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+ replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
@@ -2959,7 +2960,7 @@ class ReplicaManagerTest {
// Change the local replica to follower
val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
- replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Append on a follower should fail
val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
@@ -2991,7 +2992,7 @@ class ReplicaManagerTest {
// Make the local replica the follower
val followerTopicsDelta = topicsCreateDelta(localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
- replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
@@ -3008,7 +3009,7 @@ class ReplicaManagerTest {
// Change the local replica to leader
val leaderTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
- replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+ replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Send a produce request and advance the highwatermark
val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
@@ -3047,7 +3048,7 @@ class ReplicaManagerTest {
// Make the local replica the follower
val followerTopicsDelta = topicsCreateDelta(localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
- replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
@@ -3058,7 +3059,7 @@ class ReplicaManagerTest {
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
// Apply the same delta again
- replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check that the state stays the same
val HostedPartition.Online(noChangePartition) = replicaManager.getPartition(topicPartition)
@@ -3085,7 +3086,7 @@ class ReplicaManagerTest {
// Make the local replica the follower
val followerTopicsDelta = topicsCreateDelta(localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
- replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
@@ -3098,7 +3099,7 @@ class ReplicaManagerTest {
// Apply changes that remove replica
val notReplicaTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), otherId, true)
val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
- replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+ replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@@ -3122,7 +3123,7 @@ class ReplicaManagerTest {
// Make the local replica the follower
val followerTopicsDelta = topicsCreateDelta(localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
- replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
@@ -3135,7 +3136,7 @@ class ReplicaManagerTest {
// Apply changes that remove topic and replica
val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
- replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
+ replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@@ -3159,7 +3160,7 @@ class ReplicaManagerTest {
// Make the local replica the follower
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
- replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+ replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
@@ -3172,7 +3173,7 @@ class ReplicaManagerTest {
// Apply changes that remove replica
val notReplicaTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), otherId, true)
val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
- replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+ replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@@ -3196,7 +3197,7 @@ class ReplicaManagerTest {
// Make the local replica the follower
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
- replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+ replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
@@ -3209,7 +3210,7 @@ class ReplicaManagerTest {
// Apply changes that remove topic and replica
val removeTopicsDelta = topicsDeleteDelta(leaderMetadataImage.topics())
val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
- replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
+ replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
@@ -3234,7 +3235,7 @@ class ReplicaManagerTest {
// Make the local replica the leader
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
- replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+ replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
@@ -3250,7 +3251,7 @@ class ReplicaManagerTest {
// Change the local replica to follower
val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
- replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check that the produce failed because it changed to follower before replicating
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error)
@@ -3272,7 +3273,7 @@ class ReplicaManagerTest {
// Make the local replica the leader
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
- replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+ replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
@@ -3296,7 +3297,7 @@ class ReplicaManagerTest {
// Change the local replica to follower
val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
- replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check that the produce failed because it changed to follower before replicating
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchCallback.assertFired.error)
@@ -3326,7 +3327,7 @@ class ReplicaManagerTest {
// Make the local replica the leader
val topicsDelta = topicsCreateDelta(localId, isStartIdLeader)
val leaderMetadataImage = imageFromTopics(topicsDelta.apply())
- replicaManager.applyDelta(leaderMetadataImage, topicsDelta)
+ replicaManager.applyDelta(topicsDelta, leaderMetadataImage)
assertEquals(HostedPartition.Offline, replicaManager.getPartition(topicPartition))
} finally {
@@ -3356,7 +3357,7 @@ class ReplicaManagerTest {
// Make the local replica the follower
var followerTopicsDelta = topicsCreateDelta(localId, false)
var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
- replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
// Check the state of that partition
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
@@ -3398,7 +3399,7 @@ class ReplicaManagerTest {
// Apply changes that bumps the leader epoch.
followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, false)
followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
- replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
assertFalse(followerPartition.isLeader)
assertEquals(1, followerPartition.getLeaderEpoch)
@@ -3465,6 +3466,7 @@ class ReplicaManagerTest {
private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = {
new MetadataImage(
+ new RaftOffsetAndEpoch(100, 10),
FeaturesImage.EMPTY,
ClusterImageTest.IMAGE1,
topicsImage,
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 735c779afb8..84ed06996ee 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -21,9 +21,9 @@ import java.util
import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Optional}
-import org.apache.kafka.common.{Endpoint, Uuid}
import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.{Endpoint, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -54,7 +54,7 @@ class BrokerMetadataListenerTest {
setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 0.toShort))))
val imageRecords = listener.getImageRecords().get()
assertEquals(0, imageRecords.size())
- assertEquals(100L, listener.highestMetadataOffset())
+ assertEquals(100L, listener.highestMetadataOffset)
listener.handleCommit(RecordTestUtils.mockBatchReader(200L,
util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(1).
@@ -63,10 +63,8 @@ class BrokerMetadataListenerTest {
setRack(null).
setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 0.toShort))))
listener.startPublishing(new MetadataPublisher {
- override def publish(newHighestMetadataOffset: Long,
- delta: MetadataDelta,
- newImage: MetadataImage): Unit = {
- assertEquals(200L, newHighestMetadataOffset)
+ override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+ assertEquals(200L, newImage.highestOffsetAndEpoch().offset)
assertEquals(new BrokerRegistration(0, 100L,
Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg"), Collections.emptyList[Endpoint](),
Collections.emptyMap[String, VersionRange](), Optional.empty[String](), false),
@@ -90,20 +88,17 @@ class BrokerMetadataListenerTest {
var prevCommittedEpoch = -1
var prevLastContainedLogTime = -1L
- override def maybeStartSnapshot(committedOffset: Long,
- committedEpoch: Int,
- lastContainedLogTime: Long,
- newImage: MetadataImage): Boolean = {
+ override def maybeStartSnapshot(lastContainedLogTime: Long, newImage: MetadataImage): Boolean = {
try {
if (activeSnapshotOffset == -1L) {
- assertTrue(prevCommittedOffset <= committedOffset)
- assertTrue(prevCommittedEpoch <= committedEpoch)
+ assertTrue(prevCommittedOffset <= newImage.highestOffsetAndEpoch().offset)
+ assertTrue(prevCommittedEpoch <= newImage.highestOffsetAndEpoch().epoch)
assertTrue(prevLastContainedLogTime <= lastContainedLogTime)
- prevCommittedOffset = committedOffset
- prevCommittedEpoch = committedEpoch
+ prevCommittedOffset = newImage.highestOffsetAndEpoch().offset
+ prevCommittedEpoch = newImage.highestOffsetAndEpoch().epoch
prevLastContainedLogTime = lastContainedLogTime
image = newImage
- activeSnapshotOffset = committedOffset
+ activeSnapshotOffset = newImage.highestOffsetAndEpoch().offset
true
} else {
false
@@ -117,9 +112,7 @@ class BrokerMetadataListenerTest {
class MockMetadataPublisher extends MetadataPublisher {
var image = MetadataImage.EMPTY
- override def publish(newHighestMetadataOffset: Long,
- delta: MetadataDelta,
- newImage: MetadataImage): Unit = {
+ override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
image = newImage
}
}
@@ -152,10 +145,10 @@ class BrokerMetadataListenerTest {
registerBrokers(listener, brokerIds, endOffset = 100L)
createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
listener.getImageRecords().get()
- assertEquals(200L, listener.highestMetadataOffset())
+ assertEquals(200L, listener.highestMetadataOffset)
generateManyRecords(listener, endOffset = 1000L)
- assertEquals(1000L, listener.highestMetadataOffset())
+ assertEquals(1000L, listener.highestMetadataOffset)
} finally {
listener.close()
}
@@ -171,7 +164,7 @@ class BrokerMetadataListenerTest {
registerBrokers(listener, brokerIds, endOffset = 100L)
createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
listener.getImageRecords().get()
- assertEquals(200L, listener.highestMetadataOffset())
+ assertEquals(200L, listener.highestMetadataOffset)
// Check that we generate at least one snapshot once we see enough records.
assertEquals(-1L, snapshotter.prevCommittedOffset)
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
index b030e87f281..888fec5ad25 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -51,7 +51,11 @@ class BrokerMetadataSnapshotterTest {
lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion] = {
val offsetAndEpoch = new OffsetAndEpoch(committedOffset, committedEpoch)
SnapshotWriter.createWithHeader(
- () => Optional.of(new MockRawSnapshotWriter(offsetAndEpoch, consumeSnapshotBuffer)),
+ () => {
+ Optional.of(
+ new MockRawSnapshotWriter(offsetAndEpoch, consumeSnapshotBuffer(committedOffset, committedEpoch))
+ )
+ },
1024,
MemoryPool.NONE,
Time.SYSTEM,
@@ -61,7 +65,7 @@ class BrokerMetadataSnapshotterTest {
).get();
}
- def consumeSnapshotBuffer(buffer: ByteBuffer): Unit = {
+ def consumeSnapshotBuffer(committedOffset: Long, committedEpoch: Int)(buffer: ByteBuffer): Unit = {
val delta = new MetadataDelta(MetadataImage.EMPTY)
val memoryRecords = MemoryRecords.readableRecords(buffer)
val batchIterator = memoryRecords.batchIterator()
@@ -72,7 +76,7 @@ class BrokerMetadataSnapshotterTest {
val recordBuffer = record.value().duplicate()
val messageAndVersion = MetadataRecordSerde.INSTANCE.read(
new ByteBufferAccessor(recordBuffer), recordBuffer.remaining())
- delta.replay(messageAndVersion.message())
+ delta.replay(committedOffset, committedEpoch, messageAndVersion.message())
})
}
}
@@ -92,8 +96,8 @@ class BrokerMetadataSnapshotterTest {
try {
val blockingEvent = new BlockingEvent()
snapshotter.eventQueue.append(blockingEvent)
- assertTrue(snapshotter.maybeStartSnapshot(123L, 12, 10000L, MetadataImageTest.IMAGE1))
- assertFalse(snapshotter.maybeStartSnapshot(124L, 12, 11000L, MetadataImageTest.IMAGE2))
+ assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1))
+ assertFalse(snapshotter.maybeStartSnapshot(11000L, MetadataImageTest.IMAGE2))
blockingEvent.latch.countDown()
assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
} finally {
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 4b9451b07a5..aa7725b9e4f 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.Iterator;
@@ -46,6 +47,10 @@ import java.util.List;
public final class MetadataDelta {
private final MetadataImage image;
+ private long highestOffset;
+
+ private int highestEpoch;
+
private FeaturesDelta featuresDelta = null;
private ClusterDelta clusterDelta = null;
@@ -58,6 +63,8 @@ public final class MetadataDelta {
public MetadataDelta(MetadataImage image) {
this.image = image;
+ this.highestOffset = image.highestOffsetAndEpoch().offset;
+ this.highestEpoch = image.highestOffsetAndEpoch().epoch;
}
public MetadataImage image() {
@@ -84,16 +91,19 @@ public final class MetadataDelta {
return clientQuotasDelta;
}
- public void read(Iterator> reader) {
+ public void read(long highestOffset, int highestEpoch, Iterator> reader) {
while (reader.hasNext()) {
List batch = reader.next();
for (ApiMessageAndVersion messageAndVersion : batch) {
- replay(messageAndVersion.message());
+ replay(highestOffset, highestEpoch, messageAndVersion.message());
}
}
}
- public void replay(ApiMessage record) {
+ public void replay(long offset, int epoch, ApiMessage record) {
+ highestOffset = offset;
+ highestEpoch = epoch;
+
MetadataRecordType type = MetadataRecordType.fromId(record.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
@@ -253,14 +263,22 @@ public final class MetadataDelta {
} else {
newClientQuotas = clientQuotasDelta.apply();
}
- return new MetadataImage(newFeatures, newCluster, newTopics, newConfigs,
- newClientQuotas);
+ return new MetadataImage(
+ new OffsetAndEpoch(highestOffset, highestEpoch),
+ newFeatures,
+ newCluster,
+ newTopics,
+ newConfigs,
+ newClientQuotas
+ );
}
@Override
public String toString() {
return "MetadataDelta(" +
- "featuresDelta=" + featuresDelta +
+ "highestOffset=" + highestOffset +
+ ", highestEpoch=" + highestEpoch +
+ ", featuresDelta=" + featuresDelta +
", clusterDelta=" + clusterDelta +
", topicsDelta=" + topicsDelta +
", configsDelta=" + configsDelta +
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index 620379e9561..b9fdbc17580 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -17,6 +17,7 @@
package org.apache.kafka.image;
+import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.List;
@@ -31,12 +32,15 @@ import java.util.function.Consumer;
*/
public final class MetadataImage {
public final static MetadataImage EMPTY = new MetadataImage(
+ new OffsetAndEpoch(0, 0),
FeaturesImage.EMPTY,
ClusterImage.EMPTY,
TopicsImage.EMPTY,
ConfigurationsImage.EMPTY,
ClientQuotasImage.EMPTY);
+ private final OffsetAndEpoch highestOffsetAndEpoch;
+
private final FeaturesImage features;
private final ClusterImage cluster;
@@ -47,11 +51,15 @@ public final class MetadataImage {
private final ClientQuotasImage clientQuotas;
- public MetadataImage(FeaturesImage features,
- ClusterImage cluster,
- TopicsImage topics,
- ConfigurationsImage configs,
- ClientQuotasImage clientQuotas) {
+ public MetadataImage(
+ OffsetAndEpoch highestOffsetAndEpoch,
+ FeaturesImage features,
+ ClusterImage cluster,
+ TopicsImage topics,
+ ConfigurationsImage configs,
+ ClientQuotasImage clientQuotas
+ ) {
+ this.highestOffsetAndEpoch = highestOffsetAndEpoch;
this.features = features;
this.cluster = cluster;
this.topics = topics;
@@ -67,6 +75,10 @@ public final class MetadataImage {
clientQuotas.isEmpty();
}
+ public OffsetAndEpoch highestOffsetAndEpoch() {
+ return highestOffsetAndEpoch;
+ }
+
public FeaturesImage features() {
return features;
}
@@ -99,7 +111,8 @@ public final class MetadataImage {
public boolean equals(Object o) {
if (!(o instanceof MetadataImage)) return false;
MetadataImage other = (MetadataImage) o;
- return features.equals(other.features) &&
+ return highestOffsetAndEpoch.equals(other.highestOffsetAndEpoch) &&
+ features.equals(other.features) &&
cluster.equals(other.cluster) &&
topics.equals(other.topics) &&
configs.equals(other.configs) &&
@@ -108,12 +121,13 @@ public final class MetadataImage {
@Override
public int hashCode() {
- return Objects.hash(features, cluster, topics, configs, clientQuotas);
+ return Objects.hash(highestOffsetAndEpoch, features, cluster, topics, configs, clientQuotas);
}
@Override
public String toString() {
- return "MetadataImage(features=" + features +
+ return "MetadataImage(highestOffsetAndEpoch=" + highestOffsetAndEpoch +
+ ", features=" + features +
", cluster=" + cluster +
", topics=" + topics +
", configs=" + configs +
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index 2ee05bb7eb1..43709ba5f2c 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.image;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.raft.OffsetAndEpoch;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -33,20 +34,24 @@ public class MetadataImageTest {
public final static MetadataImage IMAGE2;
static {
- IMAGE1 = new MetadataImage(FeaturesImageTest.IMAGE1,
+ IMAGE1 = new MetadataImage(
+ new OffsetAndEpoch(100, 4),
+ FeaturesImageTest.IMAGE1,
ClusterImageTest.IMAGE1,
TopicsImageTest.IMAGE1,
ConfigurationsImageTest.IMAGE1,
ClientQuotasImageTest.IMAGE1);
DELTA1 = new MetadataDelta(IMAGE1);
- RecordTestUtils.replayAll(DELTA1, FeaturesImageTest.DELTA1_RECORDS);
- RecordTestUtils.replayAll(DELTA1, ClusterImageTest.DELTA1_RECORDS);
- RecordTestUtils.replayAll(DELTA1, TopicsImageTest.DELTA1_RECORDS);
- RecordTestUtils.replayAll(DELTA1, ConfigurationsImageTest.DELTA1_RECORDS);
- RecordTestUtils.replayAll(DELTA1, ClientQuotasImageTest.DELTA1_RECORDS);
+ RecordTestUtils.replayAll(DELTA1, 200, 5, FeaturesImageTest.DELTA1_RECORDS);
+ RecordTestUtils.replayAll(DELTA1, 200, 5, ClusterImageTest.DELTA1_RECORDS);
+ RecordTestUtils.replayAll(DELTA1, 200, 5, TopicsImageTest.DELTA1_RECORDS);
+ RecordTestUtils.replayAll(DELTA1, 200, 5, ConfigurationsImageTest.DELTA1_RECORDS);
+ RecordTestUtils.replayAll(DELTA1, 200, 5, ClientQuotasImageTest.DELTA1_RECORDS);
- IMAGE2 = new MetadataImage(FeaturesImageTest.IMAGE2,
+ IMAGE2 = new MetadataImage(
+ new OffsetAndEpoch(200, 5),
+ FeaturesImageTest.IMAGE2,
ClusterImageTest.IMAGE2,
TopicsImageTest.IMAGE2,
ConfigurationsImageTest.IMAGE2,
@@ -77,7 +82,8 @@ public class MetadataImageTest {
MockSnapshotConsumer writer = new MockSnapshotConsumer();
image.write(writer);
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
- RecordTestUtils.replayAllBatches(delta, writer.batches());
+ RecordTestUtils.replayAllBatches(
+ delta, image.highestOffsetAndEpoch().offset, image.highestOffsetAndEpoch().epoch, writer.batches());
MetadataImage nextImage = delta.apply();
assertEquals(image, nextImage);
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index 7bedde33605..431c9bb0a57 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.internals.MemoryBatchReader;
@@ -48,7 +49,7 @@ public class RecordTestUtils {
* Replay a list of records.
*
* @param target The object to invoke the replay function on.
- * @param recordsAndVersions A list of batches of records.
+ * @param recordsAndVersions A list of records.
*/
public static void replayAll(Object target,
List recordsAndVersions) {
@@ -67,6 +68,26 @@ public class RecordTestUtils {
}
}
+ /**
+ * Replay a list of records to the metadata delta.
+ *
+ * @param delta the metadata delta on which to replay the records
+ * @param highestOffset highest offset from the list of records
+ * @param highestEpoch highest epoch from the list of records
+ * @param recordsAndVersions list of records
+ */
+ public static void replayAll(
+ MetadataDelta delta,
+ long highestOffset,
+ int highestEpoch,
+ List recordsAndVersions
+ ) {
+ for (ApiMessageAndVersion recordAndVersion : recordsAndVersions) {
+ ApiMessage record = recordAndVersion.message();
+ delta.replay(highestOffset, highestEpoch, record);
+ }
+ }
+
/**
* Replay a list of record batches.
*
@@ -80,6 +101,25 @@ public class RecordTestUtils {
}
}
+ /**
+ * Replay a list of record batches to the metadata delta.
+ *
+ * @param delta the metadata delta on which to replay the records
+ * @param highestOffset highest offset from the list of record batches
+ * @param highestEpoch highest epoch from the list of record batches
+ * @param recordsAndVersions list of batches of records
+ */
+ public static void replayAllBatches(
+ MetadataDelta delta,
+ long highestOffset,
+ int highestEpoch,
+ List> batches
+ ) {
+ for (List batch : batches) {
+ replayAll(delta, highestOffset, highestEpoch, batch);
+ }
+ }
+
/**
* Materialize the output of an iterator into a set.
*