[MINOR] QuorumController tests use testToImage (#14405)

This commit is contained in:
Alyssa Huang 2023-09-22 11:50:20 -07:00 committed by GitHub
parent 5bdea94c05
commit 2d262efb00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 108 additions and 4 deletions

View File

@ -38,6 +38,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -90,12 +91,32 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker; import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
import org.apache.kafka.image.AclsDelta;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasDelta;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterDelta;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.ConfigurationsDelta;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.DelegationTokenDelta;
import org.apache.kafka.image.DelegationTokenImage;
import org.apache.kafka.image.FeaturesDelta;
import org.apache.kafka.image.FeaturesImage;
import org.apache.kafka.image.ProducerIdsDelta;
import org.apache.kafka.image.ProducerIdsImage;
import org.apache.kafka.image.ScramDelta;
import org.apache.kafka.image.ScramImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange; import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures; import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
import org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer; import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState; import org.apache.kafka.metadata.migration.ZkMigrationState;
@ -109,7 +130,6 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandlerException; import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.snapshot.FileRawSnapshotReader; import org.apache.kafka.snapshot.FileRawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.Snapshots; import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
@ -171,6 +191,8 @@ public class QuorumControllerTest {
setBrokerId(0). setBrokerId(0).
setClusterId(logEnv.clusterId())).get(); setClusterId(logEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController()); testConfigurationOperations(controlEnv.activeController());
testToImages(logEnv.allRecords());
} }
} }
@ -212,6 +234,8 @@ public class QuorumControllerTest {
setBrokerId(0). setBrokerId(0).
setClusterId(logEnv.clusterId())).get(); setClusterId(logEnv.clusterId())).get();
testDelayedConfigurationOperations(logEnv, controlEnv.activeController()); testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
testToImages(logEnv.allRecords());
} }
} }
@ -325,6 +349,8 @@ public class QuorumControllerTest {
// Check that there are imbalaned partitions // Check that there are imbalaned partitions
assertTrue(active.replicationControl().arePartitionLeadersImbalanced()); assertTrue(active.replicationControl().arePartitionLeadersImbalanced());
testToImages(logEnv.allRecords());
} }
} }
@ -469,6 +495,8 @@ public class QuorumControllerTest {
TimeUnit.MILLISECONDS.convert(leaderImbalanceCheckIntervalNs * 10, TimeUnit.NANOSECONDS), TimeUnit.MILLISECONDS.convert(leaderImbalanceCheckIntervalNs * 10, TimeUnit.NANOSECONDS),
"Leaders were not balanced after unfencing all of the brokers" "Leaders were not balanced after unfencing all of the brokers"
); );
testToImages(logEnv.allRecords());
} }
} }
@ -506,7 +534,6 @@ public class QuorumControllerTest {
"High watermark was not established" "High watermark was not established"
); );
final long firstHighWatermark = localLogManager.highWatermark().getAsLong(); final long firstHighWatermark = localLogManager.highWatermark().getAsLong();
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> localLogManager.highWatermark().getAsLong() > firstHighWatermark, () -> localLogManager.highWatermark().getAsLong() > firstHighWatermark,
@ -585,6 +612,8 @@ public class QuorumControllerTest {
return iterator.next(); return iterator.next();
}); });
assertEquals(0, topicPartitionFuture.get().partitionId()); assertEquals(0, topicPartitionFuture.get().partitionId());
testToImages(logEnv.allRecords());
} }
} }
@ -604,7 +633,6 @@ public class QuorumControllerTest {
public void testSnapshotSaveAndLoad() throws Throwable { public void testSnapshotSaveAndLoad() throws Throwable {
final int numBrokers = 4; final int numBrokers = 4;
Map<Integer, Long> brokerEpochs = new HashMap<>(); Map<Integer, Long> brokerEpochs = new HashMap<>();
RawSnapshotReader reader = null;
Uuid fooId; Uuid fooId;
try ( try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
@ -655,6 +683,8 @@ public class QuorumControllerTest {
new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
controlEnv.close(); controlEnv.close();
assertEquals(generateTestRecords(fooId, brokerEpochs), logEnv.allRecords()); assertEquals(generateTestRecords(fooId, brokerEpochs), logEnv.allRecords());
testToImages(logEnv.allRecords());
} }
} }
@ -784,6 +814,8 @@ public class QuorumControllerTest {
assertYieldsTimeout(electLeadersFuture); assertYieldsTimeout(electLeadersFuture);
assertYieldsTimeout(alterReassignmentsFuture); assertYieldsTimeout(alterReassignmentsFuture);
assertYieldsTimeout(listReassignmentsFuture); assertYieldsTimeout(listReassignmentsFuture);
testToImages(logEnv.allRecords());
} }
} }
@ -833,6 +865,8 @@ public class QuorumControllerTest {
electLeadersFuture.get(); electLeadersFuture.get();
alterReassignmentsFuture.get(); alterReassignmentsFuture.get();
countDownLatch.countDown(); countDownLatch.countDown();
testToImages(logEnv.allRecords());
} }
} }
@ -939,6 +973,8 @@ public class QuorumControllerTest {
partitionsWithReplica2 partitionsWithReplica2
) )
); );
testToImages(logEnv.allRecords());
} }
} }
@ -980,6 +1016,8 @@ public class QuorumControllerTest {
// Topic bar does not exist, so this should throw an exception. // Topic bar does not exist, so this should throw an exception.
assertThrows(UnknownTopicOrPartitionException.class, assertThrows(UnknownTopicOrPartitionException.class,
() -> checker.accept(new ConfigResource(TOPIC, "bar"))); () -> checker.accept(new ConfigResource(TOPIC, "bar")));
testToImages(logEnv.allRecords());
} }
} }
@ -1136,6 +1174,8 @@ public class QuorumControllerTest {
// were already records present. // were already records present.
assertEquals(Collections.emptyMap(), active.configurationControl(). assertEquals(Collections.emptyMap(), active.configurationControl().
getConfigs(new ConfigResource(BROKER, ""))); getConfigs(new ConfigResource(BROKER, "")));
testToImages(logEnv.allRecords());
} }
} }
@ -1177,6 +1217,8 @@ public class QuorumControllerTest {
return resultOrError.isResult() && return resultOrError.isResult() &&
Collections.singletonMap("foo", "bar").equals(resultOrError.result()); Collections.singletonMap("foo", "bar").equals(resultOrError.result());
}, "Failed to see expected config change from bootstrap metadata"); }, "Failed to see expected config change from bootstrap metadata");
testToImages(logEnv.allRecords());
} }
} }
@ -1252,8 +1294,10 @@ public class QuorumControllerTest {
build(); build();
) { ) {
QuorumController active = controlEnv.activeController(); QuorumController active = controlEnv.activeController();
return active.appendReadEvent("read migration state", OptionalLong.empty(), ZkMigrationState zkMigrationState = active.appendReadEvent("read migration state", OptionalLong.empty(),
() -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS); () -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS);
testToImages(logEnv.allRecords());
return zkMigrationState;
} }
} }
@ -1278,6 +1322,8 @@ public class QuorumControllerTest {
assertEquals(active.featureControl().zkMigrationState(), ZkMigrationState.MIGRATION); assertEquals(active.featureControl().zkMigrationState(), ZkMigrationState.MIGRATION);
assertFalse(active.featureControl().inPreMigrationMode()); assertFalse(active.featureControl().inPreMigrationMode());
} }
testToImages(logEnv.allRecords());
} }
} }
@ -1419,6 +1465,34 @@ public class QuorumControllerTest {
assertEquals(ZkMigrationState.NONE, active.appendReadEvent("read migration state", OptionalLong.empty(), assertEquals(ZkMigrationState.NONE, active.appendReadEvent("read migration state", OptionalLong.empty(),
() -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS)); () -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS));
assertThrows(FaultHandlerException.class, controlEnv::close); assertThrows(FaultHandlerException.class, controlEnv::close);
testToImages(logEnv.allRecords());
}
}
/**
* Tests all intermediate images lead to the same final image for each image & delta type.
* @param fromRecords
*/
@SuppressWarnings("unchecked")
private static void testToImages(List<ApiMessageAndVersion> fromRecords) {
List<ImageDeltaPair<?, ?>> testMatrix = Arrays.asList(
new ImageDeltaPair<>(() -> AclsImage.EMPTY, AclsDelta::new),
new ImageDeltaPair<>(() -> ClientQuotasImage.EMPTY, ClientQuotasDelta::new),
new ImageDeltaPair<>(() -> ClusterImage.EMPTY, ClusterDelta::new),
new ImageDeltaPair<>(() -> ConfigurationsImage.EMPTY, ConfigurationsDelta::new),
new ImageDeltaPair<>(() -> DelegationTokenImage.EMPTY, DelegationTokenDelta::new),
new ImageDeltaPair<>(() -> FeaturesImage.EMPTY, FeaturesDelta::new),
new ImageDeltaPair<>(() -> ProducerIdsImage.EMPTY, ProducerIdsDelta::new),
new ImageDeltaPair<>(() -> ScramImage.EMPTY, ScramDelta::new),
new ImageDeltaPair<>(() -> TopicsImage.EMPTY, TopicsDelta::new)
);
// test from empty image stopping each of the various intermediate images along the way
for (ImageDeltaPair<?, ?> pair : testMatrix) {
new TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
(Supplier<Object>) pair.imageSupplier(), (Function<Object, Object>) pair.deltaCreator()
).test(fromRecords);
} }
} }

View File

@ -121,6 +121,24 @@ public class RecordTestUtils {
} }
} }
public static class ImageDeltaPair<I, D> {
private final Supplier<I> imageSupplier;
private final Function<I, D> deltaCreator;
public ImageDeltaPair(Supplier<I> imageSupplier, Function<I, D> deltaCreator) {
this.imageSupplier = imageSupplier;
this.deltaCreator = deltaCreator;
}
public Supplier<I> imageSupplier() {
return imageSupplier;
}
public Function<I, D> deltaCreator() {
return deltaCreator;
}
}
public static class TestThroughAllIntermediateImagesLeadingToFinalImageHelper<D, I> { public static class TestThroughAllIntermediateImagesLeadingToFinalImageHelper<D, I> {
private final Supplier<I> emptyImageSupplier; private final Supplier<I> emptyImageSupplier;
private final Function<I, D> deltaUponImageCreator; private final Function<I, D> deltaUponImageCreator;
@ -187,6 +205,18 @@ public class RecordTestUtils {
} }
} }
} }
/**
* Tests applying records in all variations of batch sizes will result in the same image as applying all records in one batch.
* @param fromRecords The list of records to apply.
*/
public void test(List<ApiMessageAndVersion> fromRecords) {
D finalImageDelta = createDeltaUponImage(getEmptyImage());
RecordTestUtils.replayAll(finalImageDelta, fromRecords);
I finalImage = createImageByApplyingDelta(finalImageDelta);
test(finalImage, fromRecords);
}
} }
/** /**