KAFKA-14894: MetadataLoader must call finishSnapshot after loading a snapshot (#13541)

The MetadataLoader must call finishSnapshot after loading a snapshot. This function removes
whatever was in the old snapshot that is not in the new snapshot that was just loaded. While this
is not significant when the old snapshot was the empty snapshot, it is important to do when we are
loading a snapshot on top of an existing non-empty image.

In initializeNewPublishers, the newly installed publishers should be given a MetadataDelta based on
MetadataImage.EMPTY, reflecting the fact that they are seeing everything for the first time.

Reviewers: David Arthur <mumrah@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2023-04-11 15:02:33 -07:00 committed by GitHub
parent e49a5a265f
commit f1f35ef1a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 1 deletions

View File

@ -278,13 +278,16 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
log.debug("InitializeNewPublishers: setting up snapshot image for new publisher(s): {}",
uninitializedPublisherNames());
long startNs = time.nanoseconds();
// We base this delta off of the empty image, reflecting the fact that these publishers
// haven't seen anything previously.
MetadataDelta delta = new MetadataDelta.Builder().
setImage(image).
setImage(MetadataImage.EMPTY).
build();
ImageReWriter writer = new ImageReWriter(delta);
image.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(image.features().metadataVersion()).
build());
// ImageReWriter#close invokes finishSnapshot, so we don't need to invoke it here.
SnapshotManifest manifest = new SnapshotManifest(
image.provenance(),
time.nanoseconds() - startNs);
@ -484,6 +487,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
snapshotIndex++;
}
}
delta.finishSnapshot();
MetadataProvenance provenance = new MetadataProvenance(reader.lastContainedLogOffset(),
reader.lastContainedLogEpoch(), reader.lastContainedLogTimestamp());
return new SnapshotManifest(provenance,

View File

@ -49,6 +49,7 @@ public class ImageReWriter implements ImageWriter {
if (closed) return;
closed = true;
if (complete) {
delta.finishSnapshot();
image = delta.apply(delta.image().provenance());
}
}

View File

@ -550,4 +550,52 @@ public class MetadataLoaderTest {
)));
loader.waitForAllEventsToBeHandled();
}
private void loadTestSnapshot2(
MetadataLoader loader,
long offset
) throws Exception {
loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
new MetadataProvenance(offset, 100, 4000), asList(
asList(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(IBP_3_3_IV2.featureLevel()), (short) 0)),
asList(new ApiMessageAndVersion(new TopicRecord().
setName("bar").
setTopicId(Uuid.fromString("VcL2Mw-cT4aL6XV9VujzoQ")), (short) 0))
)));
loader.waitForAllEventsToBeHandled();
}
/**
* Test that loading a snapshot clears the previous state.
*/
@Test
public void testReloadSnapshot() throws Exception {
MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset");
List<MockPublisher> publishers = asList(new MockPublisher("a"));
try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setHighWaterMarkAccessor(() -> OptionalLong.of(0)).
build()) {
loadTestSnapshot(loader, 100);
loader.installPublishers(publishers).get();
loader.waitForAllEventsToBeHandled();
assertTrue(publishers.get(0).firstPublish.isDone());
assertTrue(publishers.get(0).latestDelta.image().isEmpty());
assertEquals(100L, publishers.get(0).latestImage.provenance().lastContainedOffset());
loadTestSnapshot(loader, 200);
assertEquals(200L, loader.lastAppliedOffset());
assertFalse(publishers.get(0).latestDelta.image().isEmpty());
loadTestSnapshot2(loader, 400);
assertEquals(400L, loader.lastAppliedOffset());
// Make sure the topic in the initial snapshot was overwritten by loading the new snapshot.
assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo"));
assertTrue(publishers.get(0).latestImage.topics().topicsByName().containsKey("bar"));
}
faultHandler.maybeRethrowFirstException();
}
}

View File

@ -18,11 +18,17 @@
package org.apache.kafka.image.writer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Collections;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.metadata.RecordTestUtils.testRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -60,4 +66,28 @@ public class ImageReWriterTest {
setName("foo").
setTopicId(Uuid.fromString("3B134hrsQgKtz8Sp6QBIfg"))));
}
@Test
public void testCloseInvokesFinishSnapshot() {
MetadataDelta delta = new MetadataDelta.Builder().build();
ImageReWriter writer = new ImageReWriter(delta);
writer.write(0, new TopicRecord().
setName("foo").
setTopicId(Uuid.fromString("3B134hrsQgKtz8Sp6QBIfg")));
writer.close(true);
MetadataDelta delta2 = new MetadataDelta.Builder().setImage(writer.image()).build();
ImageReWriter writer2 = new ImageReWriter(delta2);
writer2.write(0, new ConfigRecord().
setResourceName("").
setResourceType(BROKER.id()).
setName("num.io.threads").
setValue("12"));
writer2.close(true);
MetadataImage newImage = writer2.image();
assertEquals(Collections.emptyMap(), newImage.topics().topicsById());
assertEquals(Collections.singletonMap("num.io.threads", "12"),
newImage.configs().configMapForResource(new ConfigResource(BROKER, "")));
}
}