mirror of https://github.com/apache/kafka.git
KAFKA-16842; Fix config validation and support unknown voters (#16892)
This change fixes the Kafka configuration validation to take into account the reconfiguration changes to configuration and allows KRaft observers to start with an unknown set of voters. For the Kafka configuration validation the high-level change is that now the user only needs to specify either the controller.quorum.bootstrap.servers property or the controller.quorum.voters property. The other notable change in the configuration is that controller listeners can now be (and should be) specified in advertise.listeners property. Because Kafka can now be configured without any voters and just the bootstrap servers. The KRaft client needs to allow for an unknown set of voters during the initial startup. This is done by adding the VoterSet#empty set of voters to the KRaftControlRecordStateMachine. Lastly the RaftClientTestContext type is updated to support this new configuration for KRaft and a test is added to verify that observers can start and send Fetch requests when the voters are unknown. Reviewers: David Arthur <mumrah@gmail.com>
This commit is contained in:
parent
9f330c374d
commit
20c3e7324b
|
@ -1717,6 +1717,7 @@ project(':raft') {
|
|||
testImplementation libs.junitJupiter
|
||||
testImplementation libs.mockitoCore
|
||||
testImplementation libs.jqwik
|
||||
testImplementation libs.hamcrest
|
||||
|
||||
testRuntimeOnly libs.slf4jReload4j
|
||||
testRuntimeOnly libs.junitPlatformLanucher
|
||||
|
|
|
@ -890,30 +890,34 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
|
||||
// validate KRaft-related configs
|
||||
val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
|
||||
def validateNonEmptyQuorumVotersForKRaft(): Unit = {
|
||||
if (voterIds.isEmpty) {
|
||||
throw new ConfigException(s"If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.")
|
||||
def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = {
|
||||
if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
|
||||
throw new ConfigException(
|
||||
s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
|
||||
|contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
|
||||
|set of controllers.""".stripMargin.replace("\n", " ")
|
||||
)
|
||||
}
|
||||
}
|
||||
def validateNonEmptyQuorumVotersForMigration(): Unit = {
|
||||
if (voterIds.isEmpty) {
|
||||
throw new ConfigException(s"If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.")
|
||||
def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = {
|
||||
if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
|
||||
throw new ConfigException(
|
||||
s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
|
||||
|contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
|
||||
|set of controllers.""".stripMargin.replace("\n", " ")
|
||||
)
|
||||
}
|
||||
}
|
||||
def validateControlPlaneListenerEmptyForKRaft(): Unit = {
|
||||
require(controlPlaneListenerName.isEmpty,
|
||||
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.")
|
||||
}
|
||||
def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = {
|
||||
require(advertisedBrokerListenerNames.forall(aln => !controllerListenerNames.contains(aln.value())),
|
||||
s"The advertised.listeners config must not contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.")
|
||||
}
|
||||
def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
|
||||
require(voterIds.contains(nodeId),
|
||||
require(voterIds.isEmpty || voterIds.contains(nodeId),
|
||||
s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
|
||||
}
|
||||
def validateControllerListenerExistsForKRaftController(): Unit = {
|
||||
require(controllerListeners.nonEmpty,
|
||||
def validateAdvertisedControllerListenersNonEmptyForKRaftController(): Unit = {
|
||||
require(effectiveAdvertisedControllerListeners.nonEmpty,
|
||||
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at least one value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role")
|
||||
}
|
||||
def validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit = {
|
||||
|
@ -921,16 +925,15 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)),
|
||||
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role")
|
||||
}
|
||||
def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
|
||||
def validateAdvertisedBrokerListenersNonEmptyForBroker(): Unit = {
|
||||
require(advertisedBrokerListenerNames.nonEmpty,
|
||||
"There must be at least one advertised listener." + (
|
||||
"There must be at least one broker advertised listener." + (
|
||||
if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else ""))
|
||||
}
|
||||
if (processRoles == Set(ProcessRole.BrokerRole)) {
|
||||
// KRaft broker-only
|
||||
validateNonEmptyQuorumVotersForKRaft()
|
||||
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
|
||||
validateControlPlaneListenerEmptyForKRaft()
|
||||
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
|
||||
// nodeId must not appear in controller.quorum.voters
|
||||
require(!voterIds.contains(nodeId),
|
||||
s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
|
||||
|
@ -952,10 +955,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
if (controllerListenerNames.size > 1) {
|
||||
warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple entries; only the first will be used since ${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames.asJava}")
|
||||
}
|
||||
validateAdvertisedListenersNonEmptyForBroker()
|
||||
} else if (processRoles == Set(ProcessRole.ControllerRole)) {
|
||||
// KRaft controller-only
|
||||
validateNonEmptyQuorumVotersForKRaft()
|
||||
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
|
||||
validateControlPlaneListenerEmptyForKRaft()
|
||||
// listeners should only contain listeners also enumerated in the controller listener
|
||||
require(
|
||||
|
@ -963,21 +965,19 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller"
|
||||
)
|
||||
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
|
||||
validateControllerListenerExistsForKRaftController()
|
||||
validateAdvertisedControllerListenersNonEmptyForKRaftController()
|
||||
validateControllerListenerNamesMustAppearInListenersForKRaftController()
|
||||
} else if (isKRaftCombinedMode) {
|
||||
// KRaft combined broker and controller
|
||||
validateNonEmptyQuorumVotersForKRaft()
|
||||
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
|
||||
validateControlPlaneListenerEmptyForKRaft()
|
||||
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
|
||||
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
|
||||
validateControllerListenerExistsForKRaftController()
|
||||
validateAdvertisedControllerListenersNonEmptyForKRaftController()
|
||||
validateControllerListenerNamesMustAppearInListenersForKRaftController()
|
||||
validateAdvertisedListenersNonEmptyForBroker()
|
||||
} else {
|
||||
// ZK-based
|
||||
if (migrationEnabled) {
|
||||
validateNonEmptyQuorumVotersForMigration()
|
||||
validateQuorumVotersAndQuorumBootstrapServerForMigration()
|
||||
require(controllerListenerNames.nonEmpty,
|
||||
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
|
||||
require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
|
||||
|
@ -992,13 +992,12 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
require(controllerListenerNames.isEmpty,
|
||||
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
|
||||
}
|
||||
validateAdvertisedListenersNonEmptyForBroker()
|
||||
}
|
||||
|
||||
val listenerNames = listeners.map(_.listenerName).toSet
|
||||
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
|
||||
// validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
|
||||
validateAdvertisedListenersNonEmptyForBroker()
|
||||
validateAdvertisedBrokerListenersNonEmptyForBroker()
|
||||
require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
|
||||
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
|
||||
s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
|
||||
|
|
|
@ -152,8 +152,12 @@ class KafkaConfigTest {
|
|||
propertiesFile.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
|
||||
propertiesFile.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "")
|
||||
setListenerProps(propertiesFile)
|
||||
assertBadConfigContainingMessage(propertiesFile,
|
||||
"If using process.roles, controller.quorum.voters must contain a parseable set of voters.")
|
||||
assertBadConfigContainingMessage(
|
||||
propertiesFile,
|
||||
"""If using process.roles, either controller.quorum.bootstrap.servers
|
||||
|must contain the set of bootstrap controllers or controller.quorum.voters must contain a
|
||||
|parseable set of controllers.""".stripMargin.replace("\n", " ")
|
||||
)
|
||||
|
||||
// Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined
|
||||
propertiesFile.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "")
|
||||
|
|
|
@ -446,7 +446,10 @@ class KafkaConfigTest {
|
|||
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
|
||||
|
||||
assertFalse(isValidKafkaConfig(props))
|
||||
assertBadConfigContainingMessage(props, "There must be at least one advertised listener. Perhaps all listeners appear in controller.listener.names?")
|
||||
assertBadConfigContainingMessage(
|
||||
props,
|
||||
"There must be at least one broker advertised listener. Perhaps all listeners appear in controller.listener.names?"
|
||||
)
|
||||
|
||||
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
|
||||
KafkaConfig.fromProps(props)
|
||||
|
@ -1832,8 +1835,11 @@ class KafkaConfigTest {
|
|||
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
|
||||
props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
|
||||
assertEquals(
|
||||
"If using zookeeper.metadata.migration.enable, controller.quorum.voters must contain a parseable set of voters.",
|
||||
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage)
|
||||
"""If using zookeeper.metadata.migration.enable, either controller.quorum.bootstrap.servers
|
||||
|must contain the set of bootstrap controllers or controller.quorum.voters must contain a
|
||||
|parseable set of controllers.""".stripMargin.replace("\n", " "),
|
||||
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage
|
||||
)
|
||||
|
||||
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9093")
|
||||
assertEquals(
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.kafka.common.Node;
|
|||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.compress.Compression;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.errors.ClusterAuthorizationException;
|
||||
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
|
||||
import org.apache.kafka.common.feature.SupportedVersionRange;
|
||||
|
@ -455,9 +456,9 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
QuorumStateStore quorumStateStore,
|
||||
Metrics metrics
|
||||
) {
|
||||
Optional<VoterSet> staticVoters = voterAddresses.isEmpty() ?
|
||||
Optional.empty() :
|
||||
Optional.of(VoterSet.fromInetSocketAddresses(channel.listenerName(), voterAddresses));
|
||||
VoterSet staticVoters = voterAddresses.isEmpty() ?
|
||||
VoterSet.empty() :
|
||||
VoterSet.fromInetSocketAddresses(channel.listenerName(), voterAddresses);
|
||||
|
||||
partitionState = new KRaftControlRecordStateMachine(
|
||||
staticVoters,
|
||||
|
@ -470,8 +471,18 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
// Read the entire log
|
||||
logger.info("Reading KRaft snapshot and log as part of the initialization");
|
||||
partitionState.updateState();
|
||||
logger.info("Starting voters are {}", partitionState.lastVoterSet());
|
||||
|
||||
if (requestManager == null) {
|
||||
if (voterAddresses.isEmpty()) {
|
||||
throw new ConfigException(
|
||||
String.format(
|
||||
"Missing kraft bootstrap servers. Must specify a value for %s.",
|
||||
QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// The request manager wasn't created using the bootstrap servers
|
||||
// create it using the voters static configuration
|
||||
List<Node> bootstrapNodes = voterAddresses
|
||||
|
|
|
@ -48,11 +48,7 @@ import java.util.stream.Stream;
|
|||
public final class VoterSet {
|
||||
private final Map<Integer, VoterNode> voters;
|
||||
|
||||
public VoterSet(Map<Integer, VoterNode> voters) {
|
||||
if (voters.isEmpty()) {
|
||||
throw new IllegalArgumentException("Voters cannot be empty");
|
||||
}
|
||||
|
||||
private VoterSet(Map<Integer, VoterNode> voters) {
|
||||
this.voters = voters;
|
||||
}
|
||||
|
||||
|
@ -409,6 +405,11 @@ public final class VoterSet {
|
|||
}
|
||||
}
|
||||
|
||||
private static final VoterSet EMPTY = new VoterSet(Collections.emptyMap());
|
||||
public static VoterSet empty() {
|
||||
return EMPTY;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a {@code VotersRecord} to a {@code VoterSet}.
|
||||
*
|
||||
|
|
|
@ -82,7 +82,7 @@ public final class KRaftControlRecordStateMachine {
|
|||
* @param logContext the log context
|
||||
*/
|
||||
public KRaftControlRecordStateMachine(
|
||||
Optional<VoterSet> staticVoterSet,
|
||||
VoterSet staticVoterSet,
|
||||
ReplicatedLog log,
|
||||
RecordSerde<?> serde,
|
||||
BufferSupplier bufferSupplier,
|
||||
|
@ -280,19 +280,25 @@ public final class KRaftControlRecordStateMachine {
|
|||
long currentOffset = overrideOffset.orElse(batch.baseOffset() + offsetDelta);
|
||||
switch (record.type()) {
|
||||
case KRAFT_VOTERS:
|
||||
VoterSet voters = VoterSet.fromVotersRecord((VotersRecord) record.message());
|
||||
logger.info("Latest set of voters is {} at offset {}", voters, currentOffset);
|
||||
synchronized (voterSetHistory) {
|
||||
voterSetHistory.addAt(currentOffset, VoterSet.fromVotersRecord((VotersRecord) record.message()));
|
||||
voterSetHistory.addAt(currentOffset, voters);
|
||||
}
|
||||
break;
|
||||
|
||||
case KRAFT_VERSION:
|
||||
synchronized (kraftVersionHistory) {
|
||||
kraftVersionHistory.addAt(
|
||||
currentOffset,
|
||||
KRaftVersion.fromFeatureLevel(
|
||||
KRaftVersion kraftVersion = KRaftVersion.fromFeatureLevel(
|
||||
((KRaftVersionRecord) record.message()).kRaftVersion()
|
||||
)
|
||||
);
|
||||
logger.info(
|
||||
"Latest {} is {} at offset {}",
|
||||
KRaftVersion.FEATURE_NAME,
|
||||
kraftVersion,
|
||||
currentOffset
|
||||
);
|
||||
synchronized (kraftVersionHistory) {
|
||||
kraftVersionHistory.addAt(currentOffset, kraftVersion);
|
||||
}
|
||||
break;
|
||||
|
||||
|
|
|
@ -29,10 +29,10 @@ import java.util.OptionalLong;
|
|||
* evaluating the latest set of voters.
|
||||
*/
|
||||
public final class VoterSetHistory {
|
||||
private final Optional<VoterSet> staticVoterSet;
|
||||
private final VoterSet staticVoterSet;
|
||||
private final LogHistory<VoterSet> votersHistory = new TreeMapLogHistory<>();
|
||||
|
||||
VoterSetHistory(Optional<VoterSet> staticVoterSet) {
|
||||
VoterSetHistory(VoterSet staticVoterSet) {
|
||||
this.staticVoterSet = staticVoterSet;
|
||||
}
|
||||
|
||||
|
@ -85,13 +85,9 @@ public final class VoterSetHistory {
|
|||
* Returns the latest set of voters.
|
||||
*/
|
||||
public VoterSet lastValue() {
|
||||
Optional<LogHistory.Entry<VoterSet>> result = votersHistory.lastEntry();
|
||||
if (result.isPresent()) {
|
||||
return result.get().value();
|
||||
}
|
||||
|
||||
return staticVoterSet
|
||||
.orElseThrow(() -> new IllegalStateException("No voter set found"));
|
||||
return votersHistory.lastEntry()
|
||||
.map(LogHistory.Entry::value)
|
||||
.orElse(staticVoterSet);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -179,7 +179,6 @@ public class KafkaRaftClientReconfigTest {
|
|||
|
||||
RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
|
||||
.withStaticVoters(voters)
|
||||
.withBootstrapSnapshot(Optional.empty())
|
||||
.withUnknownLeader(0)
|
||||
.build();
|
||||
|
||||
|
@ -201,19 +200,6 @@ public class KafkaRaftClientReconfigTest {
|
|||
)
|
||||
);
|
||||
|
||||
// check the bootstrap snapshot exists but is empty
|
||||
assertEquals(BOOTSTRAP_SNAPSHOT_ID, context.log.latestSnapshotId().get());
|
||||
try (SnapshotReader<?> reader = RecordsSnapshotReader.of(
|
||||
context.log.latestSnapshot().get(),
|
||||
context.serde,
|
||||
BufferSupplier.NO_CACHING,
|
||||
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
|
||||
false
|
||||
)
|
||||
) {
|
||||
SnapshotWriterReaderTest.assertControlSnapshot(expectedBootstrapRecords, reader);
|
||||
}
|
||||
|
||||
// check leader does not write bootstrap records to log
|
||||
context.becomeLeader();
|
||||
|
||||
|
@ -2245,6 +2231,25 @@ public class KafkaRaftClientReconfigTest {
|
|||
assertEquals(voter2.id(), fetchRequest.destination().id());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testObserverDiscoversLeaderWithUnknownVoters() throws Exception {
|
||||
ReplicaKey local = replicaKey(randomeReplicaId(), true);
|
||||
InetSocketAddress bootstrapAdddress = InetSocketAddress.createUnresolved("localhost", 1234);
|
||||
int epoch = 3;
|
||||
|
||||
RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
|
||||
.withKip853Rpc(true)
|
||||
.withBootstrapSnapshot(Optional.empty())
|
||||
.withUnknownLeader(epoch)
|
||||
.withBootstrapServers(Optional.of(Collections.singletonList(bootstrapAdddress)))
|
||||
.build();
|
||||
|
||||
context.pollUntilRequest();
|
||||
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
|
||||
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
|
||||
assertEquals(-2, fetchRequest.destination().id());
|
||||
}
|
||||
|
||||
private static void verifyVotersRecord(
|
||||
VoterSet expectedVoterSet,
|
||||
ByteBuffer recordKey,
|
||||
|
|
|
@ -69,6 +69,9 @@ import java.util.stream.Stream;
|
|||
import static java.util.Collections.singletonList;
|
||||
import static org.apache.kafka.raft.RaftClientTestContext.Builder.DEFAULT_ELECTION_TIMEOUT_MS;
|
||||
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.in;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
|
@ -793,7 +796,7 @@ public class KafkaRaftClientTest {
|
|||
context.pollUntilRequest();
|
||||
|
||||
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
|
||||
assertTrue(voters.contains(fetchRequest.destination().id()));
|
||||
assertThat(fetchRequest.destination().id(), is(in(voters)));
|
||||
context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
|
||||
|
||||
context.deliverResponse(
|
||||
|
@ -1784,7 +1787,7 @@ public class KafkaRaftClientTest {
|
|||
|
||||
context.pollUntilRequest();
|
||||
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
|
||||
assertTrue(voters.contains(fetchRequest.destination().id()));
|
||||
assertThat(fetchRequest.destination().id(), is(in(voters)));
|
||||
context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
|
||||
|
||||
context.deliverResponse(
|
||||
|
@ -1810,7 +1813,7 @@ public class KafkaRaftClientTest {
|
|||
.collect(Collectors.toList());
|
||||
|
||||
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
|
||||
.withBootstrapServers(bootstrapServers)
|
||||
.withBootstrapServers(Optional.of(bootstrapServers))
|
||||
.withKip853Rpc(withKip853Rpc)
|
||||
.build();
|
||||
|
||||
|
@ -1857,7 +1860,7 @@ public class KafkaRaftClientTest {
|
|||
.collect(Collectors.toList());
|
||||
|
||||
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
|
||||
.withBootstrapServers(bootstrapServers)
|
||||
.withBootstrapServers(Optional.of(bootstrapServers))
|
||||
.withKip853Rpc(withKip853Rpc)
|
||||
.build();
|
||||
|
||||
|
@ -1900,7 +1903,7 @@ public class KafkaRaftClientTest {
|
|||
.collect(Collectors.toList());
|
||||
|
||||
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
|
||||
.withBootstrapServers(bootstrapServers)
|
||||
.withBootstrapServers(Optional.of(bootstrapServers))
|
||||
.withKip853Rpc(withKip853Rpc)
|
||||
.build();
|
||||
|
||||
|
@ -1974,7 +1977,7 @@ public class KafkaRaftClientTest {
|
|||
.collect(Collectors.toList());
|
||||
|
||||
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
|
||||
.withBootstrapServers(bootstrapServers)
|
||||
.withBootstrapServers(Optional.of(bootstrapServers))
|
||||
.withKip853Rpc(withKip853Rpc)
|
||||
.build();
|
||||
|
||||
|
@ -2631,7 +2634,7 @@ public class KafkaRaftClientTest {
|
|||
.collect(Collectors.toList());
|
||||
|
||||
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
|
||||
.withBootstrapServers(bootstrapServers)
|
||||
.withBootstrapServers(Optional.of(bootstrapServers))
|
||||
.withKip853Rpc(withKip853Rpc)
|
||||
.build();
|
||||
|
||||
|
@ -2682,7 +2685,7 @@ public class KafkaRaftClientTest {
|
|||
.collect(Collectors.toList());
|
||||
|
||||
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
|
||||
.withBootstrapServers(bootstrapServers)
|
||||
.withBootstrapServers(Optional.of(bootstrapServers))
|
||||
.withKip853Rpc(withKip853Rpc)
|
||||
.build();
|
||||
|
||||
|
@ -4208,7 +4211,7 @@ public class KafkaRaftClientTest {
|
|||
.collect(Collectors.toList());
|
||||
|
||||
RaftClientTestContext context = new RaftClientTestContext.Builder(OptionalInt.empty(), voters)
|
||||
.withBootstrapServers(bootstrapServers)
|
||||
.withBootstrapServers(Optional.of(bootstrapServers))
|
||||
.withKip853Rpc(withKip853Rpc)
|
||||
.build();
|
||||
|
||||
|
|
|
@ -168,9 +168,9 @@ public final class RaftClientTestContext {
|
|||
private int electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS;
|
||||
private int appendLingerMs = DEFAULT_APPEND_LINGER_MS;
|
||||
private MemoryPool memoryPool = MemoryPool.NONE;
|
||||
private List<InetSocketAddress> bootstrapServers = Collections.emptyList();
|
||||
private Optional<List<InetSocketAddress>> bootstrapServers = Optional.empty();
|
||||
private boolean kip853Rpc = false;
|
||||
private Optional<VoterSet> startingVoters = Optional.empty();
|
||||
private VoterSet startingVoters = VoterSet.empty();
|
||||
private Endpoints localListeners = Endpoints.empty();
|
||||
private boolean isStartingVotersStatic = false;
|
||||
|
||||
|
@ -193,15 +193,7 @@ public final class RaftClientTestContext {
|
|||
this.localDirectoryId = localDirectoryId;
|
||||
}
|
||||
|
||||
private static IllegalStateException missingStartingVoterException() {
|
||||
return new IllegalStateException(
|
||||
"The starting voter set must be set with withStaticVoters or withBootstrapSnapshot"
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Builder withElectedLeader(int epoch, int leaderId) {
|
||||
VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException);
|
||||
quorumStateStore.writeElectionState(
|
||||
ElectionState.withElectedLeader(epoch, leaderId, startingVoters.voterIds()),
|
||||
kraftVersion
|
||||
|
@ -210,7 +202,6 @@ public final class RaftClientTestContext {
|
|||
}
|
||||
|
||||
Builder withUnknownLeader(int epoch) {
|
||||
VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException);
|
||||
quorumStateStore.writeElectionState(
|
||||
ElectionState.withUnknownLeader(epoch, startingVoters.voterIds()),
|
||||
kraftVersion
|
||||
|
@ -219,7 +210,6 @@ public final class RaftClientTestContext {
|
|||
}
|
||||
|
||||
Builder withVotedCandidate(int epoch, ReplicaKey votedKey) {
|
||||
VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException);
|
||||
quorumStateStore.writeElectionState(
|
||||
ElectionState.withVotedCandidate(epoch, votedKey, startingVoters.voterIds()),
|
||||
kraftVersion
|
||||
|
@ -293,7 +283,7 @@ public final class RaftClientTestContext {
|
|||
return this;
|
||||
}
|
||||
|
||||
Builder withBootstrapServers(List<InetSocketAddress> bootstrapServers) {
|
||||
Builder withBootstrapServers(Optional<List<InetSocketAddress>> bootstrapServers) {
|
||||
this.bootstrapServers = bootstrapServers;
|
||||
return this;
|
||||
}
|
||||
|
@ -319,19 +309,20 @@ public final class RaftClientTestContext {
|
|||
}
|
||||
|
||||
Builder withStaticVoters(VoterSet staticVoters) {
|
||||
this.startingVoters = Optional.of(staticVoters);
|
||||
this.isStartingVotersStatic = true;
|
||||
startingVoters = staticVoters;
|
||||
isStartingVotersStatic = true;
|
||||
kraftVersion = KRaftVersion.KRAFT_VERSION_0;
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
Builder withBootstrapSnapshot(Optional<VoterSet> voters) {
|
||||
startingVoters = voters.orElse(VoterSet.empty());
|
||||
isStartingVotersStatic = false;
|
||||
|
||||
if (voters.isPresent()) {
|
||||
kraftVersion = KRaftVersion.KRAFT_VERSION_1;
|
||||
|
||||
startingVoters = voters;
|
||||
isStartingVotersStatic = false;
|
||||
|
||||
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
|
||||
.setRawSnapshotWriter(
|
||||
log.createNewSnapshotUnchecked(Snapshots.BOOTSTRAP_SNAPSHOT_ID).get()
|
||||
|
@ -357,8 +348,6 @@ public final class RaftClientTestContext {
|
|||
}
|
||||
|
||||
public RaftClientTestContext build() throws IOException {
|
||||
VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException);
|
||||
|
||||
Metrics metrics = new Metrics(time);
|
||||
MockNetworkChannel channel = new MockNetworkChannel();
|
||||
MockListener listener = new MockListener(localId);
|
||||
|
@ -395,6 +384,18 @@ public final class RaftClientTestContext {
|
|||
appendLingerMs
|
||||
);
|
||||
|
||||
List<InetSocketAddress> computedBootstrapServers = bootstrapServers.orElseGet(() -> {
|
||||
if (isStartingVotersStatic) {
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
return startingVoters
|
||||
.voterNodes(startingVoters.voterIds().stream(), channel.listenerName())
|
||||
.stream()
|
||||
.map(node -> InetSocketAddress.createUnresolved(node.host(), node.port()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
});
|
||||
|
||||
KafkaRaftClient<String> client = new KafkaRaftClient<>(
|
||||
localId,
|
||||
localDirectoryId,
|
||||
|
@ -407,7 +408,7 @@ public final class RaftClientTestContext {
|
|||
new MockExpirationService(time),
|
||||
FETCH_MAX_WAIT_MS,
|
||||
clusterId,
|
||||
bootstrapServers,
|
||||
computedBootstrapServers,
|
||||
localListeners,
|
||||
Features.KRAFT_VERSION.supportedVersionRange(),
|
||||
logContext,
|
||||
|
@ -436,7 +437,7 @@ public final class RaftClientTestContext {
|
|||
startingVoters,
|
||||
IntStream
|
||||
.iterate(-2, id -> id - 1)
|
||||
.limit(bootstrapServers.size())
|
||||
.limit(bootstrapServers.map(List::size).orElse(0))
|
||||
.boxed()
|
||||
.collect(Collectors.toSet()),
|
||||
kip853Rpc,
|
||||
|
|
|
@ -48,12 +48,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
public final class VoterSetTest {
|
||||
@Test
|
||||
void testEmptyVoterSet() {
|
||||
assertThrows(IllegalArgumentException.class, () -> new VoterSet(Collections.emptyMap()));
|
||||
assertEquals(VoterSet.empty(), VoterSet.fromMap(Collections.emptyMap()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testVoterNode() {
|
||||
VoterSet voterSet = new VoterSet(voterMap(IntStream.of(1, 2, 3), true));
|
||||
VoterSet voterSet = VoterSet.fromMap(voterMap(IntStream.of(1, 2, 3), true));
|
||||
assertEquals(
|
||||
Optional.of(new Node(1, "localhost", 9991)),
|
||||
voterSet.voterNode(1, DEFAULT_LISTENER_NAME)
|
||||
|
@ -64,7 +64,7 @@ public final class VoterSetTest {
|
|||
|
||||
@Test
|
||||
void testVoterNodes() {
|
||||
VoterSet voterSet = new VoterSet(voterMap(IntStream.of(1, 2, 3), true));
|
||||
VoterSet voterSet = VoterSet.fromMap(voterMap(IntStream.of(1, 2, 3), true));
|
||||
|
||||
assertEquals(
|
||||
Utils.mkSet(new Node(1, "localhost", 9991), new Node(2, "localhost", 9992)),
|
||||
|
@ -84,33 +84,33 @@ public final class VoterSetTest {
|
|||
|
||||
@Test
|
||||
void testVoterIds() {
|
||||
VoterSet voterSet = new VoterSet(voterMap(IntStream.of(1, 2, 3), true));
|
||||
VoterSet voterSet = VoterSet.fromMap(voterMap(IntStream.of(1, 2, 3), true));
|
||||
assertEquals(new HashSet<>(Arrays.asList(1, 2, 3)), voterSet.voterIds());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAddVoter() {
|
||||
Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
|
||||
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
|
||||
|
||||
assertEquals(Optional.empty(), voterSet.addVoter(voterNode(1, true)));
|
||||
|
||||
VoterSet.VoterNode voter4 = voterNode(4, true);
|
||||
aVoterMap.put(voter4.voterKey().id(), voter4);
|
||||
assertEquals(Optional.of(new VoterSet(new HashMap<>(aVoterMap))), voterSet.addVoter(voter4));
|
||||
assertEquals(Optional.of(VoterSet.fromMap(new HashMap<>(aVoterMap))), voterSet.addVoter(voter4));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRemoveVoter() {
|
||||
Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
|
||||
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
|
||||
|
||||
assertEquals(Optional.empty(), voterSet.removeVoter(ReplicaKey.of(4, ReplicaKey.NO_DIRECTORY_ID)));
|
||||
assertEquals(Optional.empty(), voterSet.removeVoter(ReplicaKey.of(4, Uuid.randomUuid())));
|
||||
|
||||
VoterSet.VoterNode voter3 = aVoterMap.remove(3);
|
||||
assertEquals(
|
||||
Optional.of(new VoterSet(new HashMap<>(aVoterMap))),
|
||||
Optional.of(VoterSet.fromMap(new HashMap<>(aVoterMap))),
|
||||
voterSet.removeVoter(voter3.voterKey())
|
||||
);
|
||||
}
|
||||
|
@ -118,7 +118,7 @@ public final class VoterSetTest {
|
|||
@Test
|
||||
void testUpdateVoter() {
|
||||
Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
|
||||
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
|
||||
|
||||
assertEquals(Optional.empty(), voterSet.updateVoter(voterNode(4, true)));
|
||||
assertFalse(voterSet.voterNodeNeedsUpdate(voterNode(4, true)));
|
||||
|
@ -140,7 +140,7 @@ public final class VoterSetTest {
|
|||
|
||||
assertTrue(voterSet.voterNodeNeedsUpdate(newVoter3));
|
||||
assertEquals(
|
||||
Optional.of(new VoterSet(new HashMap<>(aVoterMap))),
|
||||
Optional.of(VoterSet.fromMap(new HashMap<>(aVoterMap))),
|
||||
voterSet.updateVoter(newVoter3)
|
||||
);
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ public final class VoterSetTest {
|
|||
@Test
|
||||
void testCannotRemoveToEmptyVoterSet() {
|
||||
Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1), true);
|
||||
VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
|
||||
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
|
||||
|
||||
ReplicaKey voter1 = aVoterMap.get(1).voterKey();
|
||||
assertTrue(voterSet.isVoter(voter1));
|
||||
|
@ -159,7 +159,7 @@ public final class VoterSetTest {
|
|||
@Test
|
||||
void testIsVoterWithDirectoryId() {
|
||||
Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
|
||||
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
|
||||
|
||||
assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey()));
|
||||
assertFalse(voterSet.isVoter(ReplicaKey.of(1, Uuid.randomUuid())));
|
||||
|
@ -176,7 +176,7 @@ public final class VoterSetTest {
|
|||
@Test
|
||||
void testIsVoterWithoutDirectoryId() {
|
||||
Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1, 2, 3), false);
|
||||
VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
|
||||
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
|
||||
|
||||
assertTrue(voterSet.isVoter(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID)));
|
||||
assertTrue(voterSet.isVoter(ReplicaKey.of(1, Uuid.randomUuid())));
|
||||
|
@ -212,7 +212,7 @@ public final class VoterSetTest {
|
|||
@ValueSource(booleans = { true, false })
|
||||
void testEndpoints(boolean withDirectoryId) {
|
||||
Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1, 2, 3), withDirectoryId);
|
||||
VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
|
||||
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
|
||||
|
||||
assertNotEquals(Endpoints.empty(), voterSet.listeners(1));
|
||||
assertNotEquals(Endpoints.empty(), voterSet.listeners(2));
|
||||
|
@ -223,7 +223,7 @@ public final class VoterSetTest {
|
|||
@Test
|
||||
void testIsOnlyVoterInStandalone() {
|
||||
Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1), true);
|
||||
VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
|
||||
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
|
||||
|
||||
assertTrue(voterSet.isOnlyVoter(aVoterMap.get(1).voterKey()));
|
||||
assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Uuid.randomUuid())));
|
||||
|
@ -237,7 +237,7 @@ public final class VoterSetTest {
|
|||
@Test
|
||||
void testIsOnlyVoterInNotStandalone() {
|
||||
Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1, 2), true);
|
||||
VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
|
||||
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
|
||||
|
||||
assertFalse(voterSet.isOnlyVoter(aVoterMap.get(1).voterKey()));
|
||||
assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Uuid.randomUuid())));
|
||||
|
@ -253,7 +253,7 @@ public final class VoterSetTest {
|
|||
|
||||
@Test
|
||||
void testRecordRoundTrip() {
|
||||
VoterSet voterSet = new VoterSet(voterMap(IntStream.of(1, 2, 3), true));
|
||||
VoterSet voterSet = VoterSet.fromMap(voterMap(IntStream.of(1, 2, 3), true));
|
||||
|
||||
assertEquals(voterSet, VoterSet.fromVotersRecord(voterSet.toVotersRecord((short) 0)));
|
||||
}
|
||||
|
@ -375,7 +375,7 @@ public final class VoterSetTest {
|
|||
}
|
||||
|
||||
public static VoterSet voterSet(Map<Integer, VoterSet.VoterNode> voters) {
|
||||
return new VoterSet(voters);
|
||||
return VoterSet.fromMap(voters);
|
||||
}
|
||||
|
||||
public static VoterSet voterSet(Stream<ReplicaKey> voterKeys) {
|
||||
|
|
|
@ -44,7 +44,7 @@ final class KRaftControlRecordStateMachineTest {
|
|||
return new MockLog(new TopicPartition("partition", 0), Uuid.randomUuid(), new LogContext());
|
||||
}
|
||||
|
||||
private static KRaftControlRecordStateMachine buildPartitionListener(MockLog log, Optional<VoterSet> staticVoterSet) {
|
||||
private static KRaftControlRecordStateMachine buildPartitionListener(MockLog log, VoterSet staticVoterSet) {
|
||||
return new KRaftControlRecordStateMachine(
|
||||
staticVoterSet,
|
||||
log,
|
||||
|
@ -60,7 +60,7 @@ final class KRaftControlRecordStateMachineTest {
|
|||
MockLog log = buildLog();
|
||||
VoterSet voterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true));
|
||||
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(voterSet));
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, voterSet);
|
||||
|
||||
// This should be a no-op operation
|
||||
partitionState.updateState();
|
||||
|
@ -68,6 +68,18 @@ final class KRaftControlRecordStateMachineTest {
|
|||
assertEquals(voterSet, partitionState.lastVoterSet());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEmptyPartitionWithNoStaticVoters() {
|
||||
MockLog log = buildLog();
|
||||
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, VoterSet.empty());
|
||||
|
||||
// This should be a no-op operation
|
||||
partitionState.updateState();
|
||||
|
||||
assertEquals(VoterSet.empty(), partitionState.lastVoterSet());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUpdateWithoutSnapshot() {
|
||||
MockLog log = buildLog();
|
||||
|
@ -75,7 +87,7 @@ final class KRaftControlRecordStateMachineTest {
|
|||
BufferSupplier bufferSupplier = BufferSupplier.NO_CACHING;
|
||||
int epoch = 1;
|
||||
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet));
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);
|
||||
|
||||
// Append the kraft.version control record
|
||||
KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
|
||||
|
@ -118,7 +130,7 @@ final class KRaftControlRecordStateMachineTest {
|
|||
BufferSupplier bufferSupplier = BufferSupplier.NO_CACHING;
|
||||
int epoch = 1;
|
||||
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet));
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);
|
||||
|
||||
// Create a snapshot that doesn't have any kraft.version or voter set control records
|
||||
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
|
||||
|
@ -168,7 +180,7 @@ final class KRaftControlRecordStateMachineTest {
|
|||
VoterSet staticVoterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true));
|
||||
int epoch = 1;
|
||||
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet));
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);
|
||||
|
||||
// Create a snapshot that has kraft.version and voter set control records
|
||||
KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
|
||||
|
@ -198,7 +210,7 @@ final class KRaftControlRecordStateMachineTest {
|
|||
BufferSupplier bufferSupplier = BufferSupplier.NO_CACHING;
|
||||
int epoch = 1;
|
||||
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet));
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);
|
||||
|
||||
// Create a snapshot that has kraft.version and voter set control records
|
||||
KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
|
||||
|
@ -245,7 +257,7 @@ final class KRaftControlRecordStateMachineTest {
|
|||
BufferSupplier bufferSupplier = BufferSupplier.NO_CACHING;
|
||||
int epoch = 1;
|
||||
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet));
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);
|
||||
|
||||
// Append the kraft.version control record
|
||||
KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
|
||||
|
@ -313,7 +325,7 @@ final class KRaftControlRecordStateMachineTest {
|
|||
BufferSupplier bufferSupplier = BufferSupplier.NO_CACHING;
|
||||
int epoch = 1;
|
||||
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet));
|
||||
KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);
|
||||
|
||||
// Append the kraft.version control record
|
||||
long kraftVersionOffset = log.endOffset().offset();
|
||||
|
|
|
@ -210,7 +210,7 @@ public final class RecordsIteratorTest {
|
|||
@Test
|
||||
public void testControlRecordIterationWithKraftVersion1() {
|
||||
AtomicReference<ByteBuffer> buffer = new AtomicReference<>(null);
|
||||
VoterSet voterSet = new VoterSet(
|
||||
VoterSet voterSet = VoterSet.fromMap(
|
||||
VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)
|
||||
);
|
||||
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
|
||||
|
|
|
@ -32,8 +32,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
|||
public final class VoterSetHistoryTest {
|
||||
@Test
|
||||
void testStaticVoterSet() {
|
||||
VoterSet staticVoterSet = new VoterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(Optional.of(staticVoterSet));
|
||||
VoterSet staticVoterSet = VoterSet.fromMap(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
|
||||
|
||||
assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0));
|
||||
assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100));
|
||||
|
@ -54,27 +54,27 @@ public final class VoterSetHistoryTest {
|
|||
|
||||
@Test
|
||||
void TestNoStaticVoterSet() {
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(Optional.empty());
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
|
||||
|
||||
assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0));
|
||||
assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100));
|
||||
assertThrows(IllegalStateException.class, votersHistory::lastValue);
|
||||
assertEquals(VoterSet.empty(), votersHistory.lastValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAddAt() {
|
||||
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet staticVoterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(Optional.of(staticVoterSet));
|
||||
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
|
||||
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> votersHistory.addAt(-2, new VoterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)))
|
||||
() -> votersHistory.addAt(-2, VoterSet.fromMap(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)))
|
||||
);
|
||||
assertEquals(staticVoterSet, votersHistory.lastValue());
|
||||
|
||||
voterMap.put(4, VoterSetTest.voterNode(4, true));
|
||||
VoterSet addedVoterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet addedVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
votersHistory.addAt(100, addedVoterSet);
|
||||
|
||||
assertEquals(addedVoterSet, votersHistory.lastValue());
|
||||
|
@ -82,7 +82,7 @@ public final class VoterSetHistoryTest {
|
|||
assertEquals(Optional.of(addedVoterSet), votersHistory.valueAtOrBefore(100));
|
||||
|
||||
voterMap.remove(4);
|
||||
VoterSet removedVoterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet removedVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
votersHistory.addAt(200, removedVoterSet);
|
||||
|
||||
assertEquals(removedVoterSet, votersHistory.lastValue());
|
||||
|
@ -94,8 +94,8 @@ public final class VoterSetHistoryTest {
|
|||
@Test
|
||||
void testBootstrapAddAt() {
|
||||
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet bootstrapVoterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(Optional.empty());
|
||||
VoterSet bootstrapVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
|
||||
|
||||
votersHistory.addAt(-1, bootstrapVoterSet);
|
||||
assertEquals(bootstrapVoterSet, votersHistory.lastValue());
|
||||
|
@ -103,7 +103,7 @@ public final class VoterSetHistoryTest {
|
|||
assertEquals(Optional.of(bootstrapVoterSet), votersHistory.valueAtOrBefore(-1));
|
||||
|
||||
voterMap.put(4, VoterSetTest.voterNode(4, true));
|
||||
VoterSet addedVoterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet addedVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
votersHistory.addAt(100, addedVoterSet);
|
||||
|
||||
assertEquals(addedVoterSet, votersHistory.lastValue());
|
||||
|
@ -112,7 +112,7 @@ public final class VoterSetHistoryTest {
|
|||
assertEquals(Optional.of(addedVoterSet), votersHistory.valueAtOrBefore(100));
|
||||
|
||||
voterMap.remove(4);
|
||||
VoterSet removedVoterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet removedVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
votersHistory.addAt(200, removedVoterSet);
|
||||
|
||||
assertEquals(removedVoterSet, votersHistory.lastValue());
|
||||
|
@ -124,10 +124,10 @@ public final class VoterSetHistoryTest {
|
|||
|
||||
@Test
|
||||
void testAddAtNonOverlapping() {
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(Optional.empty());
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
|
||||
|
||||
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet voterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
|
||||
// Add a starting voter to the history
|
||||
votersHistory.addAt(100, voterSet);
|
||||
|
@ -159,8 +159,8 @@ public final class VoterSetHistoryTest {
|
|||
@Test
|
||||
void testNonoverlappingFromStaticVoterSet() {
|
||||
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet staticVoterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(Optional.empty());
|
||||
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
|
||||
|
||||
// Remove voter so that it doesn't overlap
|
||||
VoterSet nonoverlappingRemovedSet = staticVoterSet
|
||||
|
@ -174,17 +174,17 @@ public final class VoterSetHistoryTest {
|
|||
@Test
|
||||
void testTruncateTo() {
|
||||
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet staticVoterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(Optional.of(staticVoterSet));
|
||||
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
|
||||
|
||||
// Add voter 4 to the voter set and voter set history
|
||||
voterMap.put(4, VoterSetTest.voterNode(4, true));
|
||||
VoterSet voterSet1234 = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet voterSet1234 = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
votersHistory.addAt(100, voterSet1234);
|
||||
|
||||
// Add voter 5 to the voter set and voter set history
|
||||
voterMap.put(5, VoterSetTest.voterNode(5, true));
|
||||
VoterSet voterSet12345 = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet voterSet12345 = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
votersHistory.addAt(200, voterSet12345);
|
||||
|
||||
votersHistory.truncateNewEntries(201);
|
||||
|
@ -200,17 +200,17 @@ public final class VoterSetHistoryTest {
|
|||
@Test
|
||||
void testTrimPrefixTo() {
|
||||
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet staticVoterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(Optional.of(staticVoterSet));
|
||||
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
|
||||
|
||||
// Add voter 4 to the voter set and voter set history
|
||||
voterMap.put(4, VoterSetTest.voterNode(4, true));
|
||||
VoterSet voterSet1234 = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet voterSet1234 = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
votersHistory.addAt(100, voterSet1234);
|
||||
|
||||
// Add voter 5 to the voter set and voter set history
|
||||
voterMap.put(5, VoterSetTest.voterNode(5, true));
|
||||
VoterSet voterSet12345 = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet voterSet12345 = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
votersHistory.addAt(200, voterSet12345);
|
||||
|
||||
votersHistory.truncateOldEntries(99);
|
||||
|
@ -233,17 +233,17 @@ public final class VoterSetHistoryTest {
|
|||
@Test
|
||||
void testClear() {
|
||||
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
|
||||
VoterSet staticVoterSet = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(Optional.of(staticVoterSet));
|
||||
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
|
||||
|
||||
// Add voter 4 to the voter set and voter set history
|
||||
voterMap.put(4, VoterSetTest.voterNode(4, true));
|
||||
VoterSet voterSet1234 = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet voterSet1234 = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
votersHistory.addAt(100, voterSet1234);
|
||||
|
||||
// Add voter 5 to the voter set and voter set history
|
||||
voterMap.put(5, VoterSetTest.voterNode(5, true));
|
||||
VoterSet voterSet12345 = new VoterSet(new HashMap<>(voterMap));
|
||||
VoterSet voterSet12345 = VoterSet.fromMap(new HashMap<>(voterMap));
|
||||
votersHistory.addAt(200, voterSet12345);
|
||||
|
||||
votersHistory.clear();
|
||||
|
|
Loading…
Reference in New Issue