MINOR: Metadata image test improvements (#15373)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Alyssa Huang 2024-03-28 03:22:02 -07:00 committed by GitHub
parent 4cb6806cb8
commit 4ccbf1634a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 219 additions and 21 deletions

View File

@ -105,7 +105,14 @@ public class ProducerIdControlManagerTest {
.setBrokerEpoch(100)
.setNextProducerId(40));
}, "Producer ID range must only increase");
range = producerIdControlManager.generateNextProducerId(1, 100).response();
assertThrows(RuntimeException.class, () -> {
producerIdControlManager.replay(
new ProducerIdsRecord()
.setBrokerId(2)
.setBrokerEpoch(100)
.setNextProducerId(42));
}, "Producer ID range must only increase");
range = producerIdControlManager.generateNextProducerId(3, 100).response();
assertEquals(42, range.firstProducerId());
// Gaps in the ID range are okay.

View File

@ -30,6 +30,7 @@ import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -51,27 +52,31 @@ public class ClientQuotasImageTest {
static {
Map<ClientQuotaEntity, ClientQuotaImage> entities1 = new HashMap<>();
Map<String, String> fooUser = new HashMap<>();
fooUser.put(ClientQuotaEntity.USER, "foo");
Map<String, Double> fooUserQuotas = new HashMap<>();
fooUserQuotas.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 123.0);
Map<String, String> fooUser = Collections.singletonMap(ClientQuotaEntity.USER, "foo");
Map<String, Double> fooUserQuotas = Collections.singletonMap(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 123.0);
entities1.put(new ClientQuotaEntity(fooUser), new ClientQuotaImage(fooUserQuotas));
Map<String, String> barUserAndIp = new HashMap<>();
barUserAndIp.put(ClientQuotaEntity.USER, "bar");
barUserAndIp.put(ClientQuotaEntity.IP, "127.0.0.1");
Map<String, Double> barUserAndIpQuotas = new HashMap<>();
barUserAndIpQuotas.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 456.0);
entities1.put(new ClientQuotaEntity(barUserAndIp),
new ClientQuotaImage(barUserAndIpQuotas));
Map<String, Double> barUserAndIpQuotas = Collections.singletonMap(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 456.0);
entities1.put(new ClientQuotaEntity(barUserAndIp), new ClientQuotaImage(barUserAndIpQuotas));
IMAGE1 = new ClientQuotasImage(entities1);
DELTA1_RECORDS = new ArrayList<>();
// remove quota
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ClientQuotaRecord().
setEntity(Arrays.asList(
new EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName("bar"),
new EntityData().setEntityType(ClientQuotaEntity.IP).setEntityName("127.0.0.1"))).
setKey(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG).
setRemove(true), CLIENT_QUOTA_RECORD.highestSupportedVersion()));
// alter quota
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ClientQuotaRecord().
setEntity(Arrays.asList(
new EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName("foo"))).
setKey(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG).
setValue(234.0), CLIENT_QUOTA_RECORD.highestSupportedVersion()));
// add quota to entity with existing quota
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ClientQuotaRecord().
setEntity(Arrays.asList(
new EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName("foo"))).
@ -83,7 +88,7 @@ public class ClientQuotasImageTest {
Map<ClientQuotaEntity, ClientQuotaImage> entities2 = new HashMap<>();
Map<String, Double> fooUserQuotas2 = new HashMap<>();
fooUserQuotas2.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 123.0);
fooUserQuotas2.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 234.0);
fooUserQuotas2.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 999.0);
entities2.put(new ClientQuotaEntity(fooUser), new ClientQuotaImage(fooUserQuotas2));
IMAGE2 = new ClientQuotasImage(entities2);

View File

@ -21,6 +21,11 @@ import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeatureCollection;
import org.apache.kafka.common.metadata.RegisterControllerRecord;
import org.apache.kafka.common.metadata.RegisterControllerRecord.ControllerEndpoint;
import org.apache.kafka.common.metadata.RegisterControllerRecord.ControllerEndpointCollection;
@ -48,7 +53,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.metadata.MetadataRecordType.BROKER_REGISTRATION_CHANGE_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.FENCE_BROKER_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.UNFENCE_BROKER_RECORD;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -56,13 +63,19 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40)
public class ClusterImageTest {
public final static ClusterImage IMAGE1;
public static final ClusterImage IMAGE1;
static final List<ApiMessageAndVersion> DELTA1_RECORDS;
final static ClusterDelta DELTA1;
static final ClusterDelta DELTA1;
final static ClusterImage IMAGE2;
static final ClusterImage IMAGE2;
static final List<ApiMessageAndVersion> DELTA2_RECORDS;
static final ClusterDelta DELTA2;
static final ClusterImage IMAGE3;
static {
Map<Integer, BrokerRegistration> map1 = new HashMap<>();
@ -88,7 +101,7 @@ public class ClusterImageTest {
setId(2).
setEpoch(123).
setIncarnationId(Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")).
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))).
setSupportedFeatures(Collections.emptyMap()).
setRack(Optional.of("arack")).
setFenced(false).
@ -104,14 +117,18 @@ public class ClusterImageTest {
IMAGE1 = new ClusterImage(map1, cmap1);
DELTA1_RECORDS = new ArrayList<>();
// unfence b0
DELTA1_RECORDS.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
setId(0).setEpoch(1000), UNFENCE_BROKER_RECORD.highestSupportedVersion()));
// fence b1
DELTA1_RECORDS.add(new ApiMessageAndVersion(new FenceBrokerRecord().
setId(1).setEpoch(1001), FENCE_BROKER_RECORD.highestSupportedVersion()));
// mark b0 in controlled shutdown
DELTA1_RECORDS.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(0).setBrokerEpoch(1000).setInControlledShutdown(
BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
(short) 0));
BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
BROKER_REGISTRATION_CHANGE_RECORD.highestSupportedVersion()));
// unregister b2
DELTA1_RECORDS.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
setBrokerId(2).setBrokerEpoch(123),
(short) 0));
@ -160,6 +177,67 @@ public class ClusterImageTest {
new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 19093))).
setSupportedFeatures(Collections.emptyMap()).build());
IMAGE2 = new ClusterImage(map2, cmap2);
DELTA2_RECORDS = new ArrayList<>(DELTA1_RECORDS);
// fence b0
DELTA2_RECORDS.add(new ApiMessageAndVersion(new FenceBrokerRecord().
setId(0).setEpoch(1000), FENCE_BROKER_RECORD.highestSupportedVersion()));
// unfence b1
DELTA2_RECORDS.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
setId(1).setEpoch(1001), UNFENCE_BROKER_RECORD.highestSupportedVersion()));
// mark b0 as not in controlled shutdown
DELTA2_RECORDS.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(0).setBrokerEpoch(1000).setInControlledShutdown(
BrokerRegistrationInControlledShutdownChange.NONE.value()),
BROKER_REGISTRATION_CHANGE_RECORD.highestSupportedVersion()));
// re-register b2
DELTA2_RECORDS.add(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(2).setIsMigratingZkBroker(true).setIncarnationId(Uuid.fromString("Am5Yse7GQxaw0b2alM74bP")).
setBrokerEpoch(1002).setEndPoints(new BrokerEndpointCollection(
Arrays.asList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
setPort(9094).setSecurityProtocol((short) 0)).iterator())).
setFeatures(new BrokerFeatureCollection(
Collections.singleton(new BrokerFeature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel()).
setMaxSupportedVersion(MetadataVersion.IBP_3_6_IV0.featureLevel())).iterator())).
setRack("rack3"),
REGISTER_BROKER_RECORD.highestSupportedVersion()));
DELTA2 = new ClusterDelta(IMAGE2);
RecordTestUtils.replayAll(DELTA2, DELTA2_RECORDS);
Map<Integer, BrokerRegistration> map3 = new HashMap<>();
map3.put(0, new BrokerRegistration.Builder().
setId(0).
setEpoch(1000).
setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092))).
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))).
setRack(Optional.empty()).
setFenced(true).
setInControlledShutdown(true).build());
map3.put(1, new BrokerRegistration.Builder().
setId(1).
setEpoch(1001).
setIncarnationId(Uuid.fromString("U52uRe20RsGI0RvpcTx33Q")).
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))).
setRack(Optional.empty()).
setFenced(false).
setInControlledShutdown(false).build());
map3.put(2, new BrokerRegistration.Builder().
setId(2).
setEpoch(1002).
setIncarnationId(Uuid.fromString("Am5Yse7GQxaw0b2alM74bP")).
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))).
setSupportedFeatures(Collections.singletonMap("metadata.version",
VersionRange.of(MetadataVersion.IBP_3_3_IV3.featureLevel(), MetadataVersion.IBP_3_6_IV0.featureLevel()))).
setRack(Optional.of("rack3")).
setFenced(true).
setIsMigratingZkBroker(true).build());
IMAGE3 = new ClusterImage(map3, cmap2);
}
@Test
@ -186,6 +264,20 @@ public class ClusterImageTest {
testToImage(IMAGE2);
}
@Test
public void testApplyDelta2() {
assertEquals(IMAGE3, DELTA2.apply());
// check image2 + delta2 = image3, since records for image2 + delta2 might differ from records from image3
List<ApiMessageAndVersion> records = getImageRecords(IMAGE2);
records.addAll(DELTA2_RECORDS);
testToImage(IMAGE3, records);
}
@Test
public void testImage3RoundTrip() {
testToImage(IMAGE3);
}
private static void testToImage(ClusterImage image) {
testToImage(image, Optional.empty());
}

View File

@ -45,23 +45,28 @@ public class FeaturesImageTest {
public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
final static FeaturesDelta DELTA1;
final static FeaturesImage IMAGE2;
final static List<ApiMessageAndVersion> DELTA2_RECORDS;
final static FeaturesDelta DELTA2;
final static FeaturesImage IMAGE3;
static {
Map<String, Short> map1 = new HashMap<>();
map1.put("foo", (short) 2);
map1.put("bar", (short) 1);
map1.put("baz", (short) 8);
IMAGE1 = new FeaturesImage(map1, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
DELTA1_RECORDS = new ArrayList<>();
// change feature level
DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("foo").setFeatureLevel((short) 3),
(short) 0));
// remove feature
DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("bar").setFeatureLevel((short) 0),
(short) 0));
// add feature
DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("baz").setFeatureLevel((short) 0),
setName("baz").setFeatureLevel((short) 8),
(short) 0));
DELTA1 = new FeaturesDelta(IMAGE1);
@ -69,7 +74,27 @@ public class FeaturesImageTest {
Map<String, Short> map2 = new HashMap<>();
map2.put("foo", (short) 3);
map2.put("baz", (short) 8);
IMAGE2 = new FeaturesImage(map2, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
DELTA2_RECORDS = new ArrayList<>();
// remove all features
DELTA2_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("foo").setFeatureLevel((short) 0),
(short) 0));
DELTA2_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("baz").setFeatureLevel((short) 0),
(short) 0));
// add feature back with different feature level
DELTA2_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("bar").setFeatureLevel((short) 1),
(short) 0));
DELTA2 = new FeaturesDelta(IMAGE2);
RecordTestUtils.replayAll(DELTA2, DELTA2_RECORDS);
Map<String, Short> map3 = Collections.singletonMap("bar", (short) 1);
IMAGE3 = new FeaturesImage(map3, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
}
@Test
@ -96,6 +121,20 @@ public class FeaturesImageTest {
testToImage(IMAGE2);
}
@Test
public void testImage3RoundTrip() {
testToImage(IMAGE3);
}
@Test
public void testApplyDelta2() {
assertEquals(IMAGE3, DELTA2.apply());
// check image2 + delta2 = image3, since records for image2 + delta2 might differ from records from image3
List<ApiMessageAndVersion> records = getImageRecords(IMAGE2);
records.addAll(DELTA2_RECORDS);
testToImage(IMAGE3, records);
}
private static void testToImage(FeaturesImage image) {
testToImage(image, Optional.empty());
}

View File

@ -128,6 +128,39 @@ public class ImageDowngradeTest {
TEST_RECORDS.get(1)));
}
/**
* Test downgrading to a MetadataVersion that doesn't support ZK migration.
*/
@Test
public void testPreZkMigrationSupportVersion() {
writeWithExpectedLosses(MetadataVersion.IBP_3_3_IV3,
Arrays.asList(
"the isMigratingZkBroker state of one or more brokers"),
Arrays.asList(
metadataVersionRecord(MetadataVersion.IBP_3_4_IV0),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(123).
setIncarnationId(Uuid.fromString("XgjKo16hRWeWrTui0iR5Nw")).
setBrokerEpoch(456).
setRack(null).
setFenced(false).
setInControlledShutdown(true).
setIsMigratingZkBroker(true), (short) 2),
TEST_RECORDS.get(0),
TEST_RECORDS.get(1)),
Arrays.asList(
metadataVersionRecord(MetadataVersion.IBP_3_3_IV3),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(123).
setIncarnationId(Uuid.fromString("XgjKo16hRWeWrTui0iR5Nw")).
setBrokerEpoch(456).
setRack(null).
setFenced(false).
setInControlledShutdown(true), (short) 1),
TEST_RECORDS.get(0),
TEST_RECORDS.get(1)));
}
@Test
void testDirectoryAssignmentState() {
MetadataVersion outputMetadataVersion = MetadataVersion.IBP_3_7_IV0;

View File

@ -53,12 +53,20 @@ public class ProducerIdsImageTest {
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord().
setBrokerId(3).
setBrokerEpoch(100).
setNextProducerId(789), (short) 0));
setNextProducerId(780), (short) 0));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord().
setBrokerId(3).
setBrokerEpoch(100).
setNextProducerId(785), (short) 0));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord().
setBrokerId(2).
setBrokerEpoch(100).
setNextProducerId(800), (short) 0));
DELTA1 = new ProducerIdsDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
IMAGE2 = new ProducerIdsImage(789);
IMAGE2 = new ProducerIdsImage(800);
}
@Test

View File

@ -85,10 +85,15 @@ public class ScramImageTest {
IMAGE1 = new ScramImage(image1mechanisms);
DELTA1_RECORDS = new ArrayList<>();
// remove all sha512 credentials
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveUserScramCredentialRecord().
setName("alpha").
setMechanism(SCRAM_SHA_512.type()), (short) 0));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveUserScramCredentialRecord().
setName("gamma").
setMechanism(SCRAM_SHA_512.type()), (short) 0));
ScramCredentialData secondAlpha256Credential = randomScramCredentialData(random);
// add sha256 credential
DELTA1_RECORDS.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
setName("alpha").
setMechanism(SCRAM_SHA_256.type()).
@ -96,6 +101,15 @@ public class ScramImageTest {
setStoredKey(secondAlpha256Credential.storedKey()).
setServerKey(secondAlpha256Credential.serverKey()).
setIterations(secondAlpha256Credential.iterations()), (short) 0));
// add sha512 credential re-using name
ScramCredentialData secondAlpha512Credential = randomScramCredentialData(random);
DELTA1_RECORDS.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
setName("alpha").
setMechanism(SCRAM_SHA_512.type()).
setSalt(secondAlpha512Credential.salt()).
setStoredKey(secondAlpha512Credential.storedKey()).
setServerKey(secondAlpha512Credential.serverKey()).
setIterations(secondAlpha512Credential.iterations()), (short) 0));
DELTA1 = new ScramDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
@ -107,7 +121,7 @@ public class ScramImageTest {
image2mechanisms.put(SCRAM_SHA_256, image2sha256);
Map<String, ScramCredentialData> image2sha512 = new HashMap<>();
image2sha512.put("alpha", image1sha512.get("alpha"));
image2sha512.put("alpha", secondAlpha512Credential);
image2mechanisms.put(SCRAM_SHA_512, image2sha512);
IMAGE2 = new ScramImage(image2mechanisms);