diff --git a/build.gradle b/build.gradle index 41229843eec..28ff4495e2d 100644 --- a/build.gradle +++ b/build.gradle @@ -162,8 +162,8 @@ allprojects { // ZooKeeper (potentially older and containing CVEs) libs.nettyHandler, libs.nettyTransportNativeEpoll, - // be explicit about the reload4j version instead of relying on the transitive versions - libs.reload4j + // be explicit about the reload4j version instead of relying on the transitive versions + libs.reload4j ) } } @@ -734,7 +734,7 @@ subprojects { jacoco { toolVersion = versions.jacoco } - + jacocoTestReport { dependsOn tasks.test sourceSets sourceSets.main @@ -758,8 +758,8 @@ subprojects { skipProjects = [ ":jmh-benchmarks", ":trogdor" ] skipConfigurations = [ "zinc" ] } - // the task `removeUnusedImports` is implemented by google-java-format, - // and unfortunately the google-java-format version used by spotless 6.14.0 can't work with JDK 21. + // the task `removeUnusedImports` is implemented by google-java-format, + // and unfortunately the google-java-format version used by spotless 6.14.0 can't work with JDK 21. // Hence, we apply spotless tasks only if the env is either JDK11 or JDK17 if ((JavaVersion.current().isJava11() || (JavaVersion.current() == JavaVersion.VERSION_17))) { apply plugin: 'com.diffplug.spotless' @@ -1192,7 +1192,7 @@ project(':core') { //By default gradle does not handle test dependencies between the sub-projects //This line is to include clients project test jar to dependant-testlibs from (project(':clients').testJar ) { "$buildDir/dependant-testlibs" } - // log4j-appender is not in core dependencies, + // log4j-appender is not in core dependencies, // so we add it to dependant-testlibs to avoid ClassNotFoundException in running kafka_log4j_appender.py from (project(':log4j-appender').jar ) { "$buildDir/dependant-testlibs" } duplicatesStrategy 'exclude' @@ -1442,7 +1442,7 @@ project(':transaction-coordinator') { implementation project(':clients') generator project(':generator') } - + sourceSets { main { java { @@ -1717,6 +1717,7 @@ project(':raft') { testImplementation libs.junitJupiter testImplementation libs.mockitoCore testImplementation libs.jqwik + testImplementation libs.hamcrest testRuntimeOnly libs.slf4jReload4j testRuntimeOnly libs.junitPlatformLanucher diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 7868b1bc216..39fbe283698 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -890,30 +890,34 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) // validate KRaft-related configs val voterIds = QuorumConfig.parseVoterIds(quorumVoters) - def validateNonEmptyQuorumVotersForKRaft(): Unit = { - if (voterIds.isEmpty) { - throw new ConfigException(s"If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.") + def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = { + if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) { + throw new ConfigException( + s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must + |contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable + |set of controllers.""".stripMargin.replace("\n", " ") + ) } } - def validateNonEmptyQuorumVotersForMigration(): Unit = { - if (voterIds.isEmpty) { - throw new ConfigException(s"If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.") + def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = { + if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) { + throw new ConfigException( + s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must + |contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable + |set of controllers.""".stripMargin.replace("\n", " ") + ) } } def validateControlPlaneListenerEmptyForKRaft(): Unit = { require(controlPlaneListenerName.isEmpty, s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.") } - def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = { - require(advertisedBrokerListenerNames.forall(aln => !controllerListenerNames.contains(aln.value())), - s"The advertised.listeners config must not contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.") - } def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = { - require(voterIds.contains(nodeId), + require(voterIds.isEmpty || voterIds.contains(nodeId), s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}") } - def validateControllerListenerExistsForKRaftController(): Unit = { - require(controllerListeners.nonEmpty, + def validateAdvertisedControllerListenersNonEmptyForKRaftController(): Unit = { + require(effectiveAdvertisedControllerListeners.nonEmpty, s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at least one value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role") } def validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit = { @@ -921,16 +925,15 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)), s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role") } - def validateAdvertisedListenersNonEmptyForBroker(): Unit = { + def validateAdvertisedBrokerListenersNonEmptyForBroker(): Unit = { require(advertisedBrokerListenerNames.nonEmpty, - "There must be at least one advertised listener." + ( + "There must be at least one broker advertised listener." + ( if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else "")) } if (processRoles == Set(ProcessRole.BrokerRole)) { // KRaft broker-only - validateNonEmptyQuorumVotersForKRaft() + validateQuorumVotersAndQuorumBootstrapServerForKRaft() validateControlPlaneListenerEmptyForKRaft() - validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker() // nodeId must not appear in controller.quorum.voters require(!voterIds.contains(nodeId), s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}") @@ -952,10 +955,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) if (controllerListenerNames.size > 1) { warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple entries; only the first will be used since ${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames.asJava}") } - validateAdvertisedListenersNonEmptyForBroker() } else if (processRoles == Set(ProcessRole.ControllerRole)) { // KRaft controller-only - validateNonEmptyQuorumVotersForKRaft() + validateQuorumVotersAndQuorumBootstrapServerForKRaft() validateControlPlaneListenerEmptyForKRaft() // listeners should only contain listeners also enumerated in the controller listener require( @@ -963,21 +965,19 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller" ) validateControllerQuorumVotersMustContainNodeIdForKRaftController() - validateControllerListenerExistsForKRaftController() + validateAdvertisedControllerListenersNonEmptyForKRaftController() validateControllerListenerNamesMustAppearInListenersForKRaftController() } else if (isKRaftCombinedMode) { // KRaft combined broker and controller - validateNonEmptyQuorumVotersForKRaft() + validateQuorumVotersAndQuorumBootstrapServerForKRaft() validateControlPlaneListenerEmptyForKRaft() - validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker() validateControllerQuorumVotersMustContainNodeIdForKRaftController() - validateControllerListenerExistsForKRaftController() + validateAdvertisedControllerListenersNonEmptyForKRaftController() validateControllerListenerNamesMustAppearInListenersForKRaftController() - validateAdvertisedListenersNonEmptyForBroker() } else { // ZK-based if (migrationEnabled) { - validateNonEmptyQuorumVotersForMigration() + validateQuorumVotersAndQuorumBootstrapServerForMigration() require(controllerListenerNames.nonEmpty, s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}") require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " + @@ -992,13 +992,12 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) require(controllerListenerNames.isEmpty, s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}") } - validateAdvertisedListenersNonEmptyForBroker() } val listenerNames = listeners.map(_.listenerName).toSet if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) { // validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located) - validateAdvertisedListenersNonEmptyForBroker() + validateAdvertisedBrokerListenersNonEmptyForBroker() require(advertisedBrokerListenerNames.contains(interBrokerListenerName), s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " + s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}") diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index f3e750d4659..07b52dd3c65 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -152,8 +152,12 @@ class KafkaConfigTest { propertiesFile.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1") propertiesFile.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "") setListenerProps(propertiesFile) - assertBadConfigContainingMessage(propertiesFile, - "If using process.roles, controller.quorum.voters must contain a parseable set of voters.") + assertBadConfigContainingMessage( + propertiesFile, + """If using process.roles, either controller.quorum.bootstrap.servers + |must contain the set of bootstrap controllers or controller.quorum.voters must contain a + |parseable set of controllers.""".stripMargin.replace("\n", " ") + ) // Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined propertiesFile.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "") diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 56c693473ee..4276dd9cb7d 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -446,7 +446,10 @@ class KafkaConfigTest { props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093") assertFalse(isValidKafkaConfig(props)) - assertBadConfigContainingMessage(props, "There must be at least one advertised listener. Perhaps all listeners appear in controller.listener.names?") + assertBadConfigContainingMessage( + props, + "There must be at least one broker advertised listener. Perhaps all listeners appear in controller.listener.names?" + ) props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092,SSL://localhost:9093") KafkaConfig.fromProps(props) @@ -1832,8 +1835,11 @@ class KafkaConfigTest { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort) props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") assertEquals( - "If using zookeeper.metadata.migration.enable, controller.quorum.voters must contain a parseable set of voters.", - assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage) + """If using zookeeper.metadata.migration.enable, either controller.quorum.bootstrap.servers + |must contain the set of bootstrap controllers or controller.quorum.voters must contain a + |parseable set of controllers.""".stripMargin.replace("\n", " "), + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage + ) props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9093") assertEquals( diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index d9afa803829..4d151b3128f 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.feature.SupportedVersionRange; @@ -455,9 +456,9 @@ public final class KafkaRaftClient implements RaftClient { QuorumStateStore quorumStateStore, Metrics metrics ) { - Optional staticVoters = voterAddresses.isEmpty() ? - Optional.empty() : - Optional.of(VoterSet.fromInetSocketAddresses(channel.listenerName(), voterAddresses)); + VoterSet staticVoters = voterAddresses.isEmpty() ? + VoterSet.empty() : + VoterSet.fromInetSocketAddresses(channel.listenerName(), voterAddresses); partitionState = new KRaftControlRecordStateMachine( staticVoters, @@ -470,8 +471,18 @@ public final class KafkaRaftClient implements RaftClient { // Read the entire log logger.info("Reading KRaft snapshot and log as part of the initialization"); partitionState.updateState(); + logger.info("Starting voters are {}", partitionState.lastVoterSet()); if (requestManager == null) { + if (voterAddresses.isEmpty()) { + throw new ConfigException( + String.format( + "Missing kraft bootstrap servers. Must specify a value for %s.", + QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + ) + ); + } + // The request manager wasn't created using the bootstrap servers // create it using the voters static configuration List bootstrapNodes = voterAddresses diff --git a/raft/src/main/java/org/apache/kafka/raft/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/VoterSet.java index a74d2a49e8f..90813d660d3 100644 --- a/raft/src/main/java/org/apache/kafka/raft/VoterSet.java +++ b/raft/src/main/java/org/apache/kafka/raft/VoterSet.java @@ -48,11 +48,7 @@ import java.util.stream.Stream; public final class VoterSet { private final Map voters; - public VoterSet(Map voters) { - if (voters.isEmpty()) { - throw new IllegalArgumentException("Voters cannot be empty"); - } - + private VoterSet(Map voters) { this.voters = voters; } @@ -409,6 +405,11 @@ public final class VoterSet { } } + private static final VoterSet EMPTY = new VoterSet(Collections.emptyMap()); + public static VoterSet empty() { + return EMPTY; + } + /** * Converts a {@code VotersRecord} to a {@code VoterSet}. * diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index 02bc2c823a0..c1d4a0b2f2d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -82,7 +82,7 @@ public final class KRaftControlRecordStateMachine { * @param logContext the log context */ public KRaftControlRecordStateMachine( - Optional staticVoterSet, + VoterSet staticVoterSet, ReplicatedLog log, RecordSerde serde, BufferSupplier bufferSupplier, @@ -280,19 +280,25 @@ public final class KRaftControlRecordStateMachine { long currentOffset = overrideOffset.orElse(batch.baseOffset() + offsetDelta); switch (record.type()) { case KRAFT_VOTERS: + VoterSet voters = VoterSet.fromVotersRecord((VotersRecord) record.message()); + logger.info("Latest set of voters is {} at offset {}", voters, currentOffset); synchronized (voterSetHistory) { - voterSetHistory.addAt(currentOffset, VoterSet.fromVotersRecord((VotersRecord) record.message())); + voterSetHistory.addAt(currentOffset, voters); } break; case KRAFT_VERSION: + KRaftVersion kraftVersion = KRaftVersion.fromFeatureLevel( + ((KRaftVersionRecord) record.message()).kRaftVersion() + ); + logger.info( + "Latest {} is {} at offset {}", + KRaftVersion.FEATURE_NAME, + kraftVersion, + currentOffset + ); synchronized (kraftVersionHistory) { - kraftVersionHistory.addAt( - currentOffset, - KRaftVersion.fromFeatureLevel( - ((KRaftVersionRecord) record.message()).kRaftVersion() - ) - ); + kraftVersionHistory.addAt(currentOffset, kraftVersion); } break; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java index 7f671e6660f..6ab304f8c16 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java @@ -29,10 +29,10 @@ import java.util.OptionalLong; * evaluating the latest set of voters. */ public final class VoterSetHistory { - private final Optional staticVoterSet; + private final VoterSet staticVoterSet; private final LogHistory votersHistory = new TreeMapLogHistory<>(); - VoterSetHistory(Optional staticVoterSet) { + VoterSetHistory(VoterSet staticVoterSet) { this.staticVoterSet = staticVoterSet; } @@ -85,13 +85,9 @@ public final class VoterSetHistory { * Returns the latest set of voters. */ public VoterSet lastValue() { - Optional> result = votersHistory.lastEntry(); - if (result.isPresent()) { - return result.get().value(); - } - - return staticVoterSet - .orElseThrow(() -> new IllegalStateException("No voter set found")); + return votersHistory.lastEntry() + .map(LogHistory.Entry::value) + .orElse(staticVoterSet); } /** diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index 0d46af767a8..77825f53286 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -179,7 +179,6 @@ public class KafkaRaftClientReconfigTest { RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) .withStaticVoters(voters) - .withBootstrapSnapshot(Optional.empty()) .withUnknownLeader(0) .build(); @@ -201,19 +200,6 @@ public class KafkaRaftClientReconfigTest { ) ); - // check the bootstrap snapshot exists but is empty - assertEquals(BOOTSTRAP_SNAPSHOT_ID, context.log.latestSnapshotId().get()); - try (SnapshotReader reader = RecordsSnapshotReader.of( - context.log.latestSnapshot().get(), - context.serde, - BufferSupplier.NO_CACHING, - KafkaRaftClient.MAX_BATCH_SIZE_BYTES, - false - ) - ) { - SnapshotWriterReaderTest.assertControlSnapshot(expectedBootstrapRecords, reader); - } - // check leader does not write bootstrap records to log context.becomeLeader(); @@ -2245,6 +2231,25 @@ public class KafkaRaftClientReconfigTest { assertEquals(voter2.id(), fetchRequest.destination().id()); } + @Test + void testObserverDiscoversLeaderWithUnknownVoters() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + InetSocketAddress bootstrapAdddress = InetSocketAddress.createUnresolved("localhost", 1234); + int epoch = 3; + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.empty()) + .withUnknownLeader(epoch) + .withBootstrapServers(Optional.of(Collections.singletonList(bootstrapAdddress))) + .build(); + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + assertEquals(-2, fetchRequest.destination().id()); + } + private static void verifyVotersRecord( VoterSet expectedVoterSet, ByteBuffer recordKey, diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 47aeccb1ae2..c0d50232c44 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -69,6 +69,9 @@ import java.util.stream.Stream; import static java.util.Collections.singletonList; import static org.apache.kafka.raft.RaftClientTestContext.Builder.DEFAULT_ELECTION_TIMEOUT_MS; import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -793,7 +796,7 @@ public class KafkaRaftClientTest { context.pollUntilRequest(); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); - assertTrue(voters.contains(fetchRequest.destination().id())); + assertThat(fetchRequest.destination().id(), is(in(voters))); context.assertFetchRequestData(fetchRequest, 0, 0L, 0); context.deliverResponse( @@ -1784,7 +1787,7 @@ public class KafkaRaftClientTest { context.pollUntilRequest(); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); - assertTrue(voters.contains(fetchRequest.destination().id())); + assertThat(fetchRequest.destination().id(), is(in(voters))); context.assertFetchRequestData(fetchRequest, 0, 0L, 0); context.deliverResponse( @@ -1810,7 +1813,7 @@ public class KafkaRaftClientTest { .collect(Collectors.toList()); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withBootstrapServers(bootstrapServers) + .withBootstrapServers(Optional.of(bootstrapServers)) .withKip853Rpc(withKip853Rpc) .build(); @@ -1857,7 +1860,7 @@ public class KafkaRaftClientTest { .collect(Collectors.toList()); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withBootstrapServers(bootstrapServers) + .withBootstrapServers(Optional.of(bootstrapServers)) .withKip853Rpc(withKip853Rpc) .build(); @@ -1900,7 +1903,7 @@ public class KafkaRaftClientTest { .collect(Collectors.toList()); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withBootstrapServers(bootstrapServers) + .withBootstrapServers(Optional.of(bootstrapServers)) .withKip853Rpc(withKip853Rpc) .build(); @@ -1974,7 +1977,7 @@ public class KafkaRaftClientTest { .collect(Collectors.toList()); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withBootstrapServers(bootstrapServers) + .withBootstrapServers(Optional.of(bootstrapServers)) .withKip853Rpc(withKip853Rpc) .build(); @@ -2631,7 +2634,7 @@ public class KafkaRaftClientTest { .collect(Collectors.toList()); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withBootstrapServers(bootstrapServers) + .withBootstrapServers(Optional.of(bootstrapServers)) .withKip853Rpc(withKip853Rpc) .build(); @@ -2682,7 +2685,7 @@ public class KafkaRaftClientTest { .collect(Collectors.toList()); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withBootstrapServers(bootstrapServers) + .withBootstrapServers(Optional.of(bootstrapServers)) .withKip853Rpc(withKip853Rpc) .build(); @@ -4208,7 +4211,7 @@ public class KafkaRaftClientTest { .collect(Collectors.toList()); RaftClientTestContext context = new RaftClientTestContext.Builder(OptionalInt.empty(), voters) - .withBootstrapServers(bootstrapServers) + .withBootstrapServers(Optional.of(bootstrapServers)) .withKip853Rpc(withKip853Rpc) .build(); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index d12140a6c91..e59513cecbd 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -168,9 +168,9 @@ public final class RaftClientTestContext { private int electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS; private int appendLingerMs = DEFAULT_APPEND_LINGER_MS; private MemoryPool memoryPool = MemoryPool.NONE; - private List bootstrapServers = Collections.emptyList(); + private Optional> bootstrapServers = Optional.empty(); private boolean kip853Rpc = false; - private Optional startingVoters = Optional.empty(); + private VoterSet startingVoters = VoterSet.empty(); private Endpoints localListeners = Endpoints.empty(); private boolean isStartingVotersStatic = false; @@ -193,15 +193,7 @@ public final class RaftClientTestContext { this.localDirectoryId = localDirectoryId; } - private static IllegalStateException missingStartingVoterException() { - return new IllegalStateException( - "The starting voter set must be set with withStaticVoters or withBootstrapSnapshot" - ); - } - - Builder withElectedLeader(int epoch, int leaderId) { - VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException); quorumStateStore.writeElectionState( ElectionState.withElectedLeader(epoch, leaderId, startingVoters.voterIds()), kraftVersion @@ -210,7 +202,6 @@ public final class RaftClientTestContext { } Builder withUnknownLeader(int epoch) { - VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException); quorumStateStore.writeElectionState( ElectionState.withUnknownLeader(epoch, startingVoters.voterIds()), kraftVersion @@ -219,7 +210,6 @@ public final class RaftClientTestContext { } Builder withVotedCandidate(int epoch, ReplicaKey votedKey) { - VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException); quorumStateStore.writeElectionState( ElectionState.withVotedCandidate(epoch, votedKey, startingVoters.voterIds()), kraftVersion @@ -293,7 +283,7 @@ public final class RaftClientTestContext { return this; } - Builder withBootstrapServers(List bootstrapServers) { + Builder withBootstrapServers(Optional> bootstrapServers) { this.bootstrapServers = bootstrapServers; return this; } @@ -319,19 +309,20 @@ public final class RaftClientTestContext { } Builder withStaticVoters(VoterSet staticVoters) { - this.startingVoters = Optional.of(staticVoters); - this.isStartingVotersStatic = true; + startingVoters = staticVoters; + isStartingVotersStatic = true; + kraftVersion = KRaftVersion.KRAFT_VERSION_0; return this; } Builder withBootstrapSnapshot(Optional voters) { + startingVoters = voters.orElse(VoterSet.empty()); + isStartingVotersStatic = false; + if (voters.isPresent()) { kraftVersion = KRaftVersion.KRAFT_VERSION_1; - startingVoters = voters; - isStartingVotersStatic = false; - RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder() .setRawSnapshotWriter( log.createNewSnapshotUnchecked(Snapshots.BOOTSTRAP_SNAPSHOT_ID).get() @@ -357,8 +348,6 @@ public final class RaftClientTestContext { } public RaftClientTestContext build() throws IOException { - VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException); - Metrics metrics = new Metrics(time); MockNetworkChannel channel = new MockNetworkChannel(); MockListener listener = new MockListener(localId); @@ -395,6 +384,18 @@ public final class RaftClientTestContext { appendLingerMs ); + List computedBootstrapServers = bootstrapServers.orElseGet(() -> { + if (isStartingVotersStatic) { + return Collections.emptyList(); + } else { + return startingVoters + .voterNodes(startingVoters.voterIds().stream(), channel.listenerName()) + .stream() + .map(node -> InetSocketAddress.createUnresolved(node.host(), node.port())) + .collect(Collectors.toList()); + } + }); + KafkaRaftClient client = new KafkaRaftClient<>( localId, localDirectoryId, @@ -407,7 +408,7 @@ public final class RaftClientTestContext { new MockExpirationService(time), FETCH_MAX_WAIT_MS, clusterId, - bootstrapServers, + computedBootstrapServers, localListeners, Features.KRAFT_VERSION.supportedVersionRange(), logContext, @@ -436,7 +437,7 @@ public final class RaftClientTestContext { startingVoters, IntStream .iterate(-2, id -> id - 1) - .limit(bootstrapServers.size()) + .limit(bootstrapServers.map(List::size).orElse(0)) .boxed() .collect(Collectors.toSet()), kip853Rpc, diff --git a/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java b/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java index 1f41f4fd22f..26f2dc662b4 100644 --- a/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java @@ -48,12 +48,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public final class VoterSetTest { @Test void testEmptyVoterSet() { - assertThrows(IllegalArgumentException.class, () -> new VoterSet(Collections.emptyMap())); + assertEquals(VoterSet.empty(), VoterSet.fromMap(Collections.emptyMap())); } @Test void testVoterNode() { - VoterSet voterSet = new VoterSet(voterMap(IntStream.of(1, 2, 3), true)); + VoterSet voterSet = VoterSet.fromMap(voterMap(IntStream.of(1, 2, 3), true)); assertEquals( Optional.of(new Node(1, "localhost", 9991)), voterSet.voterNode(1, DEFAULT_LISTENER_NAME) @@ -64,7 +64,7 @@ public final class VoterSetTest { @Test void testVoterNodes() { - VoterSet voterSet = new VoterSet(voterMap(IntStream.of(1, 2, 3), true)); + VoterSet voterSet = VoterSet.fromMap(voterMap(IntStream.of(1, 2, 3), true)); assertEquals( Utils.mkSet(new Node(1, "localhost", 9991), new Node(2, "localhost", 9992)), @@ -84,33 +84,33 @@ public final class VoterSetTest { @Test void testVoterIds() { - VoterSet voterSet = new VoterSet(voterMap(IntStream.of(1, 2, 3), true)); + VoterSet voterSet = VoterSet.fromMap(voterMap(IntStream.of(1, 2, 3), true)); assertEquals(new HashSet<>(Arrays.asList(1, 2, 3)), voterSet.voterIds()); } @Test void testAddVoter() { Map aVoterMap = voterMap(IntStream.of(1, 2, 3), true); - VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap)); assertEquals(Optional.empty(), voterSet.addVoter(voterNode(1, true))); VoterSet.VoterNode voter4 = voterNode(4, true); aVoterMap.put(voter4.voterKey().id(), voter4); - assertEquals(Optional.of(new VoterSet(new HashMap<>(aVoterMap))), voterSet.addVoter(voter4)); + assertEquals(Optional.of(VoterSet.fromMap(new HashMap<>(aVoterMap))), voterSet.addVoter(voter4)); } @Test void testRemoveVoter() { Map aVoterMap = voterMap(IntStream.of(1, 2, 3), true); - VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap)); assertEquals(Optional.empty(), voterSet.removeVoter(ReplicaKey.of(4, ReplicaKey.NO_DIRECTORY_ID))); assertEquals(Optional.empty(), voterSet.removeVoter(ReplicaKey.of(4, Uuid.randomUuid()))); VoterSet.VoterNode voter3 = aVoterMap.remove(3); assertEquals( - Optional.of(new VoterSet(new HashMap<>(aVoterMap))), + Optional.of(VoterSet.fromMap(new HashMap<>(aVoterMap))), voterSet.removeVoter(voter3.voterKey()) ); } @@ -118,7 +118,7 @@ public final class VoterSetTest { @Test void testUpdateVoter() { Map aVoterMap = voterMap(IntStream.of(1, 2, 3), true); - VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap)); assertEquals(Optional.empty(), voterSet.updateVoter(voterNode(4, true))); assertFalse(voterSet.voterNodeNeedsUpdate(voterNode(4, true))); @@ -140,7 +140,7 @@ public final class VoterSetTest { assertTrue(voterSet.voterNodeNeedsUpdate(newVoter3)); assertEquals( - Optional.of(new VoterSet(new HashMap<>(aVoterMap))), + Optional.of(VoterSet.fromMap(new HashMap<>(aVoterMap))), voterSet.updateVoter(newVoter3) ); } @@ -149,7 +149,7 @@ public final class VoterSetTest { @Test void testCannotRemoveToEmptyVoterSet() { Map aVoterMap = voterMap(IntStream.of(1), true); - VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap)); ReplicaKey voter1 = aVoterMap.get(1).voterKey(); assertTrue(voterSet.isVoter(voter1)); @@ -159,7 +159,7 @@ public final class VoterSetTest { @Test void testIsVoterWithDirectoryId() { Map aVoterMap = voterMap(IntStream.of(1, 2, 3), true); - VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap)); assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey())); assertFalse(voterSet.isVoter(ReplicaKey.of(1, Uuid.randomUuid()))); @@ -176,7 +176,7 @@ public final class VoterSetTest { @Test void testIsVoterWithoutDirectoryId() { Map aVoterMap = voterMap(IntStream.of(1, 2, 3), false); - VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap)); assertTrue(voterSet.isVoter(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID))); assertTrue(voterSet.isVoter(ReplicaKey.of(1, Uuid.randomUuid()))); @@ -212,7 +212,7 @@ public final class VoterSetTest { @ValueSource(booleans = { true, false }) void testEndpoints(boolean withDirectoryId) { Map aVoterMap = voterMap(IntStream.of(1, 2, 3), withDirectoryId); - VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap)); assertNotEquals(Endpoints.empty(), voterSet.listeners(1)); assertNotEquals(Endpoints.empty(), voterSet.listeners(2)); @@ -223,7 +223,7 @@ public final class VoterSetTest { @Test void testIsOnlyVoterInStandalone() { Map aVoterMap = voterMap(IntStream.of(1), true); - VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap)); assertTrue(voterSet.isOnlyVoter(aVoterMap.get(1).voterKey())); assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Uuid.randomUuid()))); @@ -237,7 +237,7 @@ public final class VoterSetTest { @Test void testIsOnlyVoterInNotStandalone() { Map aVoterMap = voterMap(IntStream.of(1, 2), true); - VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap)); assertFalse(voterSet.isOnlyVoter(aVoterMap.get(1).voterKey())); assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Uuid.randomUuid()))); @@ -253,7 +253,7 @@ public final class VoterSetTest { @Test void testRecordRoundTrip() { - VoterSet voterSet = new VoterSet(voterMap(IntStream.of(1, 2, 3), true)); + VoterSet voterSet = VoterSet.fromMap(voterMap(IntStream.of(1, 2, 3), true)); assertEquals(voterSet, VoterSet.fromVotersRecord(voterSet.toVotersRecord((short) 0))); } @@ -375,7 +375,7 @@ public final class VoterSetTest { } public static VoterSet voterSet(Map voters) { - return new VoterSet(voters); + return VoterSet.fromMap(voters); } public static VoterSet voterSet(Stream voterKeys) { diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java index e3673bef521..99226efae20 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java @@ -44,7 +44,7 @@ final class KRaftControlRecordStateMachineTest { return new MockLog(new TopicPartition("partition", 0), Uuid.randomUuid(), new LogContext()); } - private static KRaftControlRecordStateMachine buildPartitionListener(MockLog log, Optional staticVoterSet) { + private static KRaftControlRecordStateMachine buildPartitionListener(MockLog log, VoterSet staticVoterSet) { return new KRaftControlRecordStateMachine( staticVoterSet, log, @@ -60,7 +60,7 @@ final class KRaftControlRecordStateMachineTest { MockLog log = buildLog(); VoterSet voterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)); - KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(voterSet)); + KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, voterSet); // This should be a no-op operation partitionState.updateState(); @@ -68,6 +68,18 @@ final class KRaftControlRecordStateMachineTest { assertEquals(voterSet, partitionState.lastVoterSet()); } + @Test + void testEmptyPartitionWithNoStaticVoters() { + MockLog log = buildLog(); + + KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, VoterSet.empty()); + + // This should be a no-op operation + partitionState.updateState(); + + assertEquals(VoterSet.empty(), partitionState.lastVoterSet()); + } + @Test void testUpdateWithoutSnapshot() { MockLog log = buildLog(); @@ -75,7 +87,7 @@ final class KRaftControlRecordStateMachineTest { BufferSupplier bufferSupplier = BufferSupplier.NO_CACHING; int epoch = 1; - KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet)); + KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet); // Append the kraft.version control record KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1; @@ -118,7 +130,7 @@ final class KRaftControlRecordStateMachineTest { BufferSupplier bufferSupplier = BufferSupplier.NO_CACHING; int epoch = 1; - KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet)); + KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet); // Create a snapshot that doesn't have any kraft.version or voter set control records RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder() @@ -168,7 +180,7 @@ final class KRaftControlRecordStateMachineTest { VoterSet staticVoterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)); int epoch = 1; - KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet)); + KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet); // Create a snapshot that has kraft.version and voter set control records KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1; @@ -198,7 +210,7 @@ final class KRaftControlRecordStateMachineTest { BufferSupplier bufferSupplier = BufferSupplier.NO_CACHING; int epoch = 1; - KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet)); + KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet); // Create a snapshot that has kraft.version and voter set control records KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1; @@ -245,7 +257,7 @@ final class KRaftControlRecordStateMachineTest { BufferSupplier bufferSupplier = BufferSupplier.NO_CACHING; int epoch = 1; - KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet)); + KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet); // Append the kraft.version control record KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1; @@ -313,7 +325,7 @@ final class KRaftControlRecordStateMachineTest { BufferSupplier bufferSupplier = BufferSupplier.NO_CACHING; int epoch = 1; - KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet)); + KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet); // Append the kraft.version control record long kraftVersionOffset = log.endOffset().offset(); diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java index 84904032a57..a8aa1a6e6ae 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java @@ -210,7 +210,7 @@ public final class RecordsIteratorTest { @Test public void testControlRecordIterationWithKraftVersion1() { AtomicReference buffer = new AtomicReference<>(null); - VoterSet voterSet = new VoterSet( + VoterSet voterSet = VoterSet.fromMap( VoterSetTest.voterMap(IntStream.of(1, 2, 3), true) ); RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder() diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java index f40311d8b7e..04f8aa8d365 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java @@ -32,8 +32,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public final class VoterSetHistoryTest { @Test void testStaticVoterSet() { - VoterSet staticVoterSet = new VoterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)); - VoterSetHistory votersHistory = new VoterSetHistory(Optional.of(staticVoterSet)); + VoterSet staticVoterSet = VoterSet.fromMap(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)); + VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0)); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100)); @@ -54,27 +54,27 @@ public final class VoterSetHistoryTest { @Test void TestNoStaticVoterSet() { - VoterSetHistory votersHistory = new VoterSetHistory(Optional.empty()); + VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0)); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100)); - assertThrows(IllegalStateException.class, votersHistory::lastValue); + assertEquals(VoterSet.empty(), votersHistory.lastValue()); } @Test void testAddAt() { Map voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); - VoterSet staticVoterSet = new VoterSet(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(Optional.of(staticVoterSet)); + VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); + VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); assertThrows( IllegalArgumentException.class, - () -> votersHistory.addAt(-2, new VoterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true))) + () -> votersHistory.addAt(-2, VoterSet.fromMap(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true))) ); assertEquals(staticVoterSet, votersHistory.lastValue()); voterMap.put(4, VoterSetTest.voterNode(4, true)); - VoterSet addedVoterSet = new VoterSet(new HashMap<>(voterMap)); + VoterSet addedVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); votersHistory.addAt(100, addedVoterSet); assertEquals(addedVoterSet, votersHistory.lastValue()); @@ -82,7 +82,7 @@ public final class VoterSetHistoryTest { assertEquals(Optional.of(addedVoterSet), votersHistory.valueAtOrBefore(100)); voterMap.remove(4); - VoterSet removedVoterSet = new VoterSet(new HashMap<>(voterMap)); + VoterSet removedVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); votersHistory.addAt(200, removedVoterSet); assertEquals(removedVoterSet, votersHistory.lastValue()); @@ -94,8 +94,8 @@ public final class VoterSetHistoryTest { @Test void testBootstrapAddAt() { Map voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); - VoterSet bootstrapVoterSet = new VoterSet(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(Optional.empty()); + VoterSet bootstrapVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); + VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); votersHistory.addAt(-1, bootstrapVoterSet); assertEquals(bootstrapVoterSet, votersHistory.lastValue()); @@ -103,7 +103,7 @@ public final class VoterSetHistoryTest { assertEquals(Optional.of(bootstrapVoterSet), votersHistory.valueAtOrBefore(-1)); voterMap.put(4, VoterSetTest.voterNode(4, true)); - VoterSet addedVoterSet = new VoterSet(new HashMap<>(voterMap)); + VoterSet addedVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); votersHistory.addAt(100, addedVoterSet); assertEquals(addedVoterSet, votersHistory.lastValue()); @@ -112,7 +112,7 @@ public final class VoterSetHistoryTest { assertEquals(Optional.of(addedVoterSet), votersHistory.valueAtOrBefore(100)); voterMap.remove(4); - VoterSet removedVoterSet = new VoterSet(new HashMap<>(voterMap)); + VoterSet removedVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); votersHistory.addAt(200, removedVoterSet); assertEquals(removedVoterSet, votersHistory.lastValue()); @@ -124,10 +124,10 @@ public final class VoterSetHistoryTest { @Test void testAddAtNonOverlapping() { - VoterSetHistory votersHistory = new VoterSetHistory(Optional.empty()); + VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); Map voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); - VoterSet voterSet = new VoterSet(new HashMap<>(voterMap)); + VoterSet voterSet = VoterSet.fromMap(new HashMap<>(voterMap)); // Add a starting voter to the history votersHistory.addAt(100, voterSet); @@ -159,8 +159,8 @@ public final class VoterSetHistoryTest { @Test void testNonoverlappingFromStaticVoterSet() { Map voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); - VoterSet staticVoterSet = new VoterSet(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(Optional.empty()); + VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); + VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); // Remove voter so that it doesn't overlap VoterSet nonoverlappingRemovedSet = staticVoterSet @@ -174,17 +174,17 @@ public final class VoterSetHistoryTest { @Test void testTruncateTo() { Map voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); - VoterSet staticVoterSet = new VoterSet(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(Optional.of(staticVoterSet)); + VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); + VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); // Add voter 4 to the voter set and voter set history voterMap.put(4, VoterSetTest.voterNode(4, true)); - VoterSet voterSet1234 = new VoterSet(new HashMap<>(voterMap)); + VoterSet voterSet1234 = VoterSet.fromMap(new HashMap<>(voterMap)); votersHistory.addAt(100, voterSet1234); // Add voter 5 to the voter set and voter set history voterMap.put(5, VoterSetTest.voterNode(5, true)); - VoterSet voterSet12345 = new VoterSet(new HashMap<>(voterMap)); + VoterSet voterSet12345 = VoterSet.fromMap(new HashMap<>(voterMap)); votersHistory.addAt(200, voterSet12345); votersHistory.truncateNewEntries(201); @@ -200,17 +200,17 @@ public final class VoterSetHistoryTest { @Test void testTrimPrefixTo() { Map voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); - VoterSet staticVoterSet = new VoterSet(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(Optional.of(staticVoterSet)); + VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); + VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); // Add voter 4 to the voter set and voter set history voterMap.put(4, VoterSetTest.voterNode(4, true)); - VoterSet voterSet1234 = new VoterSet(new HashMap<>(voterMap)); + VoterSet voterSet1234 = VoterSet.fromMap(new HashMap<>(voterMap)); votersHistory.addAt(100, voterSet1234); // Add voter 5 to the voter set and voter set history voterMap.put(5, VoterSetTest.voterNode(5, true)); - VoterSet voterSet12345 = new VoterSet(new HashMap<>(voterMap)); + VoterSet voterSet12345 = VoterSet.fromMap(new HashMap<>(voterMap)); votersHistory.addAt(200, voterSet12345); votersHistory.truncateOldEntries(99); @@ -233,17 +233,17 @@ public final class VoterSetHistoryTest { @Test void testClear() { Map voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); - VoterSet staticVoterSet = new VoterSet(new HashMap<>(voterMap)); - VoterSetHistory votersHistory = new VoterSetHistory(Optional.of(staticVoterSet)); + VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); + VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); // Add voter 4 to the voter set and voter set history voterMap.put(4, VoterSetTest.voterNode(4, true)); - VoterSet voterSet1234 = new VoterSet(new HashMap<>(voterMap)); + VoterSet voterSet1234 = VoterSet.fromMap(new HashMap<>(voterMap)); votersHistory.addAt(100, voterSet1234); // Add voter 5 to the voter set and voter set history voterMap.put(5, VoterSetTest.voterNode(5, true)); - VoterSet voterSet12345 = new VoterSet(new HashMap<>(voterMap)); + VoterSet voterSet12345 = VoterSet.fromMap(new HashMap<>(voterMap)); votersHistory.addAt(200, voterSet12345); votersHistory.clear();