MINOR: Drop enable.metadata.quorum config (#9934)

The primary purpose of this patch is to remove the internal `enable.metadata.quorum` configuration. Instead, we rely on `process.roles` to determine if the self-managed quorum has been enabled. As a part of this, I've done the following:

1. Replace the notion of "disabled" APIs with "controller-only" APIs. We previously marked some APIs which were intended only for the KIP-500 as "disabled" so that they would not be unintentionally exposed. For example, the Raft quorum APIs were disabled. Marking them as "controller-only" carries the same effect, but makes the intent that they should be only exposed by the KIP-500 controller clearer.
2. Make `ForwardingManager` optional in `KafkaServer` and `KafkaApis`. Previously we used `null` if forwarding was enabled and relied on the metadata quorum check.
3. Make `zookeeper.connect` an optional configuration if `process.roles` is defined.
4. Update raft README to remove reference to `zookeeper.conntect`

Reviewers: Colin Patrick McCabe <cmccabe@confluent.io>, Boyang Chen <boyang@confluent.io>
This commit is contained in:
Jason Gustafson 2021-01-21 15:16:15 -08:00 committed by GitHub
parent fea2f65929
commit 9689a313f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 132 additions and 110 deletions

View File

@ -62,7 +62,7 @@ public class NodeApiVersions {
*/
public static NodeApiVersions create(Collection<ApiVersion> overrides) {
List<ApiVersion> apiVersions = new LinkedList<>(overrides);
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
boolean exists = false;
for (ApiVersion apiVersion : apiVersions) {
if (apiVersion.apiKey() == apiKey.id) {
@ -170,7 +170,7 @@ public class NodeApiVersions {
// Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
// which may happen when the remote is too old.
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (!apiKeysText.containsKey(apiKey.id)) {
StringBuilder bld = new StringBuilder();
bld.append(apiKey.name).append("(").

View File

@ -90,15 +90,15 @@ public enum ApiKeys {
ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS, false, true),
VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, false),
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, false),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, false),
VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, true),
ALTER_ISR(ApiMessageType.ALTER_ISR, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, false),
DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER, false, RecordBatch.MAGIC_VALUE_V0, false, true);
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, true),
DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER);
// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
@ -116,8 +116,8 @@ public enum ApiKeys {
/** indicates the minimum required inter broker magic required to support the API */
public final byte minRequiredInterBrokerMagic;
/** indicates whether the API is enabled and should be exposed in ApiVersions **/
public final boolean isEnabled;
/** indicates whether this is an API which is only exposed by the KIP-500 controller **/
public final boolean isControllerOnlyApi;
/** indicates whether the API is enabled for forwarding **/
public final boolean forwardable;
@ -139,7 +139,7 @@ public enum ApiKeys {
}
ApiKeys(ApiMessageType messageType, boolean clusterAction, byte minRequiredInterBrokerMagic, boolean forwardable) {
this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, true);
this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, false);
}
ApiKeys(
@ -147,14 +147,14 @@ public enum ApiKeys {
boolean clusterAction,
byte minRequiredInterBrokerMagic,
boolean forwardable,
boolean isEnabled
boolean isControllerOnlyApi
) {
this.messageType = messageType;
this.id = messageType.apiKey();
this.name = messageType.name;
this.clusterAction = clusterAction;
this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
this.isEnabled = isEnabled;
this.isControllerOnlyApi = isControllerOnlyApi;
this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(messageType.requestSchemas());
this.forwardable = forwardable;
@ -210,7 +210,7 @@ public enum ApiKeys {
b.append("<th>Name</th>\n");
b.append("<th>Key</th>\n");
b.append("</tr>");
for (ApiKeys key : ApiKeys.enabledApis()) {
for (ApiKeys key : ApiKeys.brokerApis()) {
b.append("<tr>\n");
b.append("<td>");
b.append("<a href=\"#The_Messages_" + key.name + "\">" + key.name + "</a>");
@ -242,9 +242,9 @@ public enum ApiKeys {
return hasBuffer.get();
}
public static List<ApiKeys> enabledApis() {
public static List<ApiKeys> brokerApis() {
return Arrays.stream(values())
.filter(api -> api.isEnabled)
.filter(api -> !api.isControllerOnlyApi)
.collect(Collectors.toList());
}

View File

@ -133,7 +133,7 @@ public class Protocol {
b.append("</pre>\n");
schemaToFieldTableHtml(ResponseHeaderData.SCHEMAS[i], b);
}
for (ApiKeys key : ApiKeys.enabledApis()) {
for (ApiKeys key : ApiKeys.brokerApis()) {
// Key
b.append("<h5>");
b.append("<a name=\"The_Messages_" + key.name + "\">");

View File

@ -119,7 +119,7 @@ public class ApiVersionsResponse extends AbstractResponse {
public static ApiVersionCollection defaultApiKeys(final byte minMagic) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey));
}
@ -137,7 +137,7 @@ public class ApiVersionsResponse extends AbstractResponse {
public static ApiVersionCollection intersectControllerApiVersions(final byte minMagic,
final Map<ApiKeys, ApiVersion> activeControllerApiVersions) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
ApiVersion brokerApiVersion = toApiVersion(apiKey);

View File

@ -38,7 +38,7 @@ public class NodeApiVersionsTest {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
bld.append(prefix).append(apiKey.name).
append("(").append(apiKey.id).append("): UNSUPPORTED");
prefix = ", ";
@ -143,10 +143,10 @@ public class NodeApiVersionsTest {
.setMaxVersion((short) 1));
NodeApiVersions versions = new NodeApiVersions(versionList);
for (ApiKeys apiKey: ApiKeys.values()) {
if (apiKey.isEnabled) {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
} else {
if (apiKey.isControllerOnlyApi) {
assertNull(versions.apiVersion(apiKey));
} else {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
}
}

View File

@ -40,7 +40,7 @@ public class ApiVersionsResponseTest {
@Test
public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() {
assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE), new HashSet<>(ApiKeys.enabledApis()));
assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE), new HashSet<>(ApiKeys.brokerApis()));
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().supportedFeatures().isEmpty());
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeatures().isEmpty());
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch());
@ -49,9 +49,9 @@ public class ApiVersionsResponseTest {
@Test
public void shouldHaveCorrectDefaultApiVersionsResponse() {
Collection<ApiVersion> apiVersions = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys();
assertEquals(apiVersions.size(), ApiKeys.enabledApis().size(), "API versions for all API keys must be maintained.");
assertEquals(apiVersions.size(), ApiKeys.brokerApis().size(), "API versions for all API keys must be maintained.");
for (ApiKeys key : ApiKeys.enabledApis()) {
for (ApiKeys key : ApiKeys.brokerApis()) {
ApiVersion version = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.apiVersion(key.id);
assertNotNull(version, "Could not find ApiVersion for API " + key.name);
assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect min version for Api " + key.name);

View File

@ -65,11 +65,12 @@ object Kafka extends Logging {
private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, false)
if (config.processRoles.isEmpty) {
if (config.requiresZookeeper) {
new KafkaServer(
config,
Time.SYSTEM,
threadNamePrefix = None
threadNamePrefix = None,
enableForwarding = false
)
} else {
new KafkaRaftServer(

View File

@ -59,11 +59,11 @@ object RequestChannel extends Logging {
val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
}
class Metrics(allowDisabledApis: Boolean = false) {
class Metrics(allowControllerOnlyApis: Boolean = false) {
private val metricsMap = mutable.Map[String, RequestMetrics]()
(ApiKeys.values.toSeq.filter(_.isEnabled || allowDisabledApis).map(_.name) ++
(ApiKeys.values.toSeq.filter(!_.isControllerOnlyApi || allowControllerOnlyApis).map(_.name) ++
Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
metricsMap.put(name, new RequestMetrics(name))
}
@ -337,9 +337,9 @@ object RequestChannel extends Logging {
class RequestChannel(val queueSize: Int,
val metricNamePrefix: String,
time: Time,
allowDisabledApis: Boolean = false) extends KafkaMetricsGroup {
allowControllerOnlyApis: Boolean = false) extends KafkaMetricsGroup {
import RequestChannel._
val metrics = new RequestChannel.Metrics(allowDisabledApis)
val metrics = new RequestChannel.Metrics(allowControllerOnlyApis)
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)

View File

@ -77,7 +77,7 @@ class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider,
val allowDisabledApis: Boolean = false)
val allowControllerOnlyApis: Boolean = false)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
private val maxQueuedRequests = config.queuedMaxRequests
@ -93,12 +93,12 @@ class SocketServer(val config: KafkaConfig,
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, allowDisabledApis)
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, allowControllerOnlyApis)
// control-plane
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneMetricPrefix, time, allowDisabledApis))
new RequestChannel(20, ControlPlaneMetricPrefix, time, allowControllerOnlyApis))
private var nextProcessorId = 0
val connectionQuotas = new ConnectionQuotas(config, time, metrics)
@ -429,7 +429,7 @@ class SocketServer(val config: KafkaConfig,
memoryPool,
logContext,
isPrivilegedListener = isPrivilegedListener,
allowDisabledApis = allowDisabledApis
allowControllerOnlyApis = allowControllerOnlyApis
)
}
@ -790,7 +790,7 @@ private[kafka] class Processor(val id: Int,
logContext: LogContext,
connectionQueueSize: Int = ConnectionQueueSize,
isPrivilegedListener: Boolean = false,
allowDisabledApis: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
allowControllerOnlyApis: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@ -981,10 +981,10 @@ private[kafka] class Processor(val id: Int,
protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
val header = RequestHeader.parse(buffer)
if (header.apiKey.isEnabled || allowDisabledApis) {
if (!header.apiKey.isControllerOnlyApi || allowControllerOnlyApis) {
header
} else {
throw new InvalidRequestException("Received request for disabled api key " + header.apiKey)
throw new InvalidRequestException("Received request for KIP-500 controller-only api key " + header.apiKey)
}
}

View File

@ -36,7 +36,7 @@ trait ForwardingManager {
responseCallback: Option[AbstractResponse] => Unit
): Unit
def controllerApiVersions(): Option[NodeApiVersions]
def controllerApiVersions: Option[NodeApiVersions]
def start(): Unit = {}
@ -140,7 +140,7 @@ class ForwardingManagerImpl(
channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
}
override def controllerApiVersions(): Option[NodeApiVersions] =
override def controllerApiVersions: Option[NodeApiVersions] =
channelManager.controllerApiVersions()
private def parseResponse(

View File

@ -101,7 +101,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val controller: KafkaController,
val forwardingManager: ForwardingManager,
val forwardingManager: Option[ForwardingManager],
val zkClient: KafkaZkClient,
val brokerId: Int,
val config: KafkaConfig,
@ -131,7 +131,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
config.metadataQuorumEnabled && request.context.principalSerde.isPresent
forwardingManager.isDefined && request.context.principalSerde.isPresent
}
private def maybeForwardToController(
@ -149,12 +149,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
if (!request.isForwarded && !controller.isActive && isForwardingEnabled(request)) {
forwardingManager.forwardRequest(request, responseCallback)
} else {
// When the KIP-500 mode is off or the principal serde is undefined, forwarding is not supported,
// therefore requests are handled directly.
handler(request)
forwardingManager match {
case Some(mgr) if !request.isForwarded && !controller.isActive =>
mgr.forwardRequest(request, responseCallback)
case _ =>
handler(request)
}
}
@ -1742,11 +1742,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else {
val supportedFeatures = brokerFeatures.supportedFeatures
val finalizedFeaturesOpt = finalizedFeatureCache.get
val controllerApiVersions = if (isForwardingEnabled(request)) {
forwardingManager.controllerApiVersions()
} else {
None
}
val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
val apiVersionsResponse =
finalizedFeaturesOpt match {
@ -3251,9 +3247,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// If forwarding is not yet enabled or this request has been received on an invalid endpoint,
// then we treat the request as unparsable and close the connection.
if (!config.metadataQuorumEnabled) {
if (!isForwardingEnabled(request)) {
info(s"Closing connection ${request.context.connectionId} because it sent an `Envelope` " +
s"request, which is not accepted without enabling the internal config ${KafkaConfig.EnableMetadataQuorumProp}")
"request even though forwarding has not been enabled")
requestHelper.closeConnection(request, Collections.emptyMap())
return
} else if (!request.context.fromPrivilegedListener) {
@ -3279,7 +3275,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer)
val forwardedApi = forwardedRequestHeader.apiKey
if (!forwardedApi.forwardable || !forwardedApi.isEnabled) {
if (!forwardedApi.forwardable) {
throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding")
}

View File

@ -363,7 +363,6 @@ object KafkaConfig {
val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
val EnableMetadataQuorumProp = "enable.metadata.quorum"
val ProcessRolesProp = "process.roles"
/************* Authorizer Configuration ***********/
@ -648,6 +647,9 @@ object KafkaConfig {
val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
val ConnectionSetupTimeoutMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC
val ConnectionSetupTimeoutMaxMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC
val ProcessRolesDoc = "The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. " +
"This configuration is only for clusters upgraded for KIP-500, which replaces the dependence on Zookeeper with " +
"a self-managed Raft quorum. Leave this config undefined or empty for Zookeeper clusters."
/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" +
" interface, which is used by the broker for authorization. This config also supports authorizers that implement the deprecated" +
@ -997,9 +999,6 @@ object KafkaConfig {
val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."
/** ********* Experimental metadata quorum configuration ***********/
val ProcessRolesDoc = "This configuration determines what roles this process should play: broker, controller, or both"
private val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
@ -1009,7 +1008,7 @@ object KafkaConfig {
new ConfigDef()
/** ********* Zookeeper Configuration ***********/
.define(ZkConnectProp, STRING, HIGH, ZkConnectDoc)
.define(ZkConnectProp, STRING, null, HIGH, ZkConnectDoc)
.define(ZkSessionTimeoutMsProp, INT, Defaults.ZkSessionTimeoutMs, HIGH, ZkSessionTimeoutMsDoc)
.define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc)
.define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc)
@ -1044,9 +1043,6 @@ object KafkaConfig {
.define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc)
.define(ConnectionSetupTimeoutMsProp, LONG, Defaults.ConnectionSetupTimeoutMs, MEDIUM, ConnectionSetupTimeoutMsDoc)
.define(ConnectionSetupTimeoutMaxMsProp, LONG, Defaults.ConnectionSetupTimeoutMaxMs, MEDIUM, ConnectionSetupTimeoutMaxMsDoc)
// Experimental flag to turn on APIs required for the internal metadata quorum (KIP-500)
.defineInternal(EnableMetadataQuorumProp, BOOLEAN, false, LOW)
.defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
/************* Authorizer Configuration ***********/
@ -1480,6 +1476,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val processRoles = parseProcessRoles()
def requiresZookeeper: Boolean = processRoles.isEmpty
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
case "broker" => BrokerRole
@ -1619,9 +1617,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/** ********* Feature configuration ***********/
def isFeatureVersioningSupported = interBrokerProtocolVersion >= KAFKA_2_7_IV0
/** ********* Experimental metadata quorum configuration ***********/
def metadataQuorumEnabled = getBoolean(KafkaConfig.EnableMetadataQuorumProp)
/** ********* Group coordinator configuration ***********/
val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
val groupMaxSessionTimeoutMs = getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)
@ -1910,5 +1905,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
s" authentication responses from timing out")
if (requiresZookeeper && zkConnect == null) {
throw new ConfigException(s"Missing required configuration '${KafkaConfig.ZkConnectProp}' which has no default value.")
}
}
}

View File

@ -87,6 +87,7 @@ class KafkaServer(
val config: KafkaConfig,
time: Time = Time.SYSTEM,
threadNamePrefix: Option[String] = None,
enableForwarding: Boolean = false
) extends Server with Logging with KafkaMetricsGroup {
private val startupComplete = new AtomicBoolean(false)
@ -129,7 +130,7 @@ class KafkaServer(
var kafkaController: KafkaController = null
var forwardingManager: ForwardingManager = null
var forwardingManager: Option[ForwardingManager] = None
var alterIsrManager: AlterIsrManager = null
@ -254,10 +255,10 @@ class KafkaServer(
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
//
// Note that we allow the use of disabled APIs when experimental support for
// the internal metadata quorum has been enabled
// Note that we allow the use of KIP-500 controller APIs when forwarding is enabled
// so that the Envelope request is exposed. This is only used in testing currently.
socketServer = new SocketServer(config, metrics, time, credentialProvider,
allowDisabledApis = config.metadataQuorumEnabled)
allowControllerOnlyApis = enableForwarding)
socketServer.startup(startProcessingRequests = false)
/* start replica manager */
@ -293,15 +294,15 @@ class KafkaServer(
kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
kafkaController.startup()
if (config.metadataQuorumEnabled) {
forwardingManager = ForwardingManager(
if (enableForwarding) {
this.forwardingManager = Some(ForwardingManager(
config,
metadataCache,
time,
metrics,
threadNamePrefix
)
forwardingManager.start()
))
forwardingManager.foreach(_.start())
}
adminManager = new ZkAdminManager(config, metrics, metadataCache, zkClient)
@ -685,7 +686,7 @@ class KafkaServer(
CoreUtils.swallow(alterIsrManager.shutdown(), this)
if (forwardingManager != null)
CoreUtils.swallow(forwardingManager.shutdown(), this)
CoreUtils.swallow(forwardingManager.foreach(_.shutdown()), this)
if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)

View File

@ -66,7 +66,7 @@ class TestRaftServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
socketServer = new SocketServer(config, metrics, time, credentialProvider, allowDisabledApis = true)
socketServer = new SocketServer(config, metrics, time, credentialProvider, allowControllerOnlyApis = true)
socketServer.startup(startProcessingRequests = false)
raftManager = new KafkaRaftManager[Array[Byte]](
@ -413,6 +413,11 @@ object TestRaftServer extends Logging {
val configFile = opts.options.valueOf(opts.configOpt)
val serverProps = Utils.loadProps(configFile)
// KafkaConfig requires either `process.roles` or `zookeeper.connect`. Neither are
// actually used by the test server, so we fill in `process.roles` with an arbitrary value.
serverProps.put(KafkaConfig.ProcessRolesProp, "controller")
val config = KafkaConfig.fromProps(serverProps, doLog = false)
val throughput = opts.options.valueOf(opts.throughputOpt)
val recordSize = opts.options.valueOf(opts.recordSizeOpt)

View File

@ -55,7 +55,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
assertTrue(lineIter.hasNext)
assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next())
val nodeApiVersions = NodeApiVersions.create
val enabledApis = ApiKeys.enabledApis.asScala
val enabledApis = ApiKeys.brokerApis.asScala
for (apiKey <- enabledApis) {
val apiVersion = nodeApiVersions.apiVersion(apiKey)
assertNotNull(apiVersion)

View File

@ -232,7 +232,7 @@ class ApiVersionTest {
Features.emptySupportedFeatures,
None
)
assertEquals(new util.HashSet[ApiKeys](ApiKeys.enabledApis), apiKeysInResponse(response))
assertEquals(new util.HashSet[ApiKeys](ApiKeys.brokerApis), apiKeysInResponse(response))
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs)
assertTrue(response.data.supportedFeatures.isEmpty)
assertTrue(response.data.finalizedFeatures.isEmpty)

View File

@ -85,6 +85,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
protected def serverSaslProperties: Option[Properties] = None
protected def clientSaslProperties: Option[Properties] = None
protected def brokerTime(brokerId: Int): Time = Time.SYSTEM
protected def enableForwarding: Boolean = false
@BeforeEach
override def setUp(): Unit = {
@ -98,8 +99,14 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
// Add each broker to `servers` buffer as soon as it is created to ensure that brokers
// are shutdown cleanly in tearDown even if a subsequent broker fails to start
for (config <- configs)
servers += TestUtils.createServer(config, time = brokerTime(config.brokerId))
for (config <- configs) {
servers += TestUtils.createServer(
config,
time = brokerTime(config.brokerId),
threadNamePrefix = None,
enableForwarding
)
}
brokerList = TestUtils.bootstrapServers(servers, listenerName)
alive = new Array[Boolean](servers.length)
Arrays.fill(alive, true)

View File

@ -47,7 +47,7 @@ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
}
def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = interBrokerListenerName): Unit = {
val expectedApis = ApiKeys.enabledApis()
val expectedApis = ApiKeys.brokerApis()
if (listenerName == controlPlaneListenerName) {
expectedApis.add(ApiKeys.ENVELOPE)
}

View File

@ -17,8 +17,6 @@
package kafka.server
import java.util.Properties
import org.apache.kafka.common.protocol.Errors
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@ -27,9 +25,7 @@ import scala.jdk.CollectionConverters._
class CreateTopicsRequestWithForwardingTest extends AbstractCreateTopicsRequestTest {
override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
}
override def enableForwarding: Boolean = true
@Test
def testForwardToController(): Unit = {

View File

@ -191,7 +191,7 @@ class ForwardingManagerTest {
startTimeNanos = time.nanoseconds(),
memoryPool = MemoryPool.NONE,
buffer = requestBuffer,
metrics = new RequestChannel.Metrics(allowDisabledApis = true),
metrics = new RequestChannel.Metrics(allowControllerOnlyApis = true),
envelope = None
)
}

View File

@ -124,14 +124,19 @@ class KafkaApisTest {
val properties = TestUtils.createBrokerConfig(brokerId, "zk")
properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString)
properties.put(KafkaConfig.EnableMetadataQuorumProp, enableForwarding.toString)
val forwardingManagerOpt = if (enableForwarding)
Some(this.forwardingManager)
else
None
new KafkaApis(requestChannel,
replicaManager,
adminManager,
groupCoordinator,
txnCoordinator,
controller,
forwardingManager,
forwardingManagerOpt,
zkClient,
brokerId,
new KafkaConfig(properties),
@ -601,7 +606,7 @@ class KafkaApisTest {
val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
val permittedVersion: Short = 0
EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(
EasyMock.expect(forwardingManager.controllerApiVersions).andReturn(
Some(NodeApiVersions.create(ApiKeys.ALTER_CONFIGS.id, permittedVersion, permittedVersion)))
val capturedResponse = expectNoThrottling()
@ -637,7 +642,7 @@ class KafkaApisTest {
val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(None)
EasyMock.expect(forwardingManager.controllerApiVersions).andReturn(None)
val capturedResponse = expectNoThrottling()

View File

@ -995,4 +995,21 @@ class KafkaConfigTest {
val raftConfig = new RaftConfig(KafkaConfig.fromProps(props))
assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
}
@Test
def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "")
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testZookeeperConnectNotRequiredIfNonEmptyProcessRoles(): Unit = {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
assertTrue(isValidKafkaConfig(props))
}
}

View File

@ -171,7 +171,7 @@ class RequestQuotaTest extends BaseRequestTest {
def testUnauthorizedThrottle(): Unit = {
RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
for (apiKey <- ApiKeys.enabledApis.asScala) {
for (apiKey <- ApiKeys.brokerApis.asScala) {
submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
}
@ -739,9 +739,9 @@ class RequestQuotaTest extends BaseRequestTest {
}
object RequestQuotaTest {
val ClusterActions = ApiKeys.enabledApis.asScala.filter(_.clusterAction).toSet
val ClusterActions = ApiKeys.brokerApis.asScala.filter(_.clusterAction).toSet
val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
val ClientActions = ApiKeys.enabledApis.asScala.toSet -- ClusterActions -- SaslActions
val ClientActions = ApiKeys.brokerApis.asScala.toSet -- ClusterActions -- SaslActions
val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized")
// Principal used for all client connections. This is modified by tests which

View File

@ -156,7 +156,11 @@ object TestUtils extends Logging {
}
def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String]): KafkaServer = {
val server = new KafkaServer(config, time, threadNamePrefix = threadNamePrefix)
createServer(config, time, threadNamePrefix, enableForwarding = false)
}
def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], enableForwarding: Boolean): KafkaServer = {
val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding)
server.startup()
server
}

View File

@ -22,7 +22,6 @@ import kafka.coordinator.group.GroupCoordinator;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.network.RequestConvertToJson;
import kafka.server.ZkAdminManager;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerTopicStats;
import kafka.server.ClientQuotaManager;
@ -30,7 +29,6 @@ import kafka.server.ClientRequestQuotaManager;
import kafka.server.ControllerMutationQuotaManager;
import kafka.server.FetchManager;
import kafka.server.FinalizedFeatureCache;
import kafka.server.ForwardingManager;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
@ -38,6 +36,7 @@ import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
import kafka.server.ZkAdminManager;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
@ -99,7 +98,6 @@ public class MetadataRequestBenchmark {
private ZkAdminManager adminManager = Mockito.mock(ZkAdminManager.class);
private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
private KafkaController kafkaController = Mockito.mock(KafkaController.class);
private ForwardingManager forwardingManager = Mockito.mock(ForwardingManager.class);
private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
private Metrics metrics = new Metrics();
private int brokerId = 1;
@ -176,7 +174,7 @@ public class MetadataRequestBenchmark {
groupCoordinator,
transactionCoordinator,
kafkaController,
forwardingManager,
Option.empty(),
kafkaZkClient,
brokerId,
new KafkaConfig(kafkaProps),

View File

@ -12,8 +12,7 @@ Below we describe the details to set this up.
bin/test-raft-server-start.sh config/raft.properties
### Run Multi Node Quorum ###
Create 3 separate raft quorum properties as the following
(note that the `zookeeper.connect` config is required, but unused):
Create 3 separate raft quorum properties as the following:
`cat << EOF >> config/raft-quorum-1.properties`
@ -21,8 +20,6 @@ Create 3 separate raft quorum properties as the following
listeners=PLAINTEXT://localhost:9092
controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
log.dirs=/tmp/raft-logs-1
zookeeper.connect=localhost:2181
EOF
`cat << EOF >> config/raft-quorum-2.properties`
@ -31,8 +28,6 @@ Create 3 separate raft quorum properties as the following
listeners=PLAINTEXT://localhost:9093
controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
log.dirs=/tmp/raft-logs-2
zookeeper.connect=localhost:2181
EOF
`cat << EOF >> config/raft-quorum-3.properties`
@ -41,8 +36,6 @@ Create 3 separate raft quorum properties as the following
listeners=PLAINTEXT://localhost:9094
controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
log.dirs=/tmp/raft-logs-3
zookeeper.connect=localhost:2181
EOF
Open up 3 separate terminals, and run individual commands: