diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index c20e0d889b6..a52e51fe112 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest { @ClusterTest public void testDefaults(ClusterConfig config) { - Assertions.assertEquals(MetadataVersion.IBP_3_6_IV0, config.metadataVersion()); + Assertions.assertEquals(MetadataVersion.IBP_3_6_IV1, config.metadataVersion()); } } diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java index db6671a0f63..cd9161ce923 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTest.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java @@ -41,6 +41,6 @@ public @interface ClusterTest { String name() default ""; SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT; String listener() default ""; - MetadataVersion metadataVersion() default MetadataVersion.IBP_3_6_IV0; + MetadataVersion metadataVersion() default MetadataVersion.IBP_3_6_IV1; ClusterConfigProperty[] serverProperties() default {}; } diff --git a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java new file mode 100644 index 00000000000..76bae6ada07 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.controller.metrics.QuorumControllerMetrics; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.snapshot.Snapshots; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.Optional; + + +/** + * Manages read and write offsets, and in-memory snapshots. + * + * Also manages the following metrics: + * kafka.controller:type=KafkaController,name=ActiveControllerCount + * kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs + * kafka.controller:type=KafkaController,name=LastAppliedRecordOffset + * kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp + * kafka.controller:type=KafkaController,name=LastCommittedRecordOffset + */ +class OffsetControlManager { + public static class Builder { + private LogContext logContext = null; + private SnapshotRegistry snapshotRegistry = null; + private QuorumControllerMetrics metrics = null; + private Time time = Time.SYSTEM; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + Builder setMetrics(QuorumControllerMetrics metrics) { + this.metrics = metrics; + return this; + } + + Builder setTime(Time time) { + this.time = time; + return this; + } + + public OffsetControlManager build() { + if (logContext == null) logContext = new LogContext(); + if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); + if (metrics == null) { + metrics = new QuorumControllerMetrics(Optional.empty(), time, false); + } + return new OffsetControlManager(logContext, + snapshotRegistry, + metrics, + time); + } + } + + /** + * The slf4j logger. + */ + private final Logger log; + + /** + * The snapshot registry. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The quorum controller metrics. + */ + private final QuorumControllerMetrics metrics; + + /** + * The clock. + */ + private final Time time; + + /** + * The ID of the snapshot that we're currently replaying, or null if there is none. + */ + private OffsetAndEpoch currentSnapshotId; + + /** + * The name of the snapshot that we're currently replaying, or null if there is none. + */ + private String currentSnapshotName; + + /** + * The latest committed offset. + */ + private long lastCommittedOffset; + + /** + * The latest committed epoch. + */ + private int lastCommittedEpoch; + + /** + * The latest offset that it is safe to read from. + */ + private long lastStableOffset; + + /** + * The offset of the transaction we're in, or -1 if we are not in one. + */ + private long transactionStartOffset; + + /** + * The next offset we should write to, or -1 if the controller is not active. Exclusive offset. + */ + private long nextWriteOffset; + + private OffsetControlManager( + LogContext logContext, + SnapshotRegistry snapshotRegistry, + QuorumControllerMetrics metrics, + Time time + ) { + this.log = logContext.logger(OffsetControlManager.class); + this.snapshotRegistry = snapshotRegistry; + this.metrics = metrics; + this.time = time; + this.currentSnapshotId = null; + this.currentSnapshotName = null; + this.lastCommittedOffset = -1L; + this.lastCommittedEpoch = -1; + this.lastStableOffset = -1L; + this.transactionStartOffset = -1L; + this.nextWriteOffset = -1L; + snapshotRegistry.getOrCreateSnapshot(-1L); + metrics.setActive(false); + metrics.setLastCommittedRecordOffset(-1L); + metrics.setLastAppliedRecordOffset(-1L); + metrics.setLastAppliedRecordTimestamp(-1L); + } + + /** + * @return The SnapshotRegistry used by this offset control manager. + */ + SnapshotRegistry snapshotRegistry() { + return snapshotRegistry; + } + + /** + * @return QuorumControllerMetrics managed by this offset control manager. + */ + QuorumControllerMetrics metrics() { + return metrics; + } + + /** + * @return the ID of the current snapshot. + */ + OffsetAndEpoch currentSnapshotId() { + return currentSnapshotId; + } + + /** + * @return the name of the current snapshot. + */ + String currentSnapshotName() { + return currentSnapshotName; + } + + /** + * @return the last committed offset. + */ + long lastCommittedOffset() { + return lastCommittedOffset; + } + + /** + * @return the last committed epoch. + */ + int lastCommittedEpoch() { + return lastCommittedEpoch; + } + + /** + * @return the latest offset that it is safe to read from. + */ + long lastStableOffset() { + return lastStableOffset; + } + + /** + * @return the transaction start offset, or -1 if there is no transaction. + */ + long transactionStartOffset() { + return transactionStartOffset; + } + + /** + * @return the next offset that the active controller should write to. + */ + long nextWriteOffset() { + return nextWriteOffset; + } + + /** + * @return true only if the manager is active. + */ + boolean active() { + return nextWriteOffset != -1L; + } + + /** + * Called when the QuorumController becomes active. + * + * @param newNextWriteOffset The new next write offset to use. Must be non-negative. + */ + void activate(long newNextWriteOffset) { + if (active()) { + throw new RuntimeException("Can't activate already active OffsetControlManager."); + } + if (newNextWriteOffset < 0) { + throw new RuntimeException("Invalid negative newNextWriteOffset " + + newNextWriteOffset + "."); + } + // Before switching to active, create an in-memory snapshot at the last committed + // offset. This is required because the active controller assumes that there is always + // an in-memory snapshot at the last committed offset. + snapshotRegistry.getOrCreateSnapshot(lastStableOffset); + this.nextWriteOffset = newNextWriteOffset; + metrics.setActive(true); + } + + /** + * Called when the QuorumController becomes inactive. + */ + void deactivate() { + if (!active()) { + throw new RuntimeException("Can't deactivate inactive OffsetControlManager."); + } + metrics.setActive(false); + metrics.setLastAppliedRecordOffset(lastStableOffset); + this.nextWriteOffset = -1L; + if (!snapshotRegistry.hasSnapshot(lastStableOffset)) { + throw new RuntimeException("Unable to reset to last stable offset " + lastStableOffset + + ". No in-memory snapshot found for this offset."); + } + snapshotRegistry.revertToSnapshot(lastStableOffset); + } + + /** + * Handle the callback from the Raft layer indicating that a batch was committed. + * + * @param batch The batch that has been committed. + */ + void handleCommitBatch(Batch batch) { + this.lastCommittedOffset = batch.lastOffset(); + this.lastCommittedEpoch = batch.epoch(); + maybeAdvanceLastStableOffset(); + metrics.setLastCommittedRecordOffset(batch.lastOffset()); + if (!active()) { + // On standby controllers, the last applied record offset is equals to the last + // committed offset. + metrics.setLastAppliedRecordOffset(batch.lastOffset()); + metrics.setLastAppliedRecordTimestamp(batch.appendTimestamp()); + } + } + + /** + * Called by the active controller after it has invoked scheduleAtomicAppend to schedule some + * records to be written. + * + * @param endOffset The offset of the last record that was written. + */ + void handleScheduleAtomicAppend(long endOffset) { + this.nextWriteOffset = endOffset + 1; + + snapshotRegistry.getOrCreateSnapshot(endOffset); + + metrics.setLastAppliedRecordOffset(endOffset); + + // This is not truly the append timestamp. The KRaft client doesn't expose the append + // time when scheduling a write. This is good enough because this is called right after + // the records were given to the KRAft client for appending and the default append linger + // for KRaft is 25ms. + metrics.setLastAppliedRecordTimestamp(time.milliseconds()); + } + + /** + * Advance the last stable offset if needed. + */ + void maybeAdvanceLastStableOffset() { + long newLastStableOffset; + if (transactionStartOffset == -1L) { + newLastStableOffset = lastCommittedOffset; + } else { + newLastStableOffset = Math.min(transactionStartOffset - 1, lastCommittedOffset); + } + if (lastStableOffset < newLastStableOffset) { + lastStableOffset = newLastStableOffset; + snapshotRegistry.deleteSnapshotsUpTo(lastStableOffset); + if (!active()) { + snapshotRegistry.getOrCreateSnapshot(lastStableOffset); + } + } + } + + /** + * Called before we load a Raft snapshot. + * + * @param snapshotId The Raft snapshot offset and epoch. + */ + void beginLoadSnapshot(OffsetAndEpoch snapshotId) { + if (currentSnapshotId != null) { + throw new RuntimeException("Can't begin reading snapshot for " + snapshotId + + ", because we are already reading " + currentSnapshotId); + } + this.currentSnapshotId = snapshotId; + this.currentSnapshotName = Snapshots.filenameFromSnapshotId(snapshotId); + log.info("Starting to load snapshot {}. Previous lastCommittedOffset was {}. Previous " + + "transactionStartOffset was {}.", currentSnapshotName, lastCommittedOffset, + transactionStartOffset); + this.snapshotRegistry.reset(); + this.lastCommittedOffset = -1L; + this.lastCommittedEpoch = -1; + this.lastStableOffset = -1L; + this.transactionStartOffset = -1L; + this.nextWriteOffset = -1L; + } + + /** + * Called after we have finished loading a Raft snapshot. + * + * @param timestamp The timestamp of the snapshot. + */ + void endLoadSnapshot(long timestamp) { + if (currentSnapshotId == null) { + throw new RuntimeException("Can't end loading snapshot, because there is no " + + "current snapshot."); + } + log.info("Successfully loaded snapshot {}.", currentSnapshotName); + this.snapshotRegistry.getOrCreateSnapshot(currentSnapshotId.offset()); + this.lastCommittedOffset = currentSnapshotId.offset(); + this.lastCommittedEpoch = currentSnapshotId.epoch(); + this.lastStableOffset = currentSnapshotId.offset(); + this.transactionStartOffset = -1L; + this.nextWriteOffset = -1L; + metrics.setLastCommittedRecordOffset(currentSnapshotId.offset()); + metrics.setLastAppliedRecordOffset(currentSnapshotId.offset()); + metrics.setLastAppliedRecordTimestamp(timestamp); + this.currentSnapshotId = null; + this.currentSnapshotName = null; + } + + public void replay(BeginTransactionRecord message, long offset) { + if (currentSnapshotId != null) { + throw new RuntimeException("BeginTransactionRecord cannot appear within a snapshot."); + } + if (transactionStartOffset != -1L) { + throw new RuntimeException("Can't replay a BeginTransactionRecord at " + offset + + " because the transaction at " + transactionStartOffset + " was never closed."); + } + snapshotRegistry.getOrCreateSnapshot(offset - 1); + transactionStartOffset = offset; + log.info("Replayed {} at offset {}.", message, offset); + } + + public void replay(EndTransactionRecord message, long offset) { + if (currentSnapshotId != null) { + throw new RuntimeException("EndTransactionRecord cannot appear within a snapshot."); + } + if (transactionStartOffset == -1L) { + throw new RuntimeException("Can't replay an EndTransactionRecord at " + offset + + " because there is no open transaction."); + } + transactionStartOffset = -1L; + log.info("Replayed {} at offset {}.", message, offset); + } + + public void replay(AbortTransactionRecord message, long offset) { + if (currentSnapshotId != null) { + throw new RuntimeException("AbortTransactionRecord cannot appear within a snapshot."); + } + if (transactionStartOffset == -1L) { + throw new RuntimeException("Can't replay an AbortTransactionRecord at " + offset + + " because there is no open transaction."); + } + long preTransactionOffset = transactionStartOffset - 1; + snapshotRegistry.revertToSnapshot(preTransactionOffset); + transactionStartOffset = -1L; + log.info("Replayed {} at offset {}. Reverted to offset {}.", + message, offset, preTransactionOffset); + } + + // VisibleForTesting + void setNextWriteOffset(long newNextWriteOffset) { + this.nextWriteOffset = newNextWriteOffset; + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index b658af4c111..2586be0f457 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -49,10 +49,13 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.UpdateFeaturesRequestData; import org.apache.kafka.common.message.UpdateFeaturesResponseData; +import org.apache.kafka.common.metadata.AbortTransactionRecord; import org.apache.kafka.common.metadata.AccessControlEntryRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.ClientQuotaRecord; import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; import org.apache.kafka.common.metadata.MetadataRecordType; @@ -104,6 +107,7 @@ import org.apache.kafka.server.authorizer.AclDeleteResult; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.fault.FaultHandler; +import org.apache.kafka.server.fault.FaultHandlerException; import org.apache.kafka.server.policy.AlterConfigPolicy; import org.apache.kafka.server.policy.CreateTopicPolicy; import org.apache.kafka.snapshot.SnapshotReader; @@ -426,22 +430,6 @@ public final class QuorumController implements Controller { return raftClient.leaderAndEpoch().leaderId(); } - /** - * @return The offset that we should perform read operations at. - */ - private long currentReadOffset() { - if (isActiveController()) { - // The active controller keeps an in-memory snapshot at the last committed offset, - // which we want to read from when performing read operations. This will avoid - // reading uncommitted data. - return lastCommittedOffset; - } else { - // Standby controllers never have uncommitted data in memory. Therefore, we always - // read the latest from every data structure. - return SnapshotRegistry.LATEST_EPOCH; - } - } - private void handleEventEnd(String name, long startProcessingTimeNs) { long endProcessingTime = time.nanoseconds(); long deltaNs = endProcessingTime - startProcessingTimeNs; @@ -467,10 +455,10 @@ public final class QuorumController implements Controller { fromInternal(exception, () -> latestController()); int epoch = curClaimEpoch; if (epoch == -1) { - epoch = lastCommittedEpoch; + epoch = offsetControl.lastCommittedEpoch(); } String failureMessage = info.failureMessage(epoch, deltaUs, - isActiveController(), lastCommittedOffset); + isActiveController(), offsetControl.lastCommittedOffset()); if (info.isTimeoutException() && (!deltaUs.isPresent())) { controllerMetrics.incrementOperationsTimedOut(); } @@ -712,7 +700,7 @@ public final class QuorumController implements Controller { } ControllerResult result = op.generateRecordsAndResult(); if (result.records().isEmpty()) { - op.processBatchEndOffset(writeOffset); + op.processBatchEndOffset(offsetControl.nextWriteOffset() - 1); // If the operation did not return any records, then it was actually just // a read after all, and not a read + write. However, this read was done // from the latest in-memory state, which might contain uncommitted data. @@ -736,16 +724,15 @@ public final class QuorumController implements Controller { // them to the log. long offset = appendRecords(log, result, maxRecordsPerBatch, new Function, Long>() { - private long prevEndOffset = writeOffset; - @Override public Long apply(List records) { // Start by trying to apply the record to our in-memory state. This should always // succeed; if it does not, that's a fatal error. It is important to do this before // scheduling the record for Raft replication. int recordIndex = 0; + long nextWriteOffset = offsetControl.nextWriteOffset(); for (ApiMessageAndVersion message : records) { - long recordOffset = prevEndOffset + 1 + recordIndex; + long recordOffset = nextWriteOffset + recordIndex; try { replay(message.message(), Optional.empty(), recordOffset); } catch (Throwable e) { @@ -753,22 +740,20 @@ public final class QuorumController implements Controller { "record at offset %d on active controller, from the " + "batch with baseOffset %d", message.message().getClass().getSimpleName(), - recordOffset, prevEndOffset + 1); + recordOffset, nextWriteOffset); throw fatalFaultHandler.handleFault(failureMessage, e); } recordIndex++; } - long nextEndOffset = prevEndOffset + recordIndex; + long nextEndOffset = nextWriteOffset - 1 + recordIndex; raftClient.scheduleAtomicAppend(controllerEpoch, - OptionalLong.of(prevEndOffset + 1), + OptionalLong.of(nextWriteOffset), records); - snapshotRegistry.getOrCreateSnapshot(nextEndOffset); - prevEndOffset = nextEndOffset; + offsetControl.handleScheduleAtomicAppend(nextEndOffset); return nextEndOffset; } }); op.processBatchEndOffset(offset); - updateWriteOffset(offset); resultAndOffset = ControllerResultAndOffset.of(offset, result); log.debug("Read-write operation {} will be completed when the log " + @@ -997,12 +982,7 @@ public final class QuorumController implements Controller { recordIndex++; } } - - updateLastCommittedState( - offset, - epoch, - batch.appendTimestamp() - ); + offsetControl.handleCommitBatch(batch); } } finally { reader.close(); @@ -1017,43 +997,39 @@ public final class QuorumController implements Controller { String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId()); if (isActiveController()) { throw fatalFaultHandler.handleFault("Asked to load snapshot " + snapshotName + - ", but we are the active controller at epoch " + curClaimEpoch); + ", but we are the active controller at epoch " + curClaimEpoch); } - log.info("Starting to replay snapshot {}, from last commit offset {} and epoch {}", - snapshotName, lastCommittedOffset, lastCommittedEpoch); - - resetToEmptyState(); - + offsetControl.beginLoadSnapshot(reader.snapshotId()); while (reader.hasNext()) { Batch batch = reader.next(); long offset = batch.lastOffset(); List messages = batch.records(); log.debug("Replaying snapshot {} batch with last offset of {}", - snapshotName, offset); + snapshotName, offset); int i = 1; for (ApiMessageAndVersion message : messages) { try { - replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset()); + replay(message.message(), Optional.of(reader.snapshotId()), + reader.lastContainedLogOffset()); } catch (Throwable e) { String failureMessage = String.format("Unable to apply %s record " + - "from snapshot %s on standby controller, which was %d of " + - "%d record(s) in the batch with baseOffset %d.", - message.message().getClass().getSimpleName(), reader.snapshotId(), - i, messages.size(), batch.baseOffset()); + "from snapshot %s on standby controller, which was %d of " + + "%d record(s) in the batch with baseOffset %d.", + message.message().getClass().getSimpleName(), reader.snapshotId(), + i, messages.size(), batch.baseOffset()); throw fatalFaultHandler.handleFault(failureMessage, e); } i++; } } - log.info("Finished replaying snapshot {}", snapshotName); - - updateLastCommittedState( - reader.lastContainedLogOffset(), - reader.lastContainedLogEpoch(), - reader.lastContainedLogTimestamp()); - snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); + offsetControl.endLoadSnapshot(reader.lastContainedLogTimestamp()); + } catch (FaultHandlerException e) { + throw e; + } catch (Throwable e) { + throw fatalFaultHandler.handleFault("Error while loading snapshot " + + reader.snapshotId(), e); } finally { reader.close(); } @@ -1076,15 +1052,16 @@ public final class QuorumController implements Controller { } else { log.warn("Renouncing the leadership due to a metadata log event. " + "We were the leader at epoch {}, but in the new epoch {}, " + - "the leader is {}. Reverting to last committed offset {}.", - curClaimEpoch, newLeader.epoch(), newLeaderName, lastCommittedOffset); + "the leader is {}. Reverting to last stable offset {}.", + curClaimEpoch, newLeader.epoch(), newLeaderName, + offsetControl.lastStableOffset()); renounce(); } } else if (newLeader.isLeader(nodeId)) { - long newLastWriteOffset = raftClient.logEndOffset() - 1; - log.info("Becoming the active controller at epoch {}, last write offset {}.", - newLeader.epoch(), newLastWriteOffset); - claim(newLeader.epoch(), newLastWriteOffset); + long newNextWriteOffset = raftClient.logEndOffset(); + log.info("Becoming the active controller at epoch {}, next write offset {}.", + newLeader.epoch(), newNextWriteOffset); + claim(newLeader.epoch(), newNextWriteOffset); } else { log.info("In the new epoch {}, the leader is {}.", newLeader.epoch(), newLeaderName); @@ -1116,38 +1093,16 @@ public final class QuorumController implements Controller { return claimEpoch != -1; } - private void updateWriteOffset(long offset) { - writeOffset = offset; - if (isActiveController()) { - controllerMetrics.setLastAppliedRecordOffset(writeOffset); - // This is not truly the append timestamp. The KRaft client doesn't expose the append time when scheduling a write. - // This is good enough because this is called right after the records were given to the KRAft client for appending and - // the default append linger for KRaft is 25ms. - controllerMetrics.setLastAppliedRecordTimestamp(time.milliseconds()); - } else { - // Change the last applied record metrics back to the last committed state. Inactive controllers report the last committed - // state while active controllers report the latest state which may include uncommitted data. - controllerMetrics.setLastAppliedRecordOffset(lastCommittedOffset); - controllerMetrics.setLastAppliedRecordTimestamp(lastCommittedTimestamp); - } - } - - private void claim(int epoch, long newLastWriteOffset) { + private void claim(int epoch, long newNextWriteOffset) { try { if (curClaimEpoch != -1) { throw new RuntimeException("Cannot claim leadership because we are already the " + "active controller."); } curClaimEpoch = epoch; - controllerMetrics.setActive(true); - updateWriteOffset(newLastWriteOffset); + offsetControl.activate(newNextWriteOffset); clusterControl.activate(); - // Before switching to active, create an in-memory snapshot at the last committed - // offset. This is required because the active controller assumes that there is always - // an in-memory snapshot at the last committed offset. - snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); - // Prepend the activate event. It is important that this event go at the beginning // of the queue rather than the end (hence prepend rather than append). It's also // important not to use prepend for anything else, to preserve the ordering here. @@ -1276,22 +1231,6 @@ public final class QuorumController implements Controller { } } - private void updateLastCommittedState( - long offset, - int epoch, - long timestamp - ) { - lastCommittedOffset = offset; - lastCommittedEpoch = epoch; - lastCommittedTimestamp = timestamp; - - controllerMetrics.setLastCommittedRecordOffset(offset); - if (!isActiveController()) { - controllerMetrics.setLastAppliedRecordOffset(offset); - controllerMetrics.setLastAppliedRecordTimestamp(timestamp); - } - } - void renounce() { try { if (curClaimEpoch == -1) { @@ -1300,16 +1239,9 @@ public final class QuorumController implements Controller { } raftClient.resign(curClaimEpoch); curClaimEpoch = -1; - controllerMetrics.setActive(false); deferredEventQueue.failAll(ControllerExceptions. newWrongControllerException(OptionalInt.empty())); - - if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) { - throw new RuntimeException("Unable to find last committed offset " + - lastCommittedEpoch + " in snapshot registry."); - } - snapshotRegistry.revertToSnapshot(lastCommittedOffset); - updateWriteOffset(-1); + offsetControl.deactivate(); clusterControl.deactivate(); cancelMaybeFenceReplicas(); cancelMaybeBalancePartitionLeaders(); @@ -1555,20 +1487,20 @@ public final class QuorumController implements Controller { case ZK_MIGRATION_STATE_RECORD: featureControl.replay((ZkMigrationStateRecord) message); break; + case BEGIN_TRANSACTION_RECORD: + offsetControl.replay((BeginTransactionRecord) message, offset); + break; + case END_TRANSACTION_RECORD: + offsetControl.replay((EndTransactionRecord) message, offset); + break; + case ABORT_TRANSACTION_RECORD: + offsetControl.replay((AbortTransactionRecord) message, offset); + break; default: throw new RuntimeException("Unhandled record type " + type); } } - /** - * Clear all data structures and reset all KRaft state. - */ - private void resetToEmptyState() { - snapshotRegistry.reset(); - - updateLastCommittedState(-1, -1, -1); - } - /** * Handles faults that cause a controller failover, but which don't abort the process. */ @@ -1621,6 +1553,11 @@ public final class QuorumController implements Controller { */ private final DeferredEventQueue deferredEventQueue; + /** + * Manages read and write offsets, and in-memory snapshots. + */ + private final OffsetControlManager offsetControl; + /** * A predicate that returns information about whether a ConfigResource exists. */ @@ -1699,26 +1636,6 @@ public final class QuorumController implements Controller { */ private volatile int curClaimEpoch; - /** - * The last offset we have committed, or -1 if we have not committed any offsets. - */ - private long lastCommittedOffset = -1; - - /** - * The epoch of the last offset we have committed, or -1 if we have not committed any offsets. - */ - private int lastCommittedEpoch = -1; - - /** - * The timestamp in milliseconds of the last batch we have committed, or -1 if we have not committed any offset. - */ - private long lastCommittedTimestamp = -1; - - /** - * If we have called scheduleWrite, this is the last offset we got back from it. - */ - private long writeOffset; - /** * How long to delay partition leader balancing operations. */ @@ -1803,6 +1720,12 @@ public final class QuorumController implements Controller { this.controllerMetrics = controllerMetrics; this.snapshotRegistry = new SnapshotRegistry(logContext); this.deferredEventQueue = new DeferredEventQueue(logContext); + this.offsetControl = new OffsetControlManager.Builder(). + setLogContext(logContext). + setSnapshotRegistry(snapshotRegistry). + setMetrics(controllerMetrics). + setTime(time). + build(); this.resourceExists = new ConfigResourceExistenceChecker(); this.configurationControl = new ConfigurationControlManager.Builder(). setLogContext(logContext). @@ -1876,9 +1799,6 @@ public final class QuorumController implements Controller { this.zkRecordConsumer = new MigrationRecordConsumer(); this.zkMigrationEnabled = zkMigrationEnabled; this.recordRedactor = new RecordRedactor(configSchema); - updateWriteOffset(-1); - - resetToEmptyState(); log.info("Creating new QuorumController with clusterId {}.{}", clusterId, zkMigrationEnabled ? " ZK migration mode is enabled." : ""); @@ -1939,7 +1859,7 @@ public final class QuorumController implements Controller { if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap()); return appendReadEvent("findTopicIds", context.deadlineNs(), - () -> replicationControl.findTopicIds(currentReadOffset(), names)); + () -> replicationControl.findTopicIds(offsetControl.lastStableOffset(), names)); } @Override @@ -1947,7 +1867,7 @@ public final class QuorumController implements Controller { ControllerRequestContext context ) { return appendReadEvent("findAllTopicIds", context.deadlineNs(), - () -> replicationControl.findAllTopicIds(currentReadOffset())); + () -> replicationControl.findAllTopicIds(offsetControl.lastStableOffset())); } @Override @@ -1958,7 +1878,7 @@ public final class QuorumController implements Controller { if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap()); return appendReadEvent("findTopicNames", context.deadlineNs(), - () -> replicationControl.findTopicNames(currentReadOffset(), ids)); + () -> replicationControl.findTopicNames(offsetControl.lastStableOffset(), ids)); } @Override @@ -1978,7 +1898,7 @@ public final class QuorumController implements Controller { Map> resources ) { return appendReadEvent("describeConfigs", context.deadlineNs(), - () -> configurationControl.describeConfigs(currentReadOffset(), resources)); + () -> configurationControl.describeConfigs(offsetControl.lastStableOffset(), resources)); } @Override @@ -2000,7 +1920,7 @@ public final class QuorumController implements Controller { ControllerRequestContext context ) { return appendReadEvent("getFinalizedFeatures", context.deadlineNs(), - () -> featureControl.finalizedFeatures(currentReadOffset())); + () -> featureControl.finalizedFeatures(offsetControl.lastStableOffset())); } @Override @@ -2045,7 +1965,8 @@ public final class QuorumController implements Controller { new ListPartitionReassignmentsResponseData().setErrorMessage(null)); } return appendReadEvent("listPartitionReassignments", context.deadlineNs(), - () -> replicationControl.listPartitionReassignments(request.topics(), currentReadOffset())); + () -> replicationControl.listPartitionReassignments(request.topics(), + offsetControl.lastStableOffset())); } @Override @@ -2120,7 +2041,7 @@ public final class QuorumController implements Controller { return appendWriteEvent("registerBroker", context.deadlineNs(), () -> { ControllerResult result = clusterControl. - registerBroker(request, writeOffset + 1, featureControl. + registerBroker(request, offsetControl.nextWriteOffset(), featureControl. finalizedFeatures(Long.MAX_VALUE)); rescheduleMaybeFenceStaleBrokers(); return result; @@ -2272,9 +2193,9 @@ public final class QuorumController implements Controller { } // VisibleForTesting - void setWriteOffset(long newWriteOffset) { - appendControlEvent("setWriteOffset", () -> { - this.writeOffset = newWriteOffset; + void setNewNextWriteOffset(long newNextWriteOffset) { + appendControlEvent("setNewNextWriteOffset", () -> { + offsetControl.setNextWriteOffset(newNextWriteOffset); }); } } diff --git a/metadata/src/main/resources/common/metadata/AbortTransactionRecord.json b/metadata/src/main/resources/common/metadata/AbortTransactionRecord.json new file mode 100644 index 00000000000..9ce9c2f5272 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/AbortTransactionRecord.json @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 25, + "type": "metadata", + "name": "AbortTransactionRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "Reason", "type": "string", "default": "null", + "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0, + "about": "An optional textual description of why the transaction was aborted." } + ] +} diff --git a/metadata/src/main/resources/common/metadata/BeginTransactionRecord.json b/metadata/src/main/resources/common/metadata/BeginTransactionRecord.json new file mode 100644 index 00000000000..de583916b1f --- /dev/null +++ b/metadata/src/main/resources/common/metadata/BeginTransactionRecord.json @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 23, + "type": "metadata", + "name": "BeginTransactionRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "Name", "type": "string", "default": "null", + "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0, + "about": "An optional textual description of this transaction." } + ] +} diff --git a/metadata/src/main/resources/common/metadata/EndTransactionRecord.json b/metadata/src/main/resources/common/metadata/EndTransactionRecord.json new file mode 100644 index 00000000000..6c1489630d1 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/EndTransactionRecord.json @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 24, + "type": "metadata", + "name": "EndTransactionRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + ] +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java new file mode 100644 index 00000000000..bb618f892c8 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.common.metadata.NoOpRecord; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.TrackingSnapshotRegistry; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@Timeout(value = 40) +public class OffsetControlManagerTest { + @Test + public void testInitialValues() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + assertNull(offsetControl.currentSnapshotId()); + assertNull(offsetControl.currentSnapshotName()); + assertEquals(-1L, offsetControl.lastCommittedOffset()); + assertEquals(-1, offsetControl.lastCommittedEpoch()); + assertEquals(-1, offsetControl.lastStableOffset()); + assertEquals(-1, offsetControl.transactionStartOffset()); + assertEquals(-1, offsetControl.nextWriteOffset()); + assertFalse(offsetControl.active()); + assertEquals(Arrays.asList(-1L), offsetControl.snapshotRegistry().epochsList()); + } + + @Test + public void testActivate() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + offsetControl.activate(1000L); + assertEquals(1000L, offsetControl.nextWriteOffset()); + assertTrue(offsetControl.active()); + assertTrue(offsetControl.metrics().active()); + assertEquals(Arrays.asList(-1L), offsetControl.snapshotRegistry().epochsList()); + } + + @Test + public void testActivateFailsIfAlreadyActive() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + offsetControl.activate(1000L); + assertEquals("Can't activate already active OffsetControlManager.", + assertThrows(RuntimeException.class, + () -> offsetControl.activate(2000L)). + getMessage()); + } + + @Test + public void testActivateFailsIfNewNextWriteOffsetIsNegative() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + assertEquals("Invalid negative newNextWriteOffset -2.", + assertThrows(RuntimeException.class, + () -> offsetControl.activate(-2)). + getMessage()); + } + + @Test + public void testActivateAndDeactivate() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + offsetControl.activate(1000L); + assertEquals(1000L, offsetControl.nextWriteOffset()); + offsetControl.deactivate(); + assertEquals(-1L, offsetControl.nextWriteOffset()); + } + + @Test + public void testDeactivateFailsIfNotActive() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + assertEquals("Can't deactivate inactive OffsetControlManager.", + assertThrows(RuntimeException.class, + () -> offsetControl.deactivate()). + getMessage()); + } + + private static Batch newFakeBatch( + long lastOffset, + int epoch, + long appendTimestamp + ) { + return Batch.data( + lastOffset, + epoch, + appendTimestamp, + 100, + Collections.singletonList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0))); + } + + @Test + public void testHandleCommitBatch() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + + offsetControl.handleCommitBatch(newFakeBatch(1000L, 200, 3000L)); + assertEquals(Arrays.asList(1000L), offsetControl.snapshotRegistry().epochsList()); + assertEquals(1000L, offsetControl.lastCommittedOffset()); + assertEquals(200, offsetControl.lastCommittedEpoch()); + assertEquals(1000L, offsetControl.lastStableOffset()); + assertEquals(-1L, offsetControl.transactionStartOffset()); + assertEquals(-1L, offsetControl.nextWriteOffset()); + assertFalse(offsetControl.active()); + assertFalse(offsetControl.metrics().active()); + assertEquals(1000L, offsetControl.metrics().lastAppliedRecordOffset()); + assertEquals(1000L, offsetControl.metrics().lastCommittedRecordOffset()); + assertEquals(3000L, offsetControl.metrics().lastAppliedRecordTimestamp()); + } + + @Test + public void testHandleScheduleAtomicAppend() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + + offsetControl.handleScheduleAtomicAppend(2000L); + assertEquals(2001L, offsetControl.nextWriteOffset()); + assertEquals(2000L, offsetControl.metrics().lastAppliedRecordOffset()); + assertEquals(-1L, offsetControl.lastStableOffset()); + assertEquals(-1L, offsetControl.lastCommittedOffset()); + assertEquals(Arrays.asList(-1L, 2000L), offsetControl.snapshotRegistry().epochsList()); + + offsetControl.handleCommitBatch(newFakeBatch(2000L, 200, 3000L)); + assertEquals(2000L, offsetControl.lastStableOffset()); + assertEquals(2000L, offsetControl.lastCommittedOffset()); + assertEquals(Arrays.asList(2000L), offsetControl.snapshotRegistry().epochsList()); + } + + @Test + public void testHandleLoadSnapshot() { + TrackingSnapshotRegistry snapshotRegistry = new TrackingSnapshotRegistry(new LogContext()); + OffsetControlManager offsetControl = new OffsetControlManager.Builder(). + setSnapshotRegistry(snapshotRegistry). + build(); + + offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300)); + assertEquals(Arrays.asList("snapshot[-1]", "reset"), snapshotRegistry.operations()); + assertEquals(new OffsetAndEpoch(4000L, 300), offsetControl.currentSnapshotId()); + assertEquals("00000000000000004000-0000000300", offsetControl.currentSnapshotName()); + assertEquals(Arrays.asList(), offsetControl.snapshotRegistry().epochsList()); + + offsetControl.endLoadSnapshot(3456L); + assertEquals(Arrays.asList("snapshot[-1]", "reset", "snapshot[4000]"), + snapshotRegistry.operations()); + assertNull(offsetControl.currentSnapshotId()); + assertNull(offsetControl.currentSnapshotName()); + assertEquals(Arrays.asList(4000L), offsetControl.snapshotRegistry().epochsList()); + assertEquals(4000L, offsetControl.lastCommittedOffset()); + assertEquals(300, offsetControl.lastCommittedEpoch()); + assertEquals(4000L, offsetControl.lastStableOffset()); + assertEquals(-1L, offsetControl.transactionStartOffset()); + assertEquals(-1L, offsetControl.nextWriteOffset()); + assertEquals(4000L, offsetControl.metrics().lastCommittedRecordOffset()); + assertEquals(4000L, offsetControl.metrics().lastAppliedRecordOffset()); + assertEquals(3456L, offsetControl.metrics().lastAppliedRecordTimestamp()); + } + + @Test + public void testBeginTransactionRecordNotAllowedInSnapshot() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300)); + assertEquals("BeginTransactionRecord cannot appear within a snapshot.", + assertThrows(RuntimeException.class, + () -> offsetControl.replay(new BeginTransactionRecord(), 1000L)). + getMessage()); + } + + @Test + public void testEndTransactionRecordNotAllowedInSnapshot() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300)); + assertEquals("EndTransactionRecord cannot appear within a snapshot.", + assertThrows(RuntimeException.class, + () -> offsetControl.replay(new EndTransactionRecord(), 1000L)). + getMessage()); + } + + @Test + public void testAbortTransactionRecordNotAllowedInSnapshot() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300)); + assertEquals("AbortTransactionRecord cannot appear within a snapshot.", + assertThrows(RuntimeException.class, + () -> offsetControl.replay(new AbortTransactionRecord(), 1000L)). + getMessage()); + } + + @Test + public void testEndLoadSnapshotFailsWhenNotInSnapshot() { + OffsetControlManager offsetControl = new OffsetControlManager.Builder().build(); + assertEquals("Can't end loading snapshot, because there is no current snapshot.", + assertThrows(RuntimeException.class, + () -> offsetControl.endLoadSnapshot(1000L)). + getMessage()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testReplayTransaction(boolean aborted) { + TrackingSnapshotRegistry snapshotRegistry = new TrackingSnapshotRegistry(new LogContext()); + OffsetControlManager offsetControl = new OffsetControlManager.Builder(). + setSnapshotRegistry(snapshotRegistry). + build(); + + offsetControl.replay(new BeginTransactionRecord(), 1500L); + assertEquals(1500L, offsetControl.transactionStartOffset()); + assertEquals(Arrays.asList(-1L, 1499L), offsetControl.snapshotRegistry().epochsList()); + + offsetControl.handleCommitBatch(newFakeBatch(1550L, 100, 2000L)); + assertEquals(1550L, offsetControl.lastCommittedOffset()); + assertEquals(100, offsetControl.lastCommittedEpoch()); + assertEquals(1499L, offsetControl.lastStableOffset()); + assertEquals(Arrays.asList(1499L), offsetControl.snapshotRegistry().epochsList()); + + if (aborted) { + offsetControl.replay(new AbortTransactionRecord(), 1600L); + assertEquals(Arrays.asList("snapshot[-1]", "snapshot[1499]", "revert[1499]"), + snapshotRegistry.operations()); + } else { + offsetControl.replay(new EndTransactionRecord(), 1600L); + assertEquals(Arrays.asList("snapshot[-1]", "snapshot[1499]"), + snapshotRegistry.operations()); + } + assertEquals(-1L, offsetControl.transactionStartOffset()); + assertEquals(1499L, offsetControl.lastStableOffset()); + + offsetControl.handleCommitBatch(newFakeBatch(1650, 100, 2100L)); + assertEquals(1650, offsetControl.lastStableOffset()); + assertEquals(Arrays.asList(1650L), offsetControl.snapshotRegistry().epochsList()); + } + + @Test + public void testLoadSnapshotClearsTransactionalState() { + TrackingSnapshotRegistry snapshotRegistry = new TrackingSnapshotRegistry(new LogContext()); + OffsetControlManager offsetControl = new OffsetControlManager.Builder(). + setSnapshotRegistry(snapshotRegistry). + build(); + offsetControl.replay(new BeginTransactionRecord(), 1500L); + offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300)); + assertEquals(-1L, offsetControl.transactionStartOffset()); + assertEquals(Arrays.asList("snapshot[-1]", "snapshot[1499]", "reset"), + snapshotRegistry.operations()); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java index ff61e1dfdb2..611c1490942 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java @@ -92,7 +92,7 @@ public class QuorumControllerIntegrationTestUtils { .setBrokerId(brokerId) .setRack(null) .setClusterId(controller.clusterId()) - .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)) + .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latest())) .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId)) .setListeners(new ListenerCollection( Arrays.asList( diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java index 9dc538f4e2f..52049b5626d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java @@ -110,7 +110,7 @@ public class QuorumControllerMetricsIntegrationTest { } }); if (forceFailoverUsingLogLayer) { - controlEnv.activeController().setWriteOffset(123L); + controlEnv.activeController().setNewNextWriteOffset(123L); TestUtils.retryOnExceptionWithTimeout(30_000, () -> { createTopics(controlEnv.activeController(), "test_", 1, 1); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index bdf5e8f86e5..67e08356ddd 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -158,7 +158,7 @@ public class QuorumControllerTest { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV1)). setBrokerId(0). setClusterId(logEnv.clusterId())).get(); testConfigurationOperations(controlEnv.activeController()); @@ -199,7 +199,7 @@ public class QuorumControllerTest { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV1)). setBrokerId(0). setClusterId(logEnv.clusterId())).get(); testDelayedConfigurationOperations(logEnv, controlEnv.activeController()); @@ -536,7 +536,7 @@ public class QuorumControllerTest { setBrokerId(0). setClusterId(active.clusterId()). setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV1)). setListeners(listeners)); assertEquals(3L, reply.get().epoch()); CreateTopicsRequestData createTopicsRequestData = diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 17220e7dbf5..1875a798fa0 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -177,7 +177,10 @@ public enum MetadataVersion { IBP_3_5_IV2(11, "3.5", "IV2", true), // Remove leader epoch bump when KRaft controller shrinks the ISR (KAFKA-15021) - IBP_3_6_IV0(12, "3.6", "IV0", false); + IBP_3_6_IV0(12, "3.6", "IV0", false), + + // Add metadata transactions + IBP_3_6_IV1(13, "3.6", "IV1", true); // NOTE: update the default version in @ClusterTest annotation to point to the latest version public static final String FEATURE_NAME = "metadata.version"; @@ -267,6 +270,10 @@ public enum MetadataVersion { return !this.isAtLeast(IBP_3_6_IV0); } + public boolean isMetadataTransactionSupported() { + return this.isAtLeast(IBP_3_6_IV1); + } + public boolean isKRaftSupported() { return this.featureLevel > 0; } diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java index 25177d45d71..0d10eeaa2c4 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java @@ -204,6 +204,7 @@ public class SnapshotRegistry { * @param targetEpoch The epoch of the snapshot to revert to. */ public void revertToSnapshot(long targetEpoch) { + log.debug("Reverting to in-memory snapshot {}", targetEpoch); Snapshot target = getSnapshot(targetEpoch); Iterator iterator = iterator(target); iterator.next(); diff --git a/server-common/src/test/java/org/apache/kafka/timeline/TrackingSnapshotRegistry.java b/server-common/src/test/java/org/apache/kafka/timeline/TrackingSnapshotRegistry.java new file mode 100644 index 00000000000..182051a3e88 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/timeline/TrackingSnapshotRegistry.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import org.apache.kafka.common.utils.LogContext; + +import java.util.ArrayList; +import java.util.List; + + +public class TrackingSnapshotRegistry extends SnapshotRegistry { + private final List operations = new ArrayList<>(); + + public TrackingSnapshotRegistry(LogContext logContext) { + super(logContext); + } + + public List operations() { + return new ArrayList<>(operations); + } + + @Override + public void revertToSnapshot(long targetEpoch) { + operations.add("revert[" + targetEpoch + "]"); + super.revertToSnapshot(targetEpoch); + } + + @Override + public void reset() { + operations.add("reset"); + super.reset(); + } + + @Override + public Snapshot getOrCreateSnapshot(long epoch) { + if (!hasSnapshot(epoch)) { + operations.add("snapshot[" + epoch + "]"); + } + return super.getOrCreateSnapshot(epoch); + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index d8c0b0df0e3..8d7f56fcae7 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -67,7 +67,7 @@ public class FeatureCommandTest { assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe")) ); assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 3.6-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput)); + "SupportedMaxVersion: 3.6-IV1\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput)); } @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) @@ -125,7 +125,7 @@ public class FeatureCommandTest { "disable", "--feature", "metadata.version")) ); assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " + - "metadata.version. Local controller 3000 only supports versions 1-12", commandOutput); + "metadata.version. Local controller 3000 only supports versions 1-13", commandOutput); commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),