KAFKA-15389: Don't publish until we have replayed at least one record (#14282)

When starting up a controller for the first time (i.e., with an empty log), it is possible for
MetadataLoader to publish an empty MetadataImage before the activation records of the controller
have been written. While this is not a bug, it could be confusing. This patch closes that gap by
waiting for at least one controller record to be committed before the MetadataLoader starts publishing
images.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
David Arthur 2023-08-25 13:41:43 -04:00 committed by GitHub
parent 5785796f98
commit f2d499e25a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 4 deletions

View File

@ -67,6 +67,7 @@ public class MetadataBatchLoader {
private int numBatches;
private long totalBatchElapsedNs;
private TransactionState transactionState;
private boolean hasSeenRecord;
public MetadataBatchLoader(
LogContext logContext,
@ -78,16 +79,27 @@ public class MetadataBatchLoader {
this.time = time;
this.faultHandler = faultHandler;
this.callback = callback;
this.resetToImage(MetadataImage.EMPTY);
this.hasSeenRecord = false;
}
/**
* @return True if this batch loader has seen at least one record.
*/
public boolean hasSeenRecord() {
return hasSeenRecord;
}
/**
* Reset the state of this batch loader to the given image. Any un-flushed state will be
* discarded.
* discarded. This is called after applying a delta and passing it back to MetadataLoader, or
* when MetadataLoader loads a snapshot.
*
* @param image Metadata image to reset this batch loader's state to.
*/
public void resetToImage(MetadataImage image) {
this.image = image;
this.hasSeenRecord = true;
this.delta = new MetadataDelta.Builder().setImage(image).build();
this.transactionState = TransactionState.NO_TRANSACTION;
this.lastOffset = image.provenance().lastContainedOffset();
@ -241,6 +253,7 @@ public class MetadataBatchLoader {
default:
break;
}
hasSeenRecord = true;
delta.replay(record.message());
}
}

View File

@ -212,7 +212,6 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
time,
faultHandler,
this::maybePublishMetadata);
this.batchLoader.resetToImage(this.image);
this.eventQueue = new KafkaEventQueue(
Time.SYSTEM,
logContext,
@ -241,6 +240,11 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
offset + ", but the high water mark is {}", where, highWaterMark.getAsLong());
return true;
}
if (!batchLoader.hasSeenRecord()) {
log.info("{}: The loader is still catching up because we have not loaded a controller record as of offset " +
offset + " and high water mark is {}", where, highWaterMark.getAsLong());
return true;
}
log.info("{}: The loader finished catching up to the current high water mark of {}",
where, highWaterMark.getAsLong());
catchingUp = false;
@ -387,8 +391,8 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
image.provenance().lastContainedOffset(),
NANOSECONDS.toMicros(manifest.elapsedNs()));
MetadataImage image = delta.apply(manifest.provenance());
maybePublishMetadata(delta, image, manifest);
batchLoader.resetToImage(image);
maybePublishMetadata(delta, image, manifest);
} catch (Throwable e) {
// This is a general catch-all block where we don't expect to end up;
// failure-prone operations should have individual try/catch blocks around them.

View File

@ -18,9 +18,11 @@
package org.apache.kafka.image.loader;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
@ -48,6 +50,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@ -338,8 +341,8 @@ public class MetadataLoaderTest {
setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
build()) {
loader.installPublishers(publishers).get();
publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
loadEmptySnapshot(loader, 200);
publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
assertEquals(200L, loader.lastAppliedOffset());
loadEmptySnapshot(loader, 300);
assertEquals(300L, loader.lastAppliedOffset());
@ -668,6 +671,7 @@ public class MetadataLoaderTest {
.setTopicId(Uuid.fromString("dMCqhcK4T5miGH5wEX7NsQ")), (short) 0)
)));
loader.waitForAllEventsToBeHandled();
publisher.firstPublish.get(30, TimeUnit.SECONDS);
assertNull(publisher.latestImage.topics().getTopic("foo"),
"Topic should not be visible since we started transaction");
@ -732,6 +736,7 @@ public class MetadataLoaderTest {
// After MetadataLoader is fixed to handle arbitrary transactions, we would expect "foo"
// to be visible at this point.
publisher.firstPublish.get(30, TimeUnit.SECONDS);
assertNotNull(publisher.latestImage.topics().getTopic("foo"));
}
faultHandler.maybeRethrowFirstException();
@ -758,6 +763,7 @@ public class MetadataLoaderTest {
.setTopicId(Uuid.fromString("HQSM3ccPQISrHqYK_C8GpA")), (short) 0)
)));
loader.waitForAllEventsToBeHandled();
publisher.firstPublish.get(30, TimeUnit.SECONDS);
assertNull(publisher.latestImage.topics().getTopic("foo"));
// loading a snapshot discards any in-flight transaction
@ -773,4 +779,49 @@ public class MetadataLoaderTest {
}
faultHandler.maybeRethrowFirstException();
}
@Test
public void testNoPublishEmptyImage() throws Exception {
MockFaultHandler faultHandler = new MockFaultHandler("testNoPublishEmptyImage");
List<MetadataImage> capturedImages = new ArrayList<>();
CompletableFuture<Void> firstPublish = new CompletableFuture<>();
MetadataPublisher capturingPublisher = new MetadataPublisher() {
@Override
public String name() {
return "testNoPublishEmptyImage";
}
@Override
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
if (!firstPublish.isDone()) {
firstPublish.complete(null);
}
capturedImages.add(newImage);
}
};
try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setHighWaterMarkAccessor(() -> OptionalLong.of(1)).
build()) {
loader.installPublishers(Collections.singletonList(capturingPublisher)).get();
loader.handleCommit(
MockBatchReader.newSingleBatchReader(0, 1, Collections.singletonList(
// Any record will work here
new ApiMessageAndVersion(new ConfigRecord()
.setResourceType(ConfigResource.Type.BROKER.id())
.setResourceName("3000")
.setName("foo")
.setValue("bar"), (short) 0)
)));
firstPublish.get(30, TimeUnit.SECONDS);
assertFalse(capturedImages.isEmpty());
capturedImages.forEach(metadataImage -> {
assertFalse(metadataImage.isEmpty());
});
}
faultHandler.maybeRethrowFirstException();
}
}