mirror of https://github.com/apache/kafka.git
MINOR: Rename RaftConfig to QuorumConfig (#15797)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
ce9026f597
commit
6feae817d2
|
@ -40,8 +40,8 @@ import org.apache.kafka.common.requests.RequestHeader
|
|||
import org.apache.kafka.common.security.JaasContext
|
||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
|
||||
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
|
||||
import org.apache.kafka.raft.{FileBasedStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, RaftConfig, ReplicatedLog}
|
||||
import org.apache.kafka.raft.QuorumConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
|
||||
import org.apache.kafka.raft.{FileBasedStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, QuorumConfig, ReplicatedLog}
|
||||
import org.apache.kafka.server.ProcessRole
|
||||
import org.apache.kafka.server.common.serialization.RecordSerde
|
||||
import org.apache.kafka.server.util.KafkaScheduler
|
||||
|
@ -149,7 +149,7 @@ class KafkaRaftManager[T](
|
|||
) extends RaftManager[T] with Logging {
|
||||
|
||||
val apiVersions = new ApiVersions()
|
||||
private val raftConfig = new RaftConfig(config)
|
||||
private val raftConfig = new QuorumConfig(config)
|
||||
private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft")
|
||||
private val logContext = new LogContext(s"[RaftManager id=${config.nodeId}] ")
|
||||
this.logIdent = logContext.logPrefix()
|
||||
|
|
|
@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, Grou
|
|||
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
|
||||
import org.apache.kafka.image.publisher.MetadataPublisher
|
||||
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
|
||||
import org.apache.kafka.raft.RaftConfig
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.security.CredentialProvider
|
||||
import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, NodeToControllerChannelManager}
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
|
@ -222,7 +222,7 @@ class BrokerServer(
|
|||
"controller quorum voters future",
|
||||
sharedServer.controllerQuorumVotersFuture,
|
||||
startupDeadline, time)
|
||||
val controllerNodes = RaftConfig.voterConnectionsToNodes(voterConnections).asScala
|
||||
val controllerNodes = QuorumConfig.voterConnectionsToNodes(voterConnections).asScala
|
||||
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
|
||||
|
||||
clientToControllerChannelManager = new NodeToControllerChannelManagerImpl(
|
||||
|
|
|
@ -41,7 +41,7 @@ import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
|
|||
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
|
||||
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
|
||||
import org.apache.kafka.metadata.publisher.FeaturesPublisher
|
||||
import org.apache.kafka.raft.RaftConfig
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.security.{CredentialProvider, PasswordEncoder}
|
||||
import org.apache.kafka.server.NodeToControllerChannelManager
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
|
@ -215,7 +215,7 @@ class ControllerServer(
|
|||
"controller quorum voters future",
|
||||
sharedServer.controllerQuorumVotersFuture,
|
||||
startupDeadline, time)
|
||||
val controllerNodes = RaftConfig.voterConnectionsToNodes(voterConnections)
|
||||
val controllerNodes = QuorumConfig.voterConnectionsToNodes(voterConnections)
|
||||
val quorumFeatures = new QuorumFeatures(config.nodeId,
|
||||
QuorumFeatures.defaultFeatureMap(config.unstableMetadataVersionsEnabled),
|
||||
controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava)
|
||||
|
|
|
@ -41,7 +41,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
|||
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
|
||||
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.raft.RaftConfig
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.security.authorizer.AuthorizerUtils
|
||||
import org.apache.kafka.security.PasswordEncoderConfigs
|
||||
import org.apache.kafka.server.ProcessRole
|
||||
|
@ -125,7 +125,7 @@ object KafkaConfig {
|
|||
val MetadataLogSegmentMillisProp = "metadata.log.segment.ms"
|
||||
val MetadataMaxRetentionBytesProp = "metadata.max.retention.bytes"
|
||||
val MetadataMaxRetentionMillisProp = "metadata.max.retention.ms"
|
||||
val QuorumVotersProp = RaftConfig.QUORUM_VOTERS_CONFIG
|
||||
val QuorumVotersProp = QuorumConfig.QUORUM_VOTERS_CONFIG
|
||||
val MetadataMaxIdleIntervalMsProp = "metadata.max.idle.interval.ms"
|
||||
val ServerMaxStartupTimeMsProp = "server.max.startup.time.ms"
|
||||
|
||||
|
@ -640,13 +640,13 @@ object KafkaConfig {
|
|||
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT, atLeast(1024), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DOC)
|
||||
|
||||
/** ********* Raft Quorum Configuration *********/
|
||||
.define(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, Defaults.QUORUM_VOTERS, new RaftConfig.ControllerQuorumVotersValidator(), HIGH, RaftConfig.QUORUM_VOTERS_DOC)
|
||||
.define(RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, Defaults.QUORUM_ELECTION_TIMEOUT_MS, null, HIGH, RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_DOC)
|
||||
.define(RaftConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, INT, Defaults.QUORUM_FETCH_TIMEOUT_MS, null, HIGH, RaftConfig.QUORUM_FETCH_TIMEOUT_MS_DOC)
|
||||
.define(RaftConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, Defaults.QUORUM_ELECTION_BACKOFF_MS, null, HIGH, RaftConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_DOC)
|
||||
.define(RaftConfig.QUORUM_LINGER_MS_CONFIG, INT, Defaults.QUORUM_LINGER_MS, null, MEDIUM, RaftConfig.QUORUM_LINGER_MS_DOC)
|
||||
.define(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, Defaults.QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_DOC)
|
||||
.define(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, Defaults.QUORUM_RETRY_BACKOFF_MS, null, LOW, RaftConfig.QUORUM_RETRY_BACKOFF_MS_DOC)
|
||||
.define(QuorumConfig.QUORUM_VOTERS_CONFIG, LIST, Defaults.QUORUM_VOTERS, new QuorumConfig.ControllerQuorumVotersValidator(), HIGH, QuorumConfig.QUORUM_VOTERS_DOC)
|
||||
.define(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, Defaults.QUORUM_ELECTION_TIMEOUT_MS, null, HIGH, QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_DOC)
|
||||
.define(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, INT, Defaults.QUORUM_FETCH_TIMEOUT_MS, null, HIGH, QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_DOC)
|
||||
.define(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, Defaults.QUORUM_ELECTION_BACKOFF_MS, null, HIGH, QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_DOC)
|
||||
.define(QuorumConfig.QUORUM_LINGER_MS_CONFIG, INT, Defaults.QUORUM_LINGER_MS, null, MEDIUM, QuorumConfig.QUORUM_LINGER_MS_DOC)
|
||||
.define(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, Defaults.QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_DOC)
|
||||
.define(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, Defaults.QUORUM_RETRY_BACKOFF_MS, null, LOW, QuorumConfig.QUORUM_RETRY_BACKOFF_MS_DOC)
|
||||
|
||||
/** Internal Configurations **/
|
||||
// This indicates whether unreleased APIs should be advertised by this node.
|
||||
|
@ -1244,13 +1244,13 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
|
|||
def compressionType = getString(KafkaConfig.CompressionTypeProp)
|
||||
|
||||
/** ********* Raft Quorum Configuration *********/
|
||||
val quorumVoters = getList(RaftConfig.QUORUM_VOTERS_CONFIG)
|
||||
val quorumElectionTimeoutMs = getInt(RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG)
|
||||
val quorumFetchTimeoutMs = getInt(RaftConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG)
|
||||
val quorumElectionBackoffMs = getInt(RaftConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG)
|
||||
val quorumLingerMs = getInt(RaftConfig.QUORUM_LINGER_MS_CONFIG)
|
||||
val quorumRequestTimeoutMs = getInt(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG)
|
||||
val quorumRetryBackoffMs = getInt(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG)
|
||||
val quorumVoters = getList(QuorumConfig.QUORUM_VOTERS_CONFIG)
|
||||
val quorumElectionTimeoutMs = getInt(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG)
|
||||
val quorumFetchTimeoutMs = getInt(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG)
|
||||
val quorumElectionBackoffMs = getInt(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG)
|
||||
val quorumLingerMs = getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG)
|
||||
val quorumRequestTimeoutMs = getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG)
|
||||
val quorumRetryBackoffMs = getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG)
|
||||
|
||||
/** Internal Configurations **/
|
||||
val unstableApiVersionsEnabled = getBoolean(KafkaConfig.UnstableApiVersionsEnableProp)
|
||||
|
@ -1446,7 +1446,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
|
|||
val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet
|
||||
|
||||
// validate KRaft-related configs
|
||||
val voterAddressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
|
||||
val voterAddressSpecsByNodeId = QuorumConfig.parseVoterConnections(quorumVoters)
|
||||
def validateNonEmptyQuorumVotersForKRaft(): Unit = {
|
||||
if (voterAddressSpecsByNodeId.isEmpty) {
|
||||
throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.kafka.metadata.KafkaConfigSchema
|
|||
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
|
||||
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
|
||||
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
|
||||
import org.apache.kafka.raft.RaftConfig
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.server.ProcessRole
|
||||
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
|
||||
import org.apache.kafka.server.metrics.KafkaYammerMetrics
|
||||
|
@ -66,7 +66,7 @@ class KafkaRaftServer(
|
|||
)
|
||||
|
||||
private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
|
||||
RaftConfig.parseVoterConnections(config.quorumVoters))
|
||||
QuorumConfig.parseVoterConnections(config.quorumVoters))
|
||||
|
||||
private val sharedServer = new SharedServer(
|
||||
config,
|
||||
|
|
|
@ -51,7 +51,7 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationF
|
|||
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.REQUIRE_V0
|
||||
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
|
||||
import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange}
|
||||
import org.apache.kafka.raft.RaftConfig
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.security.CredentialProvider
|
||||
import org.apache.kafka.server.NodeToControllerChannelManager
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
|
@ -322,8 +322,8 @@ class KafkaServer(
|
|||
remoteLogManagerOpt = createRemoteLogManager()
|
||||
|
||||
if (config.migrationEnabled) {
|
||||
kraftControllerNodes = RaftConfig.voterConnectionsToNodes(
|
||||
RaftConfig.parseVoterConnections(config.quorumVoters)).asScala
|
||||
kraftControllerNodes = QuorumConfig.voterConnectionsToNodes(
|
||||
QuorumConfig.parseVoterConnections(config.quorumVoters)).asScala
|
||||
} else {
|
||||
kraftControllerNodes = Seq.empty
|
||||
}
|
||||
|
@ -428,7 +428,7 @@ class KafkaServer(
|
|||
|
||||
// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
|
||||
val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
|
||||
RaftConfig.parseVoterConnections(config.quorumVoters))
|
||||
QuorumConfig.parseVoterConnections(config.quorumVoters))
|
||||
raftManager = new KafkaRaftManager[ApiMessageAndVersion](
|
||||
metaPropsEnsemble.clusterId().get(),
|
||||
config,
|
||||
|
@ -441,7 +441,7 @@ class KafkaServer(
|
|||
controllerQuorumVotersFuture,
|
||||
fatalFaultHandler = new LoggingFaultHandler("raftManager", () => shutdown())
|
||||
)
|
||||
val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
|
||||
val controllerNodes = QuorumConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
|
||||
val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
|
||||
val brokerToQuorumChannelManager = new NodeToControllerChannelManagerImpl(
|
||||
controllerNodeProvider = quorumControllerNodeProvider,
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
|
|||
import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
|
||||
import org.apache.kafka.metadata.MetadataRecordSerde
|
||||
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
|
||||
import org.apache.kafka.raft.RaftConfig.AddressSpec
|
||||
import org.apache.kafka.raft.QuorumConfig.AddressSpec
|
||||
import org.apache.kafka.server.ProcessRole
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion
|
||||
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
|
||||
|
|
|
@ -35,7 +35,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
|
|||
import org.apache.kafka.common.utils.{Time, Utils}
|
||||
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
|
||||
import org.apache.kafka.raft.errors.NotLeaderException
|
||||
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig}
|
||||
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, QuorumConfig}
|
||||
import org.apache.kafka.security.CredentialProvider
|
||||
import org.apache.kafka.server.common.{Features, MetadataVersion}
|
||||
import org.apache.kafka.server.common.serialization.RecordSerde
|
||||
|
@ -91,7 +91,7 @@ class TestRaftServer(
|
|||
time,
|
||||
metrics,
|
||||
Some(threadNamePrefix),
|
||||
CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters)),
|
||||
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
|
||||
new ProcessTerminatingFaultHandler.Builder().build()
|
||||
)
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
|
|||
import org.apache.kafka.metadata.properties.MetaProperties;
|
||||
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
|
||||
import org.apache.kafka.network.SocketServerConfigs;
|
||||
import org.apache.kafka.raft.RaftConfig;
|
||||
import org.apache.kafka.raft.QuorumConfig;
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||
import org.apache.kafka.server.fault.FaultHandler;
|
||||
import org.apache.kafka.server.fault.MockFaultHandler;
|
||||
|
@ -89,7 +89,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
|||
*/
|
||||
private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
|
||||
private final int expectedControllers;
|
||||
private final CompletableFuture<Map<Integer, RaftConfig.AddressSpec>> future = new CompletableFuture<>();
|
||||
private final CompletableFuture<Map<Integer, QuorumConfig.AddressSpec>> future = new CompletableFuture<>();
|
||||
private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
|
||||
|
||||
ControllerQuorumVotersFutureManager(int expectedControllers) {
|
||||
|
@ -102,7 +102,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
|||
future.complete(controllerPorts.entrySet().stream().
|
||||
collect(Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
entry -> new RaftConfig.InetAddressSpec(new InetSocketAddress("localhost", entry.getValue()))
|
||||
entry -> new QuorumConfig.InetAddressSpec(new InetSocketAddress("localhost", entry.getValue()))
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
@ -193,7 +193,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
|||
String uninitializedQuorumVotersString = nodes.controllerNodes().keySet().stream().
|
||||
map(n -> String.format("%d@0.0.0.0:0", n)).
|
||||
collect(Collectors.joining(","));
|
||||
props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
|
||||
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
|
||||
|
||||
// reduce log cleaner offset map memory usage
|
||||
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
|
||||
|
@ -450,7 +450,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
|||
}
|
||||
|
||||
public String quorumVotersConfig() throws ExecutionException, InterruptedException {
|
||||
Collection<Node> controllerNodes = RaftConfig.voterConnectionsToNodes(
|
||||
Collection<Node> controllerNodes = QuorumConfig.voterConnectionsToNodes(
|
||||
controllerQuorumVotersFutureManager.future.get());
|
||||
StringBuilder bld = new StringBuilder();
|
||||
String prefix = "";
|
||||
|
|
|
@ -24,7 +24,7 @@ import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
|
|||
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
|
||||
import org.apache.kafka.common.Uuid
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.raft.RaftConfig
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.server.common.MetadataVersion
|
||||
import org.apache.kafka.server.config.ZkConfigs
|
||||
import org.junit.jupiter.api.Assertions.{assertThrows, fail}
|
||||
|
@ -72,7 +72,7 @@ class KafkaServerKRaftRegistrationTest {
|
|||
|
||||
// Enable migration configs and restart brokers
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
zkCluster.rollingBrokerRestart()
|
||||
|
@ -109,7 +109,7 @@ class KafkaServerKRaftRegistrationTest {
|
|||
|
||||
// Enable migration configs and restart brokers
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
assertThrows(classOf[IllegalArgumentException], () => zkCluster.rollingBrokerRestart())
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord
|
|||
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
|
||||
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion}
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
|
||||
import org.apache.kafka.raft.QuorumConfig.{AddressSpec, InetAddressSpec}
|
||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
|
||||
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
|
||||
import org.apache.zookeeper.client.ZKClientConfig
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
|
|||
import org.apache.kafka.metadata.authorizer.StandardAcl
|
||||
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.raft.RaftConfig
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING}
|
||||
import org.apache.kafka.security.PasswordEncoder
|
||||
import org.apache.kafka.server.ControllerRequestCompletionHandler
|
||||
|
@ -188,7 +188,7 @@ class ZkMigrationIntegrationTest {
|
|||
// Enable migration configs and restart brokers
|
||||
log.info("Restart brokers in migration mode")
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed
|
||||
|
@ -320,7 +320,7 @@ class ZkMigrationIntegrationTest {
|
|||
// Enable migration configs and restart brokers
|
||||
log.info("Restart brokers in migration mode")
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
zkCluster.rollingBrokerRestart()
|
||||
|
@ -454,7 +454,7 @@ class ZkMigrationIntegrationTest {
|
|||
// Enable migration configs and restart brokers
|
||||
log.info("Restart brokers in migration mode")
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
zkCluster.rollingBrokerRestart()
|
||||
|
@ -518,7 +518,7 @@ class ZkMigrationIntegrationTest {
|
|||
// Enable migration configs and restart brokers
|
||||
log.info("Restart brokers in migration mode")
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed
|
||||
|
@ -603,7 +603,7 @@ class ZkMigrationIntegrationTest {
|
|||
// Enable migration configs and restart brokers
|
||||
log.info("Restart brokers in migration mode")
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
zkCluster.rollingBrokerRestart()
|
||||
|
@ -668,7 +668,7 @@ class ZkMigrationIntegrationTest {
|
|||
// Enable migration configs and restart brokers
|
||||
log.info("Restart brokers in migration mode")
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
zkCluster.rollingBrokerRestart()
|
||||
|
@ -728,7 +728,7 @@ class ZkMigrationIntegrationTest {
|
|||
// Enable migration configs and restart brokers
|
||||
log.info("Restart brokers in migration mode")
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
zkCluster.rollingBrokerRestart()
|
||||
|
@ -803,7 +803,7 @@ class ZkMigrationIntegrationTest {
|
|||
// Enable migration configs and restart brokers
|
||||
log.info("Restart brokers in migration mode")
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
|
||||
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
|
||||
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
zkCluster.rollingBrokerRestart()
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.kafka.common.Uuid
|
|||
import org.apache.kafka.common.metrics.Metrics
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.raft.RaftConfig
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
|
||||
import org.apache.kafka.server.ProcessRole
|
||||
import org.apache.kafka.server.config.ZkConfigs
|
||||
|
@ -115,7 +115,7 @@ class RaftManagerTest {
|
|||
Time.SYSTEM,
|
||||
new Metrics(Time.SYSTEM),
|
||||
Option.empty,
|
||||
CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters)),
|
||||
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
|
||||
mock(classOf[FaultHandler])
|
||||
)
|
||||
}
|
||||
|
|
|
@ -26,8 +26,7 @@ import org.apache.kafka.common.metrics.Sensor
|
|||
import org.apache.kafka.common.network.ListenerName
|
||||
import org.apache.kafka.common.record.{CompressionType, Records}
|
||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||
import org.apache.kafka.raft.RaftConfig
|
||||
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, UNKNOWN_ADDRESS_SPEC_INSTANCE}
|
||||
import org.apache.kafka.raft.QuorumConfig.{AddressSpec, InetAddressSpec, UNKNOWN_ADDRESS_SPEC_INSTANCE}
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
|
@ -41,6 +40,7 @@ import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, Transact
|
|||
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
||||
import org.apache.kafka.security.PasswordEncoderConfigs
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.server.common.MetadataVersion
|
||||
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
|
||||
import org.apache.kafka.server.config.{KafkaSecurityConfigs, ReplicationConfigs, ServerLogConfigs, QuotaConfigs, ServerTopicConfigSynonyms, ZkConfigs}
|
||||
|
@ -1003,13 +1003,13 @@ class KafkaConfigTest {
|
|||
case KafkaSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
|
||||
// Raft Quorum Configs
|
||||
case RaftConfig.QUORUM_VOTERS_CONFIG => // ignore string
|
||||
case RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case RaftConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case RaftConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case RaftConfig.QUORUM_LINGER_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case QuorumConfig.QUORUM_VOTERS_CONFIG => // ignore string
|
||||
case QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case QuorumConfig.QUORUM_LINGER_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
case QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
|
||||
// Remote Log Manager Configs
|
||||
case RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
|
||||
|
@ -1310,12 +1310,12 @@ class KafkaConfigTest {
|
|||
|
||||
@Test
|
||||
def testControllerQuorumVoterStringsToNodes(): Unit = {
|
||||
assertThrows(classOf[ConfigException], () => RaftConfig.quorumVoterStringsToNodes(Collections.singletonList("")))
|
||||
assertThrows(classOf[ConfigException], () => QuorumConfig.quorumVoterStringsToNodes(Collections.singletonList("")))
|
||||
assertEquals(Seq(new Node(3000, "example.com", 9093)),
|
||||
RaftConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093")).asScala.toSeq)
|
||||
QuorumConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093")).asScala.toSeq)
|
||||
assertEquals(Seq(new Node(3000, "example.com", 9093),
|
||||
new Node(3001, "example.com", 9094)),
|
||||
RaftConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093","3001@example.com:9094")).asScala.toSeq)
|
||||
QuorumConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093","3001@example.com:9094")).asScala.toSeq)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1362,7 +1362,7 @@ class KafkaConfigTest {
|
|||
private def assertValidQuorumVoters(value: String, expectedVoters: util.Map[Integer, AddressSpec]): Unit = {
|
||||
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
|
||||
props.setProperty(KafkaConfig.QuorumVotersProp, value)
|
||||
val raftConfig = new RaftConfig(KafkaConfig.fromProps(props))
|
||||
val raftConfig = new QuorumConfig(KafkaConfig.fromProps(props))
|
||||
assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
|
||||
}
|
||||
|
||||
|
|
|
@ -76,7 +76,7 @@ import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
|
|||
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
|
||||
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.raft.RaftConfig
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
|
||||
import org.apache.kafka.server.util.timer.MockTimer
|
||||
import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile
|
||||
|
@ -2617,7 +2617,7 @@ class ReplicaManagerTest {
|
|||
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
|
||||
if (zkMigrationEnabled) {
|
||||
props.put(KafkaConfig.MigrationEnabledProp, "" + zkMigrationEnabled)
|
||||
props.put(RaftConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9071")
|
||||
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9071")
|
||||
props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
|
||||
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
|
||||
config = KafkaConfig.fromProps(props)
|
||||
|
|
|
@ -148,7 +148,7 @@ public class KafkaNetworkChannel implements NetworkChannel {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void updateEndpoint(int id, RaftConfig.InetAddressSpec spec) {
|
||||
public void updateEndpoint(int id, QuorumConfig.InetAddressSpec spec) {
|
||||
Node node = new Node(id, spec.address.getHostString(), spec.address.getPort());
|
||||
endpoints.put(id, node);
|
||||
}
|
||||
|
|
|
@ -158,7 +158,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
private final RecordSerde<T> serde;
|
||||
private final MemoryPool memoryPool;
|
||||
private final RaftMessageQueue messageQueue;
|
||||
private final RaftConfig raftConfig;
|
||||
private final QuorumConfig quorumConfig;
|
||||
private final KafkaRaftMetrics kafkaRaftMetrics;
|
||||
private final QuorumState quorum;
|
||||
private final RequestManager requestManager;
|
||||
|
@ -184,7 +184,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
LogContext logContext,
|
||||
String clusterId,
|
||||
OptionalInt nodeId,
|
||||
RaftConfig raftConfig
|
||||
QuorumConfig quorumConfig
|
||||
) {
|
||||
this(serde,
|
||||
channel,
|
||||
|
@ -200,7 +200,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
nodeId,
|
||||
logContext,
|
||||
new Random(),
|
||||
raftConfig);
|
||||
quorumConfig);
|
||||
}
|
||||
|
||||
KafkaRaftClient(
|
||||
|
@ -218,7 +218,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
OptionalInt nodeId,
|
||||
LogContext logContext,
|
||||
Random random,
|
||||
RaftConfig raftConfig
|
||||
QuorumConfig quorumConfig
|
||||
) {
|
||||
this.serde = serde;
|
||||
this.channel = channel;
|
||||
|
@ -232,16 +232,16 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
this.fetchMaxWaitMs = fetchMaxWaitMs;
|
||||
this.logger = logContext.logger(KafkaRaftClient.class);
|
||||
this.random = random;
|
||||
this.raftConfig = raftConfig;
|
||||
this.quorumConfig = quorumConfig;
|
||||
this.snapshotCleaner = new RaftMetadataLogCleanerManager(logger, time, 60000, log::maybeClean);
|
||||
Set<Integer> quorumVoterIds = raftConfig.quorumVoterIds();
|
||||
this.requestManager = new RequestManager(quorumVoterIds, raftConfig.retryBackoffMs(),
|
||||
raftConfig.requestTimeoutMs(), random);
|
||||
Set<Integer> quorumVoterIds = quorumConfig.quorumVoterIds();
|
||||
this.requestManager = new RequestManager(quorumVoterIds, quorumConfig.retryBackoffMs(),
|
||||
quorumConfig.requestTimeoutMs(), random);
|
||||
this.quorum = new QuorumState(
|
||||
nodeId,
|
||||
quorumVoterIds,
|
||||
raftConfig.electionTimeoutMs(),
|
||||
raftConfig.fetchTimeoutMs(),
|
||||
quorumConfig.electionTimeoutMs(),
|
||||
quorumConfig.fetchTimeoutMs(),
|
||||
quorumStateStore,
|
||||
time,
|
||||
logContext,
|
||||
|
@ -252,10 +252,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
|
||||
|
||||
// Update the voter endpoints with what's in RaftConfig
|
||||
Map<Integer, RaftConfig.AddressSpec> voterAddresses = raftConfig.quorumVoterConnections();
|
||||
Map<Integer, QuorumConfig.AddressSpec> voterAddresses = quorumConfig.quorumVoterConnections();
|
||||
voterAddresses.entrySet().stream()
|
||||
.filter(e -> e.getValue() instanceof RaftConfig.InetAddressSpec)
|
||||
.forEach(e -> this.channel.updateEndpoint(e.getKey(), (RaftConfig.InetAddressSpec) e.getValue()));
|
||||
.filter(e -> e.getValue() instanceof QuorumConfig.InetAddressSpec)
|
||||
.forEach(e -> this.channel.updateEndpoint(e.getKey(), (QuorumConfig.InetAddressSpec) e.getValue()));
|
||||
}
|
||||
|
||||
private void updateFollowerHighWatermark(
|
||||
|
@ -429,7 +429,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
BatchAccumulator<T> accumulator = new BatchAccumulator<>(
|
||||
quorum.epoch(),
|
||||
endOffset,
|
||||
raftConfig.appendLingerMs(),
|
||||
quorumConfig.appendLingerMs(),
|
||||
MAX_BATCH_SIZE_BYTES,
|
||||
memoryPool,
|
||||
time,
|
||||
|
@ -658,7 +658,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
}
|
||||
// upper limit exponential co-efficients at 20 to avoid overflow
|
||||
return Math.min(RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries - 1)),
|
||||
raftConfig.electionBackoffMaxMs());
|
||||
quorumConfig.electionBackoffMaxMs());
|
||||
}
|
||||
|
||||
private int strictExponentialElectionBackoffMs(int positionInSuccessors, int totalNumSuccessors) {
|
||||
|
@ -667,8 +667,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
" and smaller than total number of successors " + totalNumSuccessors);
|
||||
}
|
||||
|
||||
int retryBackOffBaseMs = raftConfig.electionBackoffMaxMs() >> (totalNumSuccessors - 1);
|
||||
return Math.min(raftConfig.electionBackoffMaxMs(), retryBackOffBaseMs << (positionInSuccessors - 1));
|
||||
int retryBackOffBaseMs = quorumConfig.electionBackoffMaxMs() >> (totalNumSuccessors - 1);
|
||||
return Math.min(quorumConfig.electionBackoffMaxMs(), retryBackOffBaseMs << (positionInSuccessors - 1));
|
||||
}
|
||||
|
||||
private BeginQuorumEpochResponseData buildBeginQuorumEpochResponse(Errors partitionLevelError) {
|
||||
|
|
|
@ -37,7 +37,7 @@ public interface NetworkChannel extends AutoCloseable {
|
|||
/**
|
||||
* Update connection information for the given id.
|
||||
*/
|
||||
void updateEndpoint(int id, RaftConfig.InetAddressSpec address);
|
||||
void updateEndpoint(int id, QuorumConfig.InetAddressSpec address);
|
||||
|
||||
default void close() throws InterruptedException {}
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ import java.util.stream.Collectors;
|
|||
* controller should be able to transition from standby to active without reloading all of
|
||||
* the metadata. The standby is a "hot" standby, not a "cold" one.
|
||||
*/
|
||||
public class RaftConfig {
|
||||
public class QuorumConfig {
|
||||
|
||||
private static final String QUORUM_PREFIX = "controller.quorum.";
|
||||
|
||||
|
@ -138,7 +138,7 @@ public class RaftConfig {
|
|||
}
|
||||
}
|
||||
|
||||
public RaftConfig(AbstractConfig abstractConfig) {
|
||||
public QuorumConfig(AbstractConfig abstractConfig) {
|
||||
this(parseVoterConnections(abstractConfig.getList(QUORUM_VOTERS_CONFIG)),
|
||||
abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG),
|
||||
abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG),
|
||||
|
@ -148,7 +148,7 @@ public class RaftConfig {
|
|||
abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG));
|
||||
}
|
||||
|
||||
public RaftConfig(
|
||||
public QuorumConfig(
|
||||
Map<Integer, AddressSpec> voterConnections,
|
||||
int requestTimeoutMs,
|
||||
int retryBackoffMs,
|
||||
|
@ -245,7 +245,7 @@ public class RaftConfig {
|
|||
return voterConnectionsToNodes(parseVoterConnections(voters));
|
||||
}
|
||||
|
||||
public static List<Node> voterConnectionsToNodes(Map<Integer, RaftConfig.AddressSpec> voterConnections) {
|
||||
public static List<Node> voterConnectionsToNodes(Map<Integer, QuorumConfig.AddressSpec> voterConnections) {
|
||||
return voterConnections.entrySet().stream()
|
||||
.filter(Objects::nonNull)
|
||||
.filter(connection -> connection.getValue() instanceof InetAddressSpec)
|
|
@ -108,7 +108,7 @@ public class KafkaNetworkChannelTest {
|
|||
public void testSendToBlackedOutDestination() throws ExecutionException, InterruptedException {
|
||||
int destinationId = 2;
|
||||
Node destinationNode = new Node(destinationId, "127.0.0.1", 9092);
|
||||
channel.updateEndpoint(destinationId, new RaftConfig.InetAddressSpec(
|
||||
channel.updateEndpoint(destinationId, new QuorumConfig.InetAddressSpec(
|
||||
new InetSocketAddress(destinationNode.host(), destinationNode.port())));
|
||||
client.backoff(destinationNode, 500);
|
||||
assertBrokerNotAvailable(destinationId);
|
||||
|
@ -118,7 +118,7 @@ public class KafkaNetworkChannelTest {
|
|||
public void testWakeupClientOnSend() throws InterruptedException, ExecutionException {
|
||||
int destinationId = 2;
|
||||
Node destinationNode = new Node(destinationId, "127.0.0.1", 9092);
|
||||
channel.updateEndpoint(destinationId, new RaftConfig.InetAddressSpec(
|
||||
channel.updateEndpoint(destinationId, new QuorumConfig.InetAddressSpec(
|
||||
new InetSocketAddress(destinationNode.host(), destinationNode.port())));
|
||||
|
||||
client.enableBlockingUntilWakeup(1);
|
||||
|
@ -145,7 +145,7 @@ public class KafkaNetworkChannelTest {
|
|||
public void testSendAndDisconnect() throws ExecutionException, InterruptedException {
|
||||
int destinationId = 2;
|
||||
Node destinationNode = new Node(destinationId, "127.0.0.1", 9092);
|
||||
channel.updateEndpoint(destinationId, new RaftConfig.InetAddressSpec(
|
||||
channel.updateEndpoint(destinationId, new QuorumConfig.InetAddressSpec(
|
||||
new InetSocketAddress(destinationNode.host(), destinationNode.port())));
|
||||
|
||||
for (ApiKeys apiKey : RAFT_APIS) {
|
||||
|
@ -159,7 +159,7 @@ public class KafkaNetworkChannelTest {
|
|||
public void testSendAndFailAuthentication() throws ExecutionException, InterruptedException {
|
||||
int destinationId = 2;
|
||||
Node destinationNode = new Node(destinationId, "127.0.0.1", 9092);
|
||||
channel.updateEndpoint(destinationId, new RaftConfig.InetAddressSpec(
|
||||
channel.updateEndpoint(destinationId, new QuorumConfig.InetAddressSpec(
|
||||
new InetSocketAddress(destinationNode.host(), destinationNode.port())));
|
||||
|
||||
for (ApiKeys apiKey : RAFT_APIS) {
|
||||
|
@ -181,7 +181,7 @@ public class KafkaNetworkChannelTest {
|
|||
public void testSendAndReceiveOutboundRequest() throws ExecutionException, InterruptedException {
|
||||
int destinationId = 2;
|
||||
Node destinationNode = new Node(destinationId, "127.0.0.1", 9092);
|
||||
channel.updateEndpoint(destinationId, new RaftConfig.InetAddressSpec(
|
||||
channel.updateEndpoint(destinationId, new QuorumConfig.InetAddressSpec(
|
||||
new InetSocketAddress(destinationNode.host(), destinationNode.port())));
|
||||
|
||||
for (ApiKeys apiKey : RAFT_APIS) {
|
||||
|
@ -197,7 +197,7 @@ public class KafkaNetworkChannelTest {
|
|||
public void testUnsupportedVersionError() throws ExecutionException, InterruptedException {
|
||||
int destinationId = 2;
|
||||
Node destinationNode = new Node(destinationId, "127.0.0.1", 9092);
|
||||
channel.updateEndpoint(destinationId, new RaftConfig.InetAddressSpec(
|
||||
channel.updateEndpoint(destinationId, new QuorumConfig.InetAddressSpec(
|
||||
new InetSocketAddress(destinationNode.host(), destinationNode.port())));
|
||||
|
||||
for (ApiKeys apiKey : RAFT_APIS) {
|
||||
|
@ -211,7 +211,7 @@ public class KafkaNetworkChannelTest {
|
|||
public void testFetchRequestDowngrade(short version) {
|
||||
int destinationId = 2;
|
||||
Node destinationNode = new Node(destinationId, "127.0.0.1", 9092);
|
||||
channel.updateEndpoint(destinationId, new RaftConfig.InetAddressSpec(
|
||||
channel.updateEndpoint(destinationId, new QuorumConfig.InetAddressSpec(
|
||||
new InetSocketAddress(destinationNode.host(), destinationNode.port())));
|
||||
sendTestRequest(ApiKeys.FETCH, destinationId);
|
||||
channel.pollOnce();
|
||||
|
|
|
@ -57,7 +57,7 @@ public class MockNetworkChannel implements NetworkChannel {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void updateEndpoint(int id, RaftConfig.InetAddressSpec address) {
|
||||
public void updateEndpoint(int id, QuorumConfig.InetAddressSpec address) {
|
||||
// empty
|
||||
}
|
||||
|
||||
|
|
|
@ -236,9 +236,9 @@ public final class RaftClientTestContext {
|
|||
Metrics metrics = new Metrics(time);
|
||||
MockNetworkChannel channel = new MockNetworkChannel(voters);
|
||||
MockListener listener = new MockListener(localId);
|
||||
Map<Integer, RaftConfig.AddressSpec> voterAddressMap = voters.stream()
|
||||
Map<Integer, QuorumConfig.AddressSpec> voterAddressMap = voters.stream()
|
||||
.collect(Collectors.toMap(id -> id, RaftClientTestContext::mockAddress));
|
||||
RaftConfig raftConfig = new RaftConfig(voterAddressMap, requestTimeoutMs, RETRY_BACKOFF_MS, electionTimeoutMs,
|
||||
QuorumConfig quorumConfig = new QuorumConfig(voterAddressMap, requestTimeoutMs, RETRY_BACKOFF_MS, electionTimeoutMs,
|
||||
ELECTION_BACKOFF_MAX_MS, FETCH_TIMEOUT_MS, appendLingerMs);
|
||||
|
||||
KafkaRaftClient<String> client = new KafkaRaftClient<>(
|
||||
|
@ -256,7 +256,7 @@ public final class RaftClientTestContext {
|
|||
localId,
|
||||
logContext,
|
||||
random,
|
||||
raftConfig
|
||||
quorumConfig
|
||||
);
|
||||
|
||||
client.register(listener);
|
||||
|
@ -817,8 +817,8 @@ public final class RaftClientTestContext {
|
|||
return requests;
|
||||
}
|
||||
|
||||
private static RaftConfig.AddressSpec mockAddress(int id) {
|
||||
return new RaftConfig.InetAddressSpec(new InetSocketAddress("localhost", 9990 + id));
|
||||
private static QuorumConfig.AddressSpec mockAddress(int id) {
|
||||
return new QuorumConfig.InetAddressSpec(new InetSocketAddress("localhost", 9990 + id));
|
||||
}
|
||||
|
||||
EndQuorumEpochResponseData endEpochResponse(
|
||||
|
|
|
@ -708,8 +708,8 @@ public class RaftEventSimulationTest {
|
|||
nodes.put(nodeId, new PersistentState(nodeId));
|
||||
}
|
||||
|
||||
private static RaftConfig.AddressSpec nodeAddress(int id) {
|
||||
return new RaftConfig.InetAddressSpec(new InetSocketAddress("localhost", 9990 + id));
|
||||
private static QuorumConfig.AddressSpec nodeAddress(int id) {
|
||||
return new QuorumConfig.InetAddressSpec(new InetSocketAddress("localhost", 9990 + id));
|
||||
}
|
||||
|
||||
void start(int nodeId) {
|
||||
|
@ -717,9 +717,9 @@ public class RaftEventSimulationTest {
|
|||
PersistentState persistentState = nodes.get(nodeId);
|
||||
MockNetworkChannel channel = new MockNetworkChannel(correlationIdCounter, voters);
|
||||
MockMessageQueue messageQueue = new MockMessageQueue();
|
||||
Map<Integer, RaftConfig.AddressSpec> voterAddressMap = voters.stream()
|
||||
Map<Integer, QuorumConfig.AddressSpec> voterAddressMap = voters.stream()
|
||||
.collect(Collectors.toMap(id -> id, Cluster::nodeAddress));
|
||||
RaftConfig raftConfig = new RaftConfig(voterAddressMap, REQUEST_TIMEOUT_MS, RETRY_BACKOFF_MS, ELECTION_TIMEOUT_MS,
|
||||
QuorumConfig quorumConfig = new QuorumConfig(voterAddressMap, REQUEST_TIMEOUT_MS, RETRY_BACKOFF_MS, ELECTION_TIMEOUT_MS,
|
||||
ELECTION_JITTER_MS, FETCH_TIMEOUT_MS, LINGER_MS);
|
||||
Metrics metrics = new Metrics(time);
|
||||
|
||||
|
@ -743,7 +743,7 @@ public class RaftEventSimulationTest {
|
|||
OptionalInt.of(nodeId),
|
||||
logContext,
|
||||
random,
|
||||
raftConfig
|
||||
quorumConfig
|
||||
);
|
||||
RaftNode node = new RaftNode(
|
||||
nodeId,
|
||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.kafka.server.config;
|
|||
|
||||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
import org.apache.kafka.common.metrics.Sensor;
|
||||
import org.apache.kafka.raft.RaftConfig;
|
||||
import org.apache.kafka.raft.QuorumConfig;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -87,11 +87,11 @@ public class Defaults {
|
|||
|
||||
|
||||
/** ********* Raft Quorum Configuration *********/
|
||||
public static final List<String> QUORUM_VOTERS = RaftConfig.DEFAULT_QUORUM_VOTERS;
|
||||
public static final int QUORUM_ELECTION_TIMEOUT_MS = RaftConfig.DEFAULT_QUORUM_ELECTION_TIMEOUT_MS;
|
||||
public static final int QUORUM_FETCH_TIMEOUT_MS = RaftConfig.DEFAULT_QUORUM_FETCH_TIMEOUT_MS;
|
||||
public static final int QUORUM_ELECTION_BACKOFF_MS = RaftConfig.DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS;
|
||||
public static final int QUORUM_LINGER_MS = RaftConfig.DEFAULT_QUORUM_LINGER_MS;
|
||||
public static final int QUORUM_REQUEST_TIMEOUT_MS = RaftConfig.DEFAULT_QUORUM_REQUEST_TIMEOUT_MS;
|
||||
public static final int QUORUM_RETRY_BACKOFF_MS = RaftConfig.DEFAULT_QUORUM_RETRY_BACKOFF_MS;
|
||||
public static final List<String> QUORUM_VOTERS = QuorumConfig.DEFAULT_QUORUM_VOTERS;
|
||||
public static final int QUORUM_ELECTION_TIMEOUT_MS = QuorumConfig.DEFAULT_QUORUM_ELECTION_TIMEOUT_MS;
|
||||
public static final int QUORUM_FETCH_TIMEOUT_MS = QuorumConfig.DEFAULT_QUORUM_FETCH_TIMEOUT_MS;
|
||||
public static final int QUORUM_ELECTION_BACKOFF_MS = QuorumConfig.DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS;
|
||||
public static final int QUORUM_LINGER_MS = QuorumConfig.DEFAULT_QUORUM_LINGER_MS;
|
||||
public static final int QUORUM_REQUEST_TIMEOUT_MS = QuorumConfig.DEFAULT_QUORUM_REQUEST_TIMEOUT_MS;
|
||||
public static final int QUORUM_RETRY_BACKOFF_MS = QuorumConfig.DEFAULT_QUORUM_RETRY_BACKOFF_MS;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue