KAFKA-14712: Produce correct error msg with correct metadataversion (#13773)

Fix the confusing error message in ImageWriterOptions

Reviewers: Luke Chen <showuon@gmail.com>, David Arthur <mumrah@gmail.com>
This commit is contained in:
Owen Leung 2023-07-24 10:37:23 +08:00 committed by GitHub
parent a3204aed2e
commit 4981fa939d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 4 deletions

View File

@ -29,6 +29,7 @@ import java.util.function.Consumer;
public final class ImageWriterOptions {
public static class Builder {
private MetadataVersion metadataVersion;
private MetadataVersion requestedMetadataVersion;
private Consumer<UnwritableMetadataException> lossHandler = e -> {
throw e;
};
@ -42,6 +43,7 @@ public final class ImageWriterOptions {
}
public Builder setMetadataVersion(MetadataVersion metadataVersion) {
setRequestedMetadataVersion(metadataVersion);
if (metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
// When writing an image, all versions less than 3.3-IV0 are treated as 3.0-IV1.
// This is because those versions don't support FeatureLevelRecord.
@ -58,29 +60,40 @@ public final class ImageWriterOptions {
return this;
}
public void setRequestedMetadataVersion(MetadataVersion orgMetadataVersion) {
this.requestedMetadataVersion = orgMetadataVersion;
}
public MetadataVersion metadataVersion() {
return metadataVersion;
}
public MetadataVersion requestedMetadataVersion() {
return requestedMetadataVersion;
}
public Builder setLossHandler(Consumer<UnwritableMetadataException> lossHandler) {
this.lossHandler = lossHandler;
return this;
}
public ImageWriterOptions build() {
return new ImageWriterOptions(metadataVersion, lossHandler);
return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion);
}
}
private final MetadataVersion metadataVersion;
private final MetadataVersion requestedMetadataVersion;
private final Consumer<UnwritableMetadataException> lossHandler;
private ImageWriterOptions(
MetadataVersion metadataVersion,
Consumer<UnwritableMetadataException> lossHandler
Consumer<UnwritableMetadataException> lossHandler,
MetadataVersion orgMetadataVersion
) {
this.metadataVersion = metadataVersion;
this.lossHandler = lossHandler;
this.requestedMetadataVersion = orgMetadataVersion;
}
public MetadataVersion metadataVersion() {
@ -88,7 +101,7 @@ public final class ImageWriterOptions {
}
public void handleLoss(String loss) {
lossHandler.accept(new UnwritableMetadataException(metadataVersion, loss));
lossHandler.accept(new UnwritableMetadataException(requestedMetadataVersion, loss));
}
}

View File

@ -138,7 +138,7 @@ public class ImageDowngradeTest {
MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().
setRawMetadataVersion(metadataVersion).
setMetadataVersion(metadataVersion).
setLossHandler(lossConsumer).
build());
assertEquals(expectedLosses, lossConsumer.losses, "Failed to get expected metadata losses.");

View File

@ -21,6 +21,8 @@ import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -44,9 +46,28 @@ public class ImageWriterOptionsTest {
setMetadataVersion(version);
if (i < MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.ordinal()) {
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, options.metadataVersion());
assertEquals(version, options.requestedMetadataVersion());
} else {
assertEquals(version, options.metadataVersion());
}
}
}
@Test
public void testHandleLoss() {
String expectedMessage = "stuff";
for (int i = MetadataVersion.MINIMUM_KRAFT_VERSION.ordinal();
i < MetadataVersion.VERSIONS.length;
i++) {
MetadataVersion version = MetadataVersion.VERSIONS[i];
String formattedMessage = String.format("Metadata has been lost because the following could not be represented in metadata version %s: %s", version, expectedMessage);
Consumer<UnwritableMetadataException> customLossHandler = e -> assertEquals(formattedMessage, e.getMessage());
ImageWriterOptions options = new ImageWriterOptions.Builder()
.setMetadataVersion(version)
.setLossHandler(customLossHandler)
.build();
options.handleLoss(expectedMessage);
}
}
}