KAFKA-14307; Controller time-based snapshots (#12761)

Implement time-based snapshots for the controller. The general strategy for this feature is that the controller will use the record batch's append time to determine if a snapshot should be generated. If the oldest record that has been committed but is not included in the latest snapshot is older than `metadata.log.max.snapshot.interval.ms`, the controller will trigger a snapshot immediately. This is useful in case the controller was offline for more than `metadata.log.max.snapshot.interval.ms` milliseconds.

If the oldest record that has been committed but is not included in the latest snapshot is NOT older than `metadata.log.max.snapshot.interval.ms`, the controller will schedule a `maybeGenerateSnapshot` deferred task.
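
As a rough, self-contained sketch of that decision (hypothetical class and method names; the real logic lives in `QuorumController.maybeGenerateSnapshot` in the diff below), the controller tracks the append time of the oldest committed-but-not-snapshotted record and either snapshots now or computes the delay for the deferred task:

```java
import java.util.OptionalLong;

// Hypothetical model of the time-based trigger; not the patch itself.
final class TimeBasedSnapshotCheck {
    private final long snapshotMaxIntervalMs;
    // Append time of the oldest committed record not in the latest snapshot;
    // Long.MAX_VALUE means every committed record is already snapshotted.
    private long oldestNonSnapshottedTimestamp = Long.MAX_VALUE;

    TimeBasedSnapshotCheck(long snapshotMaxIntervalMs) {
        this.snapshotMaxIntervalMs = snapshotMaxIntervalMs;
    }

    // Called as committed batches are replayed, using the batch append time.
    void onCommittedBatch(long appendTimeMs) {
        oldestNonSnapshottedTimestamp = Math.min(oldestNonSnapshottedTimestamp, appendTimeMs);
    }

    // Empty means "snapshot immediately"; a value is the delay, in ms, after
    // which the deferred maybeGenerateSnapshot task should run.
    OptionalLong delayBeforeNextCheckMs(long nowMs) {
        if (oldestNonSnapshottedTimestamp == Long.MAX_VALUE) {
            return OptionalLong.of(snapshotMaxIntervalMs); // nothing pending yet
        }
        long ageMs = nowMs - oldestNonSnapshottedTimestamp;
        if (ageMs >= snapshotMaxIntervalMs) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(snapshotMaxIntervalMs - ageMs);
    }

    public static void main(String[] args) {
        TimeBasedSnapshotCheck check = new TimeBasedSnapshotCheck(3_600_000L); // one-hour default
        check.onCommittedBatch(0L);
        System.out.println(check.delayBeforeNextCheckMs(4_000_000L)); // empty: snapshot now
        System.out.println(check.delayBeforeNextCheckMs(1_000_000L)); // 2600000: schedule deferred task
    }
}
```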

It is possible that, when the controller wants to generate a new snapshot because of either time or the number of bytes, a snapshot is already being generated. In this case the `SnapshotGeneratorManager` was changed so that it checks, and potentially triggers, another snapshot when the currently in-progress snapshot finishes.
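
A minimal sketch of that rule (hypothetical `SnapshotGate` class; the real code is `SnapshotGeneratorManager.handleSnapshotFinished` in the diff below): at most one snapshot runs at a time, and completing one re-runs the trigger check instead of waiting for the next committed batch.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of "one snapshot at a time, re-check on completion".
final class SnapshotGate {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    // Returns true if the caller may start a snapshot now; false if one is running.
    boolean tryStart() {
        return inProgress.compareAndSet(false, true);
    }

    // Called when the in-progress snapshot finishes. The byte and time counters
    // may have crossed their thresholds while the snapshot was being written,
    // so the trigger check runs again immediately.
    void finish(Runnable recheckTriggers) {
        inProgress.set(false);
        recheckTriggers.run();
    }

    public static void main(String[] args) {
        SnapshotGate gate = new SnapshotGate();
        System.out.println(gate.tryStart()); // true: snapshot starts
        System.out.println(gate.tryStart()); // false: declined, one is in progress
        gate.finish(() -> System.out.println("re-checking snapshot triggers"));
    }
}
```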

To better support this feature, the following additional changes were made:
1. The configuration `metadata.log.max.snapshot.interval.ms` was added to `KafkaConfig` with a default value of one hour.
2. `RaftClient` was extended to return the latest snapshot id. This snapshot id is used to determine whether a given record is included in a snapshot (illustrated in the sketch after this list).
3. The `SnapshotReason` type was improved to support including values in the reason message.
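
For item 2, a committed record is considered included in the latest snapshot exactly when its offset is below the snapshot id's end offset; the patch uses `raftClient.latestSnapshotId()` for this while replaying committed batches. A self-contained sketch of the check (`OffsetAndEpoch` is modeled locally here as a stand-in):

```java
import java.util.Optional;

// Sketch of the "is this record covered by the latest snapshot?" check.
final class LatestSnapshotCheck {
    // Local stand-in for org.apache.kafka.raft.OffsetAndEpoch.
    record OffsetAndEpoch(long offset, int epoch) { }

    // A snapshot id's offset is the end offset (exclusive) of the snapshot, so a
    // record is included iff its offset is strictly below it. With no snapshot,
    // nothing is included.
    static boolean includedInLatestSnapshot(long offset, Optional<OffsetAndEpoch> latestSnapshotId) {
        return offset < latestSnapshotId.map(OffsetAndEpoch::offset).orElse(0L);
    }

    public static void main(String[] args) {
        Optional<OffsetAndEpoch> snapshot = Optional.of(new OffsetAndEpoch(100L, 2));
        System.out.println(includedInLatestSnapshot(50L, snapshot));          // true
        System.out.println(includedInLatestSnapshot(100L, snapshot));         // false
        System.out.println(includedInLatestSnapshot(50L, Optional.empty()));  // false
    }
}
```

Time-based snapshots are on by default (one hour); setting `metadata.log.max.snapshot.interval.ms=0` disables them, while the byte-based trigger `metadata.log.max.record.bytes.between.snapshots` remains independent.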

Reviewers: Jason Gustafson <jason@confluent.io>, Niket Goel <niket-goel@users.noreply.github.com>
José Armando García Sancio 2022-11-21 17:30:50 -08:00 committed by GitHub
parent 36f933fc5f
commit 72b535acaf
14 changed files with 460 additions and 113 deletions


@@ -201,6 +201,7 @@ class ControllerServer(
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
TimeUnit.MILLISECONDS)).
setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
setSnapshotMaxIntervalMs(config.metadataSnapshotMaxIntervalMs).
setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
setMaxIdleIntervalNs(maxIdleIntervalNs).
setMetrics(controllerMetrics).


@@ -81,6 +81,7 @@ object Defaults {
val BrokerHeartbeatIntervalMs = 2000
val BrokerSessionTimeoutMs = 9000
val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
val MetadataSnapshotMaxIntervalMs = TimeUnit.HOURS.toMillis(1);
val MetadataMaxIdleIntervalMs = 500
/** KRaft mode configs */
@@ -400,6 +401,7 @@ object KafkaConfig {
val NodeIdProp = "node.id"
val MetadataLogDirProp = "metadata.log.dir"
val MetadataSnapshotMaxNewRecordBytesProp = "metadata.log.max.record.bytes.between.snapshots"
val MetadataSnapshotMaxIntervalMsProp = "metadata.log.max.snapshot.interval.ms"
val ControllerListenerNamesProp = "controller.listener.names"
val SaslMechanismControllerProtocolProp = "sasl.mechanism.controller.protocol"
val MetadataLogSegmentMinBytesProp = "metadata.log.segment.min.bytes"
@@ -725,7 +727,17 @@ object KafkaConfig {
"This is required configuration when running in KRaft mode."
val MetadataLogDirDoc = "This configuration determines where we put the metadata log for clusters in KRaft mode. " +
"If it is not set, the metadata log is placed in the first log directory from log.dirs."
val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of bytes in the log between the latest snapshot and the high-watermark needed before generating a new snapshot."
val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of bytes in the log between the latest " +
"snapshot and the high-watermark needed before generating a new snapshot. The default value is " +
s"${Defaults.MetadataSnapshotMaxNewRecordBytes}. To generate snapshots based on the time elapsed, see " +
s"the <code>$MetadataSnapshotMaxIntervalMsProp</code> configuration. The Kafka node will generate a snapshot when " +
"either the maximum time interval is reached or the maximum bytes limit is reached."
val MetadataSnapshotMaxIntervalMsDoc = "This is the maximum number of milliseconds to wait to generate a snapshot " +
"if there are committed records in the log that are not included in the latest snapshot. A value of zero disables " +
s"time based snapshot generation. The default value is ${Defaults.MetadataSnapshotMaxIntervalMs}. To generate " +
s"snapshots based on the number of metadata bytes, see the <code>$MetadataSnapshotMaxNewRecordBytesProp</code> " +
"configuration. The Kafka node will generate a snapshot when either the maximum time interval is reached or the " +
"maximum bytes limit is reached."
val MetadataMaxIdleIntervalMsDoc = "This configuration controls how often the active " +
"controller should write no-op records to the metadata partition. If the value is 0, no-op records " +
s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}";
@@ -1157,6 +1169,7 @@ object KafkaConfig {
* KRaft mode configs.
*/
.define(MetadataSnapshotMaxNewRecordBytesProp, LONG, Defaults.MetadataSnapshotMaxNewRecordBytes, atLeast(1), HIGH, MetadataSnapshotMaxNewRecordBytesDoc)
.define(MetadataSnapshotMaxIntervalMsProp, LONG, Defaults.MetadataSnapshotMaxIntervalMs, atLeast(0), HIGH, MetadataSnapshotMaxIntervalMsDoc)
/*
* KRaft mode private configs. Note that these configs are defined as internal. We will make them public in the 3.0.0 release.
@@ -1697,6 +1710,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
/************* Metadata Configuration ***********/
val metadataSnapshotMaxNewRecordBytes = getLong(KafkaConfig.MetadataSnapshotMaxNewRecordBytesProp)
val metadataSnapshotMaxIntervalMs = getLong(KafkaConfig.MetadataSnapshotMaxIntervalMsProp)
val metadataMaxIdleIntervalNs: Option[Long] = {
val value = TimeUnit.NANOSECONDS.convert(getInt(KafkaConfig.MetadataMaxIdleIntervalMsProp).toLong, TimeUnit.MILLISECONDS)
if (value > 0) Some(value) else None
@@ -2282,7 +2296,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)
require(principalBuilderClass != null, s"${KafkaConfig.PrincipalBuilderClassProp} must be non-null")
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
s"${KafkaConfig.PrincipalBuilderClassProp} must implement KafkaPrincipalSerde")
}
}


@@ -17,19 +17,19 @@
package kafka.server.metadata
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CompletableFuture, TimeUnit}
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.image.writer.{ImageWriterOptions, RecordListWriter}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.util.SnapshotReason
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.snapshot.SnapshotReader
import scala.compat.java8.OptionConverters._
object BrokerMetadataListener {
@@ -153,25 +153,27 @@ class BrokerMetadataListener(
}
private def shouldSnapshot(): Set[SnapshotReason] = {
val metadataVersionHasChanged = metadataVersionChanged()
val maxBytesHaveExceeded = (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots)
val maybeMetadataVersionChanged = metadataVersionChanged.toSet
if (maxBytesHaveExceeded && metadataVersionHasChanged) {
Set(SnapshotReason.MetadataVersionChanged, SnapshotReason.MaxBytesExceeded)
} else if (maxBytesHaveExceeded) {
Set(SnapshotReason.MaxBytesExceeded)
} else if (metadataVersionHasChanged) {
Set(SnapshotReason.MetadataVersionChanged)
if (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
maybeMetadataVersionChanged + SnapshotReason.maxBytesExceeded(_bytesSinceLastSnapshot, maxBytesBetweenSnapshots)
} else {
Set()
maybeMetadataVersionChanged
}
}
private def metadataVersionChanged(): Boolean = {
private def metadataVersionChanged: Option[SnapshotReason] = {
// The _publisher is empty before we start publishing, and we won't compute the feature delta
// until we start publishing
_publisher.nonEmpty && Option(_delta.featuresDelta()).exists { featuresDelta =>
featuresDelta.metadataVersionChange().isPresent
if (_publisher.nonEmpty) {
Option(_delta.featuresDelta()).flatMap { featuresDelta =>
featuresDelta
.metadataVersionChange()
.asScala
.map(SnapshotReason.metadataVersionChanged)
}
} else {
None
}
}
@@ -306,9 +308,8 @@ class BrokerMetadataListener(
_publisher = Some(publisher)
log.info(s"Starting to publish metadata events at offset $highestMetadataOffset.")
try {
if (metadataVersionChanged()) {
maybeStartSnapshot(Set(SnapshotReason.MetadataVersionChanged))
}
// Generate a snapshot if the metadata version changed
metadataVersionChanged.foreach(reason => maybeStartSnapshot(Set(reason)))
publish(publisher)
future.complete(null)
} catch {


@@ -18,13 +18,14 @@ package kafka.server.metadata
import java.util.concurrent.RejectedExecutionException
import kafka.utils.Logging
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.image.MetadataImage
import org.apache.kafka.image.writer.{ImageWriterOptions, RaftSnapshotWriter}
import org.apache.kafka.metadata.util.SnapshotReason
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.snapshot.SnapshotWriter
import scala.jdk.CollectionConverters._
trait SnapshotWriterBuilder {
def build(committedOffset: Long,
@@ -62,9 +63,13 @@ class BrokerMetadataSnapshotter(
*/
val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage, snapshotReasons: Set[SnapshotReason]): Boolean = synchronized {
override def maybeStartSnapshot(
lastContainedLogTime: Long,
image: MetadataImage,
snapshotReasons: Set[SnapshotReason]
): Boolean = synchronized {
if (_currentSnapshotOffset != -1) {
info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch} because " +
s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
false
} else {
@@ -74,9 +79,10 @@ class BrokerMetadataSnapshotter(
lastContainedLogTime
)
if (writer.nonEmpty) {
_currentSnapshotOffset = image.highestOffsetAndEpoch().offset
_currentSnapshotOffset = image.highestOffsetAndEpoch.offset
info(s"Creating a new snapshot at offset ${_currentSnapshotOffset} because, ${snapshotReasons.mkString(" and ")}")
val snapshotReasonsMessage = SnapshotReason.stringFromReasons(snapshotReasons.asJava)
info(s"Creating a new snapshot at ${image.highestOffsetAndEpoch} because: $snapshotReasonsMessage")
eventQueue.append(new CreateSnapshotEvent(image, writer.get))
true
} else {


@@ -1616,4 +1616,23 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props)
assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
}
@Test
def testMetadataMaxSnapshotInterval(): Unit = {
val validValue = 100
val props = new Properties()
props.putAll(kraftProps())
props.setProperty(KafkaConfig.MetadataSnapshotMaxIntervalMsProp, validValue.toString)
val config = KafkaConfig.fromProps(props)
assertEquals(validValue, config.metadataSnapshotMaxIntervalMs)
props.setProperty(KafkaConfig.MetadataSnapshotMaxIntervalMsProp, "-1")
val errorMessage = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage
assertEquals(
"Invalid value -1 for configuration metadata.log.max.snapshot.interval.ms: Value must be at least 0",
errorMessage
)
}
}


@@ -95,29 +95,10 @@ class BrokerMetadataSnapshotterTest {
def testCreateSnapshot(): Unit = {
val writerBuilder = new MockSnapshotWriterBuilder()
val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)
try {
val blockingEvent = new BlockingEvent()
val reasons = Set(SnapshotReason.UnknownReason)
snapshotter.eventQueue.append(blockingEvent)
assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1, reasons))
assertFalse(snapshotter.maybeStartSnapshot(11000L, MetadataImageTest.IMAGE2, reasons))
blockingEvent.latch.countDown()
assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
} finally {
snapshotter.close()
}
}
@Test
def testCreateSnapshotMultipleReasons(): Unit = {
val writerBuilder = new MockSnapshotWriterBuilder()
val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)
try {
val blockingEvent = new BlockingEvent()
val reasons = Set(SnapshotReason.MaxBytesExceeded, SnapshotReason.MetadataVersionChanged)
val reasons = Set(SnapshotReason.UNKNOWN)
snapshotter.eventQueue.append(blockingEvent)
assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1, reasons))


@@ -107,6 +107,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
@@ -168,6 +169,7 @@ public final class QuorumController implements Controller {
private int defaultNumPartitions = 1;
private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
private long snapshotMaxIntervalMs = 0;
private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
private long sessionTimeoutNs = ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
@@ -244,6 +246,11 @@ public final class QuorumController implements Controller {
return this;
}
public Builder setSnapshotMaxIntervalMs(long value) {
this.snapshotMaxIntervalMs = value;
return this;
}
public Builder setLeaderImbalanceCheckIntervalNs(OptionalLong value) {
this.leaderImbalanceCheckIntervalNs = value;
return this;
@@ -339,6 +346,7 @@ public final class QuorumController implements Controller {
defaultNumPartitions,
replicaPlacer,
snapshotMaxNewRecordBytes,
snapshotMaxIntervalMs,
leaderImbalanceCheckIntervalNs,
maxIdleIntervalNs,
sessionTimeoutNs,
@@ -468,13 +476,13 @@ public final class QuorumController implements Controller {
/**
* A controller event for handling internal state changes, such as Raft inputs.
*/
class ControlEvent implements EventQueue.Event {
class ControllerEvent implements EventQueue.Event {
private final String name;
private final Runnable handler;
private final long eventCreatedTimeNs = time.nanoseconds();
private OptionalLong startProcessingTimeNs = OptionalLong.empty();
ControlEvent(String name, Runnable handler) {
ControllerEvent(String name, Runnable handler) {
this.name = name;
this.handler = handler;
}
@@ -501,7 +509,7 @@ public final class QuorumController implements Controller {
}
private void appendControlEvent(String name, Runnable handler) {
ControlEvent event = new ControlEvent(name, handler);
ControllerEvent event = new ControllerEvent(name, handler);
queue.append(event);
}
@@ -513,11 +521,11 @@ public final class QuorumController implements Controller {
private SnapshotGenerator generator = null;
void createSnapshotGenerator(long committedOffset, int committedEpoch, long committedTimestamp) {
if (generator != null) {
throw new RuntimeException("Snapshot generator already exists.");
if (snapshotInProgress()) {
throw new IllegalStateException("Snapshot generator already exists");
}
if (!snapshotRegistry.hasSnapshot(committedOffset)) {
throw new RuntimeException(
throw new IllegalStateException(
String.format(
"Cannot generate a snapshot at committed offset %d because it does not exists in the snapshot registry.",
committedOffset
@@ -555,7 +563,7 @@ public final class QuorumController implements Controller {
}
void cancel() {
if (generator == null) return;
if (!snapshotInProgress()) return;
log.error("Cancelling snapshot {}", generator.lastContainedLogOffset());
generator.writer().close();
generator = null;
@@ -568,45 +576,62 @@ public final class QuorumController implements Controller {
}
void reschedule(long delayNs) {
ControlEvent event = new ControlEvent(GENERATE_SNAPSHOT, this);
ControllerEvent event = new ControllerEvent(GENERATE_SNAPSHOT, this);
queue.scheduleDeferred(event.name,
new EarliestDeadlineFunction(time.nanoseconds() + delayNs), event);
}
void handleSnapshotFinished(Optional<Exception> exception) {
if (exception.isPresent()) {
log.error("Error while generating snapshot {}", generator.lastContainedLogOffset(), exception.get());
} else {
log.info("Finished generating snapshot {}.", generator.lastContainedLogOffset());
}
generator.writer().close();
generator = null;
// Delete every in-memory snapshot up to the committed offset. They are not needed since this
// snapshot generation finished.
snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset);
// The snapshot counters for size-based and time-based snapshots could have changed to cause a new
// snapshot to get generated.
maybeGenerateSnapshot();
}
@Override
public void run() {
if (generator == null) {
log.debug("No snapshot is in progress.");
if (!snapshotInProgress()) {
log.debug("No snapshot is in progress because it was previously canceled");
return;
}
OptionalLong nextDelay;
try {
nextDelay = generator.generateBatches();
} catch (Exception e) {
log.error("Error while generating snapshot {}", generator.lastContainedLogOffset(), e);
generator.writer().close();
generator = null;
handleSnapshotFinished(Optional.of(e));
return;
}
if (!nextDelay.isPresent()) {
log.info("Finished generating snapshot {}.", generator.lastContainedLogOffset());
generator.writer().close();
generator = null;
// Delete every in-memory snapshot up to the committed offset. They are not needed since this
// snapshot generation finished.
snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset);
return;
if (nextDelay.isPresent()) {
reschedule(nextDelay.getAsLong());
} else {
handleSnapshotFinished(Optional.empty());
}
reschedule(nextDelay.getAsLong());
}
OptionalLong snapshotLastOffsetFromLog() {
if (generator == null) {
if (!snapshotInProgress()) {
return OptionalLong.empty();
}
return OptionalLong.of(generator.lastContainedLogOffset());
}
public boolean snapshotInProgress() {
return generator != null;
}
}
/**
@@ -947,9 +972,10 @@ public final class QuorumController implements Controller {
// Complete any events in the purgatory that were waiting for this offset.
purgatory.completeUpTo(offset);
// Delete all the in-memory snapshots that we no longer need.
// If we are writing a new snapshot, then we need to keep that around;
// otherwise, we should delete up to the current committed offset.
// Delete all the in-memory snapshots that are no longer needed.
//
// If the active controller has a snapshot in progress, it needs to keep that in-memory
// snapshot. Otherwise, the active controller can delete up to the current committed offset.
snapshotRegistry.deleteSnapshotsUpTo(
snapshotGeneratorManager.snapshotLastOffsetFromLog().orElse(offset));
} else {
@@ -986,6 +1012,13 @@ public final class QuorumController implements Controller {
batch.appendTimestamp(),
committedBytesSinceLastSnapshot + batch.sizeInBytes()
);
if (offset >= raftClient.latestSnapshotId().map(OffsetAndEpoch::offset).orElse(0L)) {
oldestNonSnapshottedTimestamp = Math.min(
oldestNonSnapshottedTimestamp,
batch.appendTimestamp()
);
}
}
maybeGenerateSnapshot();
@@ -1355,15 +1388,19 @@ public final class QuorumController implements Controller {
featureControl.metadataVersion()
);
ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(WRITE_NO_OP_RECORD, () -> {
noOpRecordScheduled = false;
maybeScheduleNextWriteNoOpRecord();
ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(
WRITE_NO_OP_RECORD,
() -> {
noOpRecordScheduled = false;
maybeScheduleNextWriteNoOpRecord();
return ControllerResult.of(
Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
null
);
}, true);
return ControllerResult.of(
Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
null
);
},
true
);
long delayNs = time.nanoseconds() + maxIdleIntervalNs.getAsLong();
queue.scheduleDeferred(WRITE_NO_OP_RECORD, new EarliestDeadlineFunction(delayNs), event);
@@ -1376,6 +1413,38 @@ public final class QuorumController implements Controller {
queue.cancelDeferred(WRITE_NO_OP_RECORD);
}
private static final String MAYBE_GENERATE_SNAPSHOT = "maybeGenerateSnapshot";
private void maybeScheduleNextGenerateSnapshot() {
if (!generateSnapshotScheduled) {
long now = time.milliseconds();
long delayMs = Math.max(
0,
snapshotMaxIntervalMs + oldestNonSnapshottedTimestamp - now
);
log.debug(
"Scheduling write event for {} because snapshotMaxIntervalMs ({}), " +
"oldestNonSnapshottedTimestamp ({}) and now ({})",
MAYBE_GENERATE_SNAPSHOT,
snapshotMaxIntervalMs,
oldestNonSnapshottedTimestamp,
now
);
ControllerEvent event = new ControllerEvent(MAYBE_GENERATE_SNAPSHOT, this::maybeGenerateSnapshot);
long scheduleNs = time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(delayMs);
queue.scheduleDeferred(MAYBE_GENERATE_SNAPSHOT, new EarliestDeadlineFunction(scheduleNs), event);
generateSnapshotScheduled = true;
}
}
private void cancelNextGenerateSnapshot() {
queue.cancelDeferred(MAYBE_GENERATE_SNAPSHOT);
generateSnapshotScheduled = false;
}
private void handleFeatureControlChange() {
// The feature control may have changed. On the active controller, cancel or schedule noop
// record writes accordingly.
@@ -1455,30 +1524,60 @@ public final class QuorumController implements Controller {
}
private void maybeGenerateSnapshot() {
if (committedBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes &&
snapshotGeneratorManager.generator == null
) {
if (!isActiveController()) {
// The active controller creates in-memory snapshot every time an uncommitted
// batch gets appended. The in-active controller can be more efficient and only
// create an in-memory snapshot when needed.
snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
if (snapshotGeneratorManager.snapshotInProgress()) {
/* Skip snapshot generation if there is a snapshot in progress.
*
* When the in-progress snapshot completes it will call this method to check if the controller should
* generate another snapshot due to any of the reasons supported by this method.
*/
} else {
Set<SnapshotReason> snapshotReasons = new HashSet<>();
// Check if a snapshot should be generated because of committed bytes
if (committedBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes) {
snapshotReasons.add(
SnapshotReason.maxBytesExceeded(committedBytesSinceLastSnapshot, snapshotMaxNewRecordBytes)
);
}
log.info(
"Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
lastCommittedEpoch,
lastCommittedOffset,
committedBytesSinceLastSnapshot,
SnapshotReason.MaxBytesExceeded
);
// Check if a snapshot should be generated because of committed append times
if (snapshotMaxIntervalMs > 0) {
// Time-based snapshots are enabled
long snapshotIntervalMs = time.milliseconds() - oldestNonSnapshottedTimestamp;
if (snapshotIntervalMs >= snapshotMaxIntervalMs) {
snapshotReasons.add(SnapshotReason.maxIntervalExceeded(snapshotIntervalMs, snapshotMaxIntervalMs));
} else {
maybeScheduleNextGenerateSnapshot();
}
}
snapshotGeneratorManager.createSnapshotGenerator(
lastCommittedOffset,
lastCommittedEpoch,
lastCommittedTimestamp
);
committedBytesSinceLastSnapshot = 0;
if (!snapshotReasons.isEmpty()) {
if (!isActiveController()) {
// The inactive controllers only create an in-memory snapshot when generating a snapshot. This is
// unlike the active controller which creates in-memory snapshots every time an uncommitted batch
// gets replayed.
snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
}
log.info(
"Generating a snapshot that includes (epoch={}, offset={}) because: {}",
lastCommittedEpoch,
lastCommittedOffset,
SnapshotReason.stringFromReasons(snapshotReasons)
);
snapshotGeneratorManager.createSnapshotGenerator(
lastCommittedOffset,
lastCommittedEpoch,
lastCommittedTimestamp
);
// Reset all of the snapshot counters
committedBytesSinceLastSnapshot = 0;
oldestNonSnapshottedTimestamp = Long.MAX_VALUE;
// Starting a snapshot invalidates any scheduled snapshot generation
cancelNextGenerateSnapshot();
}
}
}
@@ -1664,6 +1763,16 @@ public final class QuorumController implements Controller {
*/
private long committedBytesSinceLastSnapshot = 0;
/**
* Maximum amount of time to wait for a record in the log to get included in a snapshot.
*/
private final long snapshotMaxIntervalMs;
/**
* Timestamp for the oldest record that was committed but not included in a snapshot.
*/
private long oldestNonSnapshottedTimestamp = Long.MAX_VALUE;
/**
* How long to delay partition leader balancing operations.
*/
@@ -1693,6 +1802,11 @@ public final class QuorumController implements Controller {
*/
private boolean noOpRecordScheduled = false;
/**
* Tracks whether a snapshot generation has been scheduled.
*/
private boolean generateSnapshotScheduled = false;
/**
* The bootstrap metadata to use for initialization if needed.
*/
@@ -1717,6 +1831,7 @@ public final class QuorumController implements Controller {
int defaultNumPartitions,
ReplicaPlacer replicaPlacer,
long snapshotMaxNewRecordBytes,
long snapshotMaxIntervalMs,
OptionalLong leaderImbalanceCheckIntervalNs,
OptionalLong maxIdleIntervalNs,
long sessionTimeoutNs,
@@ -1774,6 +1889,7 @@ public final class QuorumController implements Controller {
build();
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
this.snapshotMaxIntervalMs = snapshotMaxIntervalMs;
this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
this.maxIdleIntervalNs = maxIdleIntervalNs;
this.replicationControl = new ReplicationControlManager.Builder().
@@ -2116,13 +2232,12 @@ public final class QuorumController implements Controller {
public CompletableFuture<Long> beginWritingSnapshot() {
CompletableFuture<Long> future = new CompletableFuture<>();
appendControlEvent("beginWritingSnapshot", () -> {
if (snapshotGeneratorManager.generator == null) {
if (!snapshotGeneratorManager.snapshotInProgress()) {
log.info(
"Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
"Generating a snapshot that includes (epoch={}, offset={}) because, {}.",
lastCommittedEpoch,
lastCommittedOffset,
committedBytesSinceLastSnapshot,
SnapshotReason.UnknownReason
SnapshotReason.UNKNOWN
);
snapshotGeneratorManager.createSnapshotGenerator(
lastCommittedOffset,


@@ -16,19 +16,45 @@
*/
package org.apache.kafka.metadata.util;
public enum SnapshotReason {
UnknownReason("unknown reason"),
MaxBytesExceeded("max bytes were exceeded"),
MetadataVersionChanged("metadata version was changed");
import java.util.Collection;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
private final String snapshotReason;
/**
* Reason for generating a snapshot.
*/
public final class SnapshotReason {
static public final SnapshotReason UNKNOWN = new SnapshotReason("unknown reason");
SnapshotReason(String snapshotReason) {
this.snapshotReason = snapshotReason;
static public SnapshotReason maxBytesExceeded(long bytes, long maxBytes) {
return new SnapshotReason(String.format("%s bytes exceeded the maximum bytes of %s", bytes, maxBytes));
}
static public SnapshotReason maxIntervalExceeded(long interval, long maxInterval) {
return new SnapshotReason(
String.format("%s ms exceeded the maximum snapshot interval of %s ms", interval, maxInterval)
);
}
static public SnapshotReason metadataVersionChanged(MetadataVersion metadataVersion) {
return new SnapshotReason(String.format("metadata version was changed to %s", metadataVersion));
}
/**
* Converts a collection of reasons into a string.
*/
static public String stringFromReasons(Collection<SnapshotReason> reasons) {
return Utils.join(reasons, ", ");
}
private final String reason;
private SnapshotReason(String reason) {
this.reason = reason;
}
@Override
public String toString() {
return snapshotReason;
return reason;
}
}


@@ -821,6 +821,45 @@ public class QuorumControllerTest {
}
}
@Test
public void testSnapshotAfterConfiguredMaxInterval() throws Throwable {
final int numBrokers = 4;
Map<Integer, Long> brokerEpochs = new HashMap<>();
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
build();
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> {
controllerBuilder.setConfigSchema(SCHEMA);
controllerBuilder.setSnapshotMaxIntervalMs(100);
// Disable snapshot generation due to bytes committed
controllerBuilder.setSnapshotMaxNewRecordBytes(Long.MAX_VALUE);
}).
build();
) {
QuorumController active = controlEnv.activeController();
for (int i = 0; i < numBrokers; i++) {
BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setBrokerId(i).
setRack(null).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
setListeners(new ListenerCollection(Arrays.asList(new Listener().
setName("PLAINTEXT").setHost("localhost").
setPort(9092 + i)).iterator()))).get();
brokerEpochs.put(i, reply.epoch());
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
}
logEnv.waitForLatestSnapshot();
}
}
@Test
public void testSnapshotAfterRepeatedResign() throws Throwable {
final int numBrokers = 4;


@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.util;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public final class SnapshotReasonTest {
@Test
public void testUnknown() {
assertEquals("unknown reason", SnapshotReason.UNKNOWN.toString());
}
@Test
public void testMaxBytesExceeded() {
long bytes = 1000;
long maxBytes = 900;
String expectedMessage = "1000 bytes exceeded the maximum bytes of 900";
assertEquals(expectedMessage, SnapshotReason.maxBytesExceeded(bytes, maxBytes).toString());
}
@Test
public void testMaxIntervalExceeded() {
long interval = 1000;
long maxInterval = 900;
String expectedMessage = "1000 ms exceeded the maximum snapshot interval of 900 ms";
assertEquals(expectedMessage, SnapshotReason.maxIntervalExceeded(interval, maxInterval).toString());
}
@Test
public void testMetadataVersionChanged() {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_3_IV3;
String expectedMessage = "metadata version was changed to 3.3-IV3";
assertEquals(expectedMessage, SnapshotReason.metadataVersionChanged(metadataVersion).toString());
}
@Test
public void testJoinReasons() {
long bytes = 1000;
long maxBytes = 900;
long interval = 1000;
long maxInterval = 900;
MetadataVersion metadataVersion = MetadataVersion.IBP_3_3_IV3;
List<SnapshotReason> reasons = Arrays.asList(
SnapshotReason.UNKNOWN,
SnapshotReason.maxBytesExceeded(bytes, maxBytes),
SnapshotReason.maxIntervalExceeded(interval, maxInterval),
SnapshotReason.metadataVersionChanged(metadataVersion)
);
String joinedReasons = SnapshotReason.stringFromReasons(reasons);
assertTrue(joinedReasons.contains("unknown reason"), joinedReasons);
assertTrue(joinedReasons.contains("1000 bytes exceeded the maximum bytes of 900"), joinedReasons);
assertTrue(joinedReasons.contains("1000 ms exceeded the maximum snapshot interval of 900 ms"), joinedReasons);
assertTrue(joinedReasons.contains("metadata version was changed to 3.3-IV3"), joinedReasons);
}
}


@@ -346,6 +346,15 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
return Objects.requireNonNull(snapshots.lastEntry()).getValue();
}
/**
* Returns the snapshot id of the latest snapshot if there is one.
*
* If a snapshot doesn't exist, it returns an empty Optional.
*/
synchronized Optional<OffsetAndEpoch> latestSnapshotId() {
return Optional.ofNullable(snapshots.lastEntry()).map(entry -> entry.getValue().snapshotId());
}
synchronized long appendedBytes() {
ObjectSerializationCache objectCache = new ObjectSerializationCache();
@@ -789,6 +798,11 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
);
}
@Override
public synchronized Optional<OffsetAndEpoch> latestSnapshotId() {
return shared.latestSnapshotId();
}
@Override
public LeaderAndEpoch leaderAndEpoch() {
return leader;


@@ -2357,6 +2357,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
);
}
@Override
public Optional<OffsetAndEpoch> latestSnapshotId() {
return log.latestSnapshotId();
}
@Override
public void close() {
log.flush(true);


@@ -229,4 +229,15 @@ public interface RaftClient<T> extends AutoCloseable {
* or less than the log start offset.
*/
Optional<SnapshotWriter<T>> createSnapshot(long committedOffset, int committedEpoch, long lastContainedLogTime);
/**
* The snapshot id for the latest snapshot.
*
* Returns the snapshot id of the latest snapshot, if it exists. If a snapshot doesn't exist, returns
* {@link Optional#empty()}.
*
* @return the id of the latest snapshot, if it exists
*/
Optional<OffsetAndEpoch> latestSnapshotId();
}


@@ -52,6 +52,41 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
final public class KafkaRaftClientSnapshotTest {
@Test
public void testLatestSnapshotId() throws Exception {
int localId = 0;
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
.appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
.withEmptySnapshot(snapshotId)
.withElectedLeader(epoch, leaderId)
.build();
assertEquals(Optional.of(snapshotId), context.client.latestSnapshotId());
}
@Test
public void testLatestSnapshotIdMissing() throws Exception {
int localId = 0;
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
.appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
.withElectedLeader(epoch, leaderId)
.build();
assertEquals(Optional.empty(), context.client.latestSnapshotId());
}
@Test
public void testLeaderListenerNotified() throws Exception {
int localId = 0;