mirror of https://github.com/apache/kafka.git
MINOR: Rename handleSnapshot to handleLoadSnapshot (#13727)
Rename handleSnapshot to handleLoadSnapshot to make it explicit that it is handling snapshot load, not generation. Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent
3c8665025a
commit
d944ef1efb
|
|
@ -41,7 +41,7 @@ class OffsetTrackingListener extends RaftClient.Listener[ApiMessageAndVersion] {
|
|||
reader.close()
|
||||
}
|
||||
|
||||
override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
|
||||
override def handleLoadSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
|
||||
_highestOffset = reader.lastContainedLogOffset()
|
||||
reader.close()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -180,7 +180,7 @@ class TestRaftServer(
|
|||
eventQueue.offer(HandleCommit(reader))
|
||||
}
|
||||
|
||||
override def handleSnapshot(reader: SnapshotReader[Array[Byte]]): Unit = {
|
||||
override def handleLoadSnapshot(reader: SnapshotReader[Array[Byte]]): Unit = {
|
||||
eventQueue.offer(HandleSnapshot(reader))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -994,8 +994,8 @@ public final class QuorumController implements Controller {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
|
||||
appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
|
||||
public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
|
||||
appendRaftEvent(String.format("handleLoadSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
|
||||
try {
|
||||
String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
|
||||
if (isActiveController()) {
|
||||
|
|
|
|||
|
|
@ -415,14 +415,14 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
|
|||
}
|
||||
|
||||
@Override
|
||||
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
|
||||
public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
|
||||
eventQueue.append(() -> {
|
||||
try {
|
||||
MetadataDelta delta = new MetadataDelta.Builder().
|
||||
setImage(image).
|
||||
build();
|
||||
SnapshotManifest manifest = loadSnapshot(delta, reader);
|
||||
log.info("handleSnapshot: generated a metadata delta from a snapshot at offset {} " +
|
||||
log.info("handleLoadSnapshot: generated a metadata delta from a snapshot at offset {} " +
|
||||
"in {} us.", manifest.provenance().lastContainedOffset(),
|
||||
NANOSECONDS.toMicros(manifest.elapsedNs()));
|
||||
try {
|
||||
|
|
@ -432,10 +432,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
|
|||
"snapshot at offset " + reader.lastContainedLogOffset(), e);
|
||||
return;
|
||||
}
|
||||
if (stillNeedToCatchUp("handleSnapshot", manifest.provenance().lastContainedOffset())) {
|
||||
if (stillNeedToCatchUp("handleLoadSnapshot", manifest.provenance().lastContainedOffset())) {
|
||||
return;
|
||||
}
|
||||
log.info("handleSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
|
||||
log.info("handleLoadSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
|
||||
for (MetadataPublisher publisher : publishers.values()) {
|
||||
try {
|
||||
publisher.onMetadataUpdate(delta, image, manifest);
|
||||
|
|
@ -452,7 +452,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
|
|||
} catch (Throwable e) {
|
||||
// This is a general catch-all block where we don't expect to end up;
|
||||
// failure-prone operations should have individual try/catch blocks around them.
|
||||
faultHandler.handleFault("Unhandled fault in MetadataLoader#handleSnapshot. " +
|
||||
faultHandler.handleFault("Unhandled fault in MetadataLoader#handleLoadSnapshot. " +
|
||||
"Snapshot offset was " + reader.lastContainedLogOffset(), e);
|
||||
} finally {
|
||||
reader.close();
|
||||
|
|
|
|||
|
|
@ -639,7 +639,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
|
|||
}
|
||||
|
||||
if (isSnapshot) {
|
||||
zkMetadataWriter.handleSnapshot(image);
|
||||
zkMetadataWriter.handleLoadSnapshot(image);
|
||||
} else {
|
||||
zkMetadataWriter.handleDelta(prevImage, image, delta);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ public class KRaftMigrationZkWriter {
|
|||
this.operationConsumer = operationConsumer;
|
||||
}
|
||||
|
||||
public void handleSnapshot(MetadataImage image) {
|
||||
public void handleLoadSnapshot(MetadataImage image) {
|
||||
handleTopicsSnapshot(image.topics());
|
||||
handleConfigsSnapshot(image.configs());
|
||||
handleClientQuotasSnapshot(image.clientQuotas(), image.scram());
|
||||
|
|
|
|||
|
|
@ -250,7 +250,7 @@ public class MetadataLoaderTest {
|
|||
)
|
||||
)
|
||||
);
|
||||
loader.handleSnapshot(snapshotReader);
|
||||
loader.handleLoadSnapshot(snapshotReader);
|
||||
}
|
||||
loader.waitForAllEventsToBeHandled();
|
||||
if (sameObject) {
|
||||
|
|
@ -291,7 +291,7 @@ public class MetadataLoaderTest {
|
|||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setFeatureLevel(IBP_3_3_IV2.featureLevel()), (short) 0))));
|
||||
assertFalse(snapshotReader.closed);
|
||||
loader.handleSnapshot(snapshotReader);
|
||||
loader.handleLoadSnapshot(snapshotReader);
|
||||
loader.waitForAllEventsToBeHandled();
|
||||
assertTrue(snapshotReader.closed);
|
||||
publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
|
||||
|
|
@ -355,7 +355,7 @@ public class MetadataLoaderTest {
|
|||
if (loader.time() instanceof MockTime) {
|
||||
snapshotReader.setTime((MockTime) loader.time());
|
||||
}
|
||||
loader.handleSnapshot(snapshotReader);
|
||||
loader.handleLoadSnapshot(snapshotReader);
|
||||
loader.waitForAllEventsToBeHandled();
|
||||
}
|
||||
|
||||
|
|
@ -469,7 +469,7 @@ public class MetadataLoaderTest {
|
|||
setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
|
||||
build()) {
|
||||
loader.installPublishers(publishers).get();
|
||||
loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
|
||||
loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
|
||||
new MetadataProvenance(200, 100, 4000), asList(
|
||||
asList(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
|
|
@ -539,7 +539,7 @@ public class MetadataLoaderTest {
|
|||
MetadataLoader loader,
|
||||
long offset
|
||||
) throws Exception {
|
||||
loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
|
||||
loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
|
||||
new MetadataProvenance(offset, 100, 4000), asList(
|
||||
asList(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
|
|
@ -555,7 +555,7 @@ public class MetadataLoaderTest {
|
|||
MetadataLoader loader,
|
||||
long offset
|
||||
) throws Exception {
|
||||
loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
|
||||
loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
|
||||
new MetadataProvenance(offset, 100, 4000), asList(
|
||||
asList(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
|
|
|
|||
|
|
@ -424,8 +424,8 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
|
|||
offset = reader.lastOffset().getAsLong();
|
||||
}
|
||||
|
||||
void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
|
||||
listener.handleSnapshot(reader);
|
||||
void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
|
||||
listener.handleLoadSnapshot(reader);
|
||||
offset = reader.lastContainedLogOffset();
|
||||
}
|
||||
|
||||
|
|
@ -519,7 +519,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
|
|||
Optional<RawSnapshotReader> snapshot = shared.nextSnapshot(listenerData.offset());
|
||||
if (snapshot.isPresent()) {
|
||||
log.trace("Node {}: handling snapshot with id {}.", nodeId, snapshot.get().snapshotId());
|
||||
listenerData.handleSnapshot(
|
||||
listenerData.handleLoadSnapshot(
|
||||
RecordsSnapshotReader.of(
|
||||
snapshot.get(),
|
||||
new MetadataRecordSerde(),
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
|
||||
public synchronized void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
|
||||
long lastCommittedOffset = reader.lastContainedLogOffset();
|
||||
try {
|
||||
while (reader.hasNext()) {
|
||||
|
|
|
|||
|
|
@ -2531,7 +2531,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
}
|
||||
|
||||
logger.debug("Notifying listener {} of snapshot {}", listenerName(), reader.snapshotId());
|
||||
listener.handleSnapshot(reader);
|
||||
listener.handleLoadSnapshot(reader);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ public interface RaftClient<T> extends AutoCloseable {
|
|||
*
|
||||
* @param reader snapshot reader instance which must be iterated and closed
|
||||
*/
|
||||
void handleSnapshot(SnapshotReader<T> reader);
|
||||
void handleLoadSnapshot(SnapshotReader<T> reader);
|
||||
|
||||
/**
|
||||
* Called on any change to leadership. This includes both when a leader is elected and
|
||||
|
|
@ -66,7 +66,7 @@ public interface RaftClient<T> extends AutoCloseable {
|
|||
*
|
||||
* If this node is the leader, then the notification of leadership will be delayed until
|
||||
* the implementation of this interface has caught up to the high-watermark through calls to
|
||||
* {@link #handleSnapshot(SnapshotReader)} and {@link #handleCommit(BatchReader)}.
|
||||
* {@link #handleLoadSnapshot(SnapshotReader)} and {@link #handleCommit(BatchReader)}.
|
||||
*
|
||||
* If this node is not the leader, then this method will be called as soon as possible. In
|
||||
* this case the leader may or may not be known for the current epoch.
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
|
|||
private OptionalInt claimedEpoch = OptionalInt.empty();
|
||||
private long lastOffsetSnapshotted = -1;
|
||||
|
||||
private int handleSnapshotCalls = 0;
|
||||
private int handleLoadSnapshotCalls = 0;
|
||||
|
||||
public ReplicatedCounter(
|
||||
int nodeId,
|
||||
|
|
@ -135,7 +135,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void handleSnapshot(SnapshotReader<Integer> reader) {
|
||||
public synchronized void handleLoadSnapshot(SnapshotReader<Integer> reader) {
|
||||
try {
|
||||
log.debug("Loading snapshot {}", reader.snapshotId());
|
||||
while (reader.hasNext()) {
|
||||
|
|
@ -157,7 +157,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
|
|||
}
|
||||
}
|
||||
lastOffsetSnapshotted = reader.lastContainedLogOffset();
|
||||
handleSnapshotCalls += 1;
|
||||
handleLoadSnapshotCalls += 1;
|
||||
log.debug("Finished loading snapshot. Set value: {}", committed);
|
||||
} finally {
|
||||
reader.close();
|
||||
|
|
@ -176,11 +176,11 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
|
|||
uncommitted = -1;
|
||||
claimedEpoch = OptionalInt.empty();
|
||||
}
|
||||
handleSnapshotCalls = 0;
|
||||
handleLoadSnapshotCalls = 0;
|
||||
}
|
||||
|
||||
/** Use handleSnapshotCalls to verify leader is never asked to load snapshot */
|
||||
public int handleSnapshotCalls() {
|
||||
return handleSnapshotCalls;
|
||||
/** Use handleLoadSnapshotCalls to verify leader is never asked to load snapshot */
|
||||
public int handleLoadSnapshotCalls() {
|
||||
return handleLoadSnapshotCalls;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1220,7 +1220,7 @@ public final class RaftClientTestContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void handleSnapshot(SnapshotReader<String> reader) {
|
||||
public void handleLoadSnapshot(SnapshotReader<String> reader) {
|
||||
snapshot.ifPresent(snapshot -> assertDoesNotThrow(snapshot::close));
|
||||
commits.clear();
|
||||
savedBatches.clear();
|
||||
|
|
|
|||
|
|
@ -1066,7 +1066,7 @@ public class RaftEventSimulationTest {
|
|||
public void verify() {
|
||||
for (RaftNode raftNode : cluster.running()) {
|
||||
if (raftNode.counter.isWritable()) {
|
||||
assertEquals(0, raftNode.counter.handleSnapshotCalls());
|
||||
assertEquals(0, raftNode.counter.handleLoadSnapshotCalls());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -113,7 +113,7 @@ public final class MetadataNodeManager implements AutoCloseable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
|
||||
public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
|
||||
try {
|
||||
while (reader.hasNext()) {
|
||||
Batch<ApiMessageAndVersion> batch = reader.next();
|
||||
|
|
|
|||
Loading…
Reference in New Issue