From edd64fa251b665b1ff88ed80112ac56a1ac0f9e2 Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Mon, 10 Jul 2023 10:01:10 -0400 Subject: [PATCH] MINOR: more KRaft Metadata Image tests (#13724) Adds additional testing for the various KRaft *Image classes. For every image that we create we already test that we can get there by applying all the records corresponding to that image written out as a list of records. This patch adds more tests to confirm that we can get to each such final image with intermediate stops at all possible intermediate images along the way. Reviewers: Colin P. McCabe , David Arthur --- .../org/apache/kafka/image/AclsImageTest.java | 42 +++++++---- .../kafka/image/ClientQuotasImageTest.java | 42 +++++++---- .../apache/kafka/image/ClusterImageTest.java | 41 +++++++---- .../kafka/image/ConfigurationsImageTest.java | 42 +++++++---- .../apache/kafka/image/FeaturesImageTest.java | 42 +++++++---- .../apache/kafka/image/MetadataImageTest.java | 68 ++++++++++++++---- .../kafka/image/ProducerIdsImageTest.java | 42 +++++++---- .../apache/kafka/image/ScramImageTest.java | 46 ++++++++---- .../apache/kafka/image/TopicsImageTest.java | 58 +++++++++++---- .../kafka/metadata/RecordTestUtils.java | 71 +++++++++++++++++++ 10 files changed, 383 insertions(+), 111 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java index 01910ab28b3..93027138765 100644 --- a/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -73,31 +74,48 @@ public class AclsImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(AclsImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(AclsImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(AclsImage image) throws Throwable { + private static void testToImage(AclsImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(AclsImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(AclsImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> AclsImage.EMPTY, + AclsDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(AclsImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - AclsDelta delta = new AclsDelta(AclsImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - AclsImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java index 1f656baa2e6..8d1a5883cc4 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -89,31 +90,48 @@ public class ClientQuotasImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(ClientQuotasImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(ClientQuotasImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(ClientQuotasImage image) throws Throwable { + private static void testToImage(ClientQuotasImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(ClientQuotasImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(ClientQuotasImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> ClientQuotasImage.EMPTY, + ClientQuotasDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(ClientQuotasImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - ClientQuotasDelta delta = new ClientQuotasDelta(ClientQuotasImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - ClientQuotasImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java index 256410631ae..e12e1143c88 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java @@ -127,31 +127,48 @@ public class ClusterImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(ClusterImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(ClusterImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(ClusterImage image) throws Throwable { + private static void testToImage(ClusterImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(ClusterImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(ClusterImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> ClusterImage.EMPTY, + ClusterDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(ClusterImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - ClusterDelta delta = new ClusterDelta(ClusterImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - ClusterImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java index 429fd8d9aa2..9b7cd39dcd6 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD; @@ -84,31 +85,48 @@ public class ConfigurationsImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(ConfigurationsImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(ConfigurationsImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(ConfigurationsImage image) throws Throwable { + private static void testToImage(ConfigurationsImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(ConfigurationsImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(ConfigurationsImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> ConfigurationsImage.EMPTY, + ConfigurationsDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(ConfigurationsImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - ConfigurationsImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java index a510bf1d855..1ec1d24b651 100644 --- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -72,32 +73,49 @@ public class FeaturesImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(FeaturesImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(FeaturesImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(FeaturesImage image) throws Throwable { + private static void testToImage(FeaturesImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(FeaturesImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(FeaturesImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> FeaturesImage.EMPTY, + FeaturesDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(FeaturesImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().setMetadataVersion(image.metadataVersion()).build()); - FeaturesDelta delta = new FeaturesDelta(FeaturesImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - FeaturesImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java index 2a6e3e6f3e5..ae247108fd4 100644 --- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java @@ -20,9 +20,13 @@ package org.apache.kafka.image; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.image.writer.RecordListWriter; import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.List; +import java.util.Optional; + import static org.junit.jupiter.api.Assertions.assertEquals; @@ -71,31 +75,69 @@ public class MetadataImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(MetadataImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(MetadataImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply(IMAGE2.provenance())); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + ImageWriterOptions options = new ImageWriterOptions.Builder() + .setMetadataVersion(IMAGE1.features().metadataVersion()) + .build(); + List records = getImageRecords(IMAGE1, options); + records.addAll(FeaturesImageTest.DELTA1_RECORDS); + records.addAll(ClusterImageTest.DELTA1_RECORDS); + records.addAll(TopicsImageTest.DELTA1_RECORDS); + records.addAll(ConfigurationsImageTest.DELTA1_RECORDS); + records.addAll(ClientQuotasImageTest.DELTA1_RECORDS); + records.addAll(ProducerIdsImageTest.DELTA1_RECORDS); + records.addAll(AclsImageTest.DELTA1_RECORDS); + records.addAll(ScramImageTest.DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(MetadataImage image) throws Throwable { + private static void testToImage(MetadataImage image) { + testToImage(image, new ImageWriterOptions.Builder() + .setMetadataVersion(image.features().metadataVersion()) + .build(), Optional.empty()); + } + + private static void testToImage(MetadataImage image, ImageWriterOptions options) { + testToImage(image, options, Optional.empty()); + } + + static void testToImage(MetadataImage image, ImageWriterOptions options, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image, options))); + } + + private static void testToImage(MetadataImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper( + () -> MetadataImage.EMPTY, + MetadataDelta::new + ) { + @Override + public MetadataImage createImageByApplyingDelta(MetadataDelta delta) { + return delta.apply(image.provenance()); + } + }.test(image, fromRecords); + } + + private static List getImageRecords(MetadataImage image, ImageWriterOptions options) { RecordListWriter writer = new RecordListWriter(); - image.write(writer, new ImageWriterOptions.Builder(image).build()); - MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - MetadataImage nextImage = delta.apply(image.provenance()); - assertEquals(image, nextImage); + image.write(writer, options); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java index 69695473d23..738582fc108 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Timeout; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -61,31 +62,48 @@ public class ProducerIdsImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(ProducerIdsImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(ProducerIdsImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(ProducerIdsImage image) throws Throwable { + private static void testToImage(ProducerIdsImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(ProducerIdsImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(ProducerIdsImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> ProducerIdsImage.EMPTY, + ProducerIdsDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(ProducerIdsImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - ProducerIdsDelta delta = new ProducerIdsDelta(ProducerIdsImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - ProducerIdsImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java index 3400be47b38..038a5c956c3 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import static org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_256; @@ -113,36 +114,53 @@ public class ScramImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(ScramImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(ScramImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(ScramImage image) throws Throwable { + private static void testToImage(ScramImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(ScramImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(ScramImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> ScramImage.EMPTY, + ScramDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(ScramImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - ScramDelta delta = new ScramDelta(ScramImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - ScramImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } @Test - public void testEmptyWithInvalidIBP() throws Throwable { + public void testEmptyWithInvalidIBP() { ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder(). setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build(); RecordListWriter writer = new RecordListWriter(); @@ -150,7 +168,7 @@ public class ScramImageTest { } @Test - public void testImage1withInvalidIBP() throws Throwable { + public void testImage1withInvalidIBP() { ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder(). setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build(); RecordListWriter writer = new RecordListWriter(); diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java index b3e964da85d..d9bf0876714 100644 --- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java @@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD; @@ -225,6 +226,11 @@ public class TopicsImageTest { ), changes.followers().keySet() ); + + TopicsImage finalImage = delta.apply(); + List imageRecords = getImageRecords(IMAGE1); + imageRecords.addAll(topicRecords); + testToImage(finalImage, Optional.of(imageRecords)); } @Test @@ -265,6 +271,11 @@ public class TopicsImageTest { assertEquals(new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0))), changes.deletes()); assertEquals(Collections.emptyMap(), changes.leaders()); assertEquals(Collections.emptyMap(), changes.followers()); + + TopicsImage finalImage = delta.apply(); + List imageRecords = getImageRecords(image); + imageRecords.addAll(topicRecords); + testToImage(finalImage, Optional.of(imageRecords)); } @Test @@ -365,35 +376,58 @@ public class TopicsImageTest { new HashSet<>(Arrays.asList(new TopicPartition("zoo", 1), new TopicPartition("zoo", 5))), changes.followers().keySet() ); + + + TopicsImage finalImage = delta.apply(); + List imageRecords = getImageRecords(image); + imageRecords.addAll(topicRecords); + testToImage(finalImage, Optional.of(imageRecords)); } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(TopicsImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(TopicsImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(TopicsImage image) throws Throwable { + private static void testToImage(TopicsImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(TopicsImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(TopicsImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> TopicsImage.EMPTY, + TopicsDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(TopicsImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - TopicsImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java index 20569dc1173..a1dc2ce4fbb 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java @@ -38,8 +38,11 @@ import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -95,6 +98,74 @@ public class RecordTestUtils { replayAll(target, Collections.singletonList(recordAndVersion)); } + public static class TestThroughAllIntermediateImagesLeadingToFinalImageHelper { + private final Supplier emptyImageSupplier; + private final Function deltaUponImageCreator; + + public TestThroughAllIntermediateImagesLeadingToFinalImageHelper( + Supplier emptyImageSupplier, Function deltaUponImageCreator + ) { + this.emptyImageSupplier = Objects.requireNonNull(emptyImageSupplier); + this.deltaUponImageCreator = Objects.requireNonNull(deltaUponImageCreator); + } + + public I getEmptyImage() { + return this.emptyImageSupplier.get(); + } + + public D createDeltaUponImage(I image) { + return this.deltaUponImageCreator.apply(image); + } + + @SuppressWarnings("unchecked") + public I createImageByApplyingDelta(D delta) { + try { + try { + Method method = delta.getClass().getMethod("apply"); + return (I) method.invoke(delta); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } catch (InvocationTargetException e) { + throw new RuntimeException(e.getCause()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + public void test(I finalImage, List fromRecords) { + for (int numRecordsForfirstImage = 1; numRecordsForfirstImage <= fromRecords.size(); ++numRecordsForfirstImage) { + // create first image from first numRecordsForfirstImage records + D delta = createDeltaUponImage(getEmptyImage()); + RecordTestUtils.replayAll(delta, fromRecords.subList(0, numRecordsForfirstImage)); + I firstImage = createImageByApplyingDelta(delta); + // for all possible further batch sizes, apply as many batches as it takes to get to the final image + int remainingRecords = fromRecords.size() - numRecordsForfirstImage; + if (remainingRecords == 0) { + assertEquals(finalImage, firstImage); + } else { + // for all possible further batch sizes... + for (int maxRecordsForSuccessiveBatches = 1; maxRecordsForSuccessiveBatches <= remainingRecords; ++maxRecordsForSuccessiveBatches) { + I latestIntermediateImage = firstImage; + // ... apply as many batches as it takes to get to the final image + int numAdditionalBatches = (int) Math.ceil(remainingRecords * 1.0 / maxRecordsForSuccessiveBatches); + for (int additionalBatchNum = 0; additionalBatchNum < numAdditionalBatches; ++additionalBatchNum) { + // apply up to maxRecordsForSuccessiveBatches records on top of the latest intermediate image + // to obtain the next intermediate image. + delta = createDeltaUponImage(latestIntermediateImage); + int applyFromIndex = numRecordsForfirstImage + additionalBatchNum * maxRecordsForSuccessiveBatches; + int applyToIndex = Math.min(fromRecords.size(), applyFromIndex + maxRecordsForSuccessiveBatches); + RecordTestUtils.replayAll(delta, fromRecords.subList(applyFromIndex, applyToIndex)); + latestIntermediateImage = createImageByApplyingDelta(delta); + } + // The final intermediate image received should be the expected final image + assertEquals(finalImage, latestIntermediateImage); + } + } + } + } + } + /** * Replay a list of record batches. *