mirror of https://github.com/apache/kafka.git
MINOR: Move RaftManager interface to raft module (#20366)
- Move the `RaftManager` interface to raft module, and remove the `register` and `leaderAndEpoch` methods since they are already part of the RaftClient APIs. - Rename RaftManager.scala to KafkaRaftManager.scala. Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
3cbb2a0aaf
commit
dddb619177
|
@ -107,6 +107,7 @@
|
|||
<allow pkg="org.apache.kafka.common.config" />
|
||||
<allow pkg="org.apache.kafka.common.message" />
|
||||
<allow pkg="org.apache.kafka.common.metadata" />
|
||||
<allow pkg="org.apache.kafka.common.network" />
|
||||
<allow pkg="org.apache.kafka.common.protocol" />
|
||||
<allow pkg="org.apache.kafka.common.quota" />
|
||||
<allow pkg="org.apache.kafka.common.record" />
|
||||
|
|
|
@ -20,16 +20,13 @@ import java.io.File
|
|||
import java.net.InetSocketAddress
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
import java.util.OptionalInt
|
||||
import java.util.{OptionalInt, Collection => JCollection, Map => JMap}
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.{Map => JMap}
|
||||
import java.util.{Collection => JCollection}
|
||||
import kafka.server.KafkaConfig
|
||||
import kafka.utils.CoreUtils
|
||||
import kafka.utils.Logging
|
||||
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, MetadataRecoveryStrategy, NetworkClient}
|
||||
import org.apache.kafka.common.KafkaException
|
||||
import org.apache.kafka.common.Node
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.Uuid
|
||||
import org.apache.kafka.common.metrics.Metrics
|
||||
|
@ -40,7 +37,7 @@ import org.apache.kafka.common.requests.RequestHeader
|
|||
import org.apache.kafka.common.security.JaasContext
|
||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
|
||||
import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, MetadataLogConfig, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService}
|
||||
import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, MetadataLogConfig, QuorumConfig, RaftManager, ReplicatedLog, TimingWheelExpirationService}
|
||||
import org.apache.kafka.server.ProcessRole
|
||||
import org.apache.kafka.server.common.Feature
|
||||
import org.apache.kafka.server.common.serialization.RecordSerde
|
||||
|
@ -50,7 +47,6 @@ import org.apache.kafka.server.util.timer.SystemTimer
|
|||
import org.apache.kafka.storage.internals.log.{LogManager, UnifiedLog}
|
||||
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.jdk.OptionConverters._
|
||||
|
||||
object KafkaRaftManager {
|
||||
private def createLogDirectory(logDir: File, logDirName: String): File = {
|
||||
|
@ -85,29 +81,6 @@ object KafkaRaftManager {
|
|||
}
|
||||
}
|
||||
|
||||
trait RaftManager[T] {
|
||||
def handleRequest(
|
||||
context: RequestContext,
|
||||
header: RequestHeader,
|
||||
request: ApiMessage,
|
||||
createdTimeMs: Long
|
||||
): CompletableFuture[ApiMessage]
|
||||
|
||||
def register(
|
||||
listener: RaftClient.Listener[T]
|
||||
): Unit
|
||||
|
||||
def leaderAndEpoch: LeaderAndEpoch
|
||||
|
||||
def client: RaftClient[T]
|
||||
|
||||
def replicatedLog: ReplicatedLog
|
||||
|
||||
def voterNode(id: Int, listener: ListenerName): Option[Node]
|
||||
|
||||
def recordSerde: RecordSerde[T]
|
||||
}
|
||||
|
||||
class KafkaRaftManager[T](
|
||||
clusterId: String,
|
||||
config: KafkaConfig,
|
||||
|
@ -178,12 +151,6 @@ class KafkaRaftManager[T](
|
|||
CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this)
|
||||
}
|
||||
|
||||
override def register(
|
||||
listener: RaftClient.Listener[T]
|
||||
): Unit = {
|
||||
client.register(listener)
|
||||
}
|
||||
|
||||
override def handleRequest(
|
||||
context: RequestContext,
|
||||
header: RequestHeader,
|
||||
|
@ -292,13 +259,5 @@ class KafkaRaftManager[T](
|
|||
(controllerListenerName, networkClient)
|
||||
}
|
||||
|
||||
override def leaderAndEpoch: LeaderAndEpoch = {
|
||||
client.leaderAndEpoch
|
||||
}
|
||||
|
||||
override def voterNode(id: Int, listener: ListenerName): Option[Node] = {
|
||||
client.voterNode(id, listener).toScala
|
||||
}
|
||||
|
||||
override def recordSerde: RecordSerde[T] = serde
|
||||
}
|
|
@ -24,7 +24,6 @@ import java.util.Map.Entry
|
|||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.function.Consumer
|
||||
import kafka.network.RequestChannel
|
||||
import kafka.raft.RaftManager
|
||||
import kafka.server.QuotaFactory.QuotaManagers
|
||||
import kafka.server.logger.RuntimeLoggerManager
|
||||
import kafka.server.metadata.KRaftMetadataCache
|
||||
|
@ -55,6 +54,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
|
|||
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
|
||||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||
import org.apache.kafka.raft.RaftManager
|
||||
import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager, ProcessRole}
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
|
||||
|
@ -1070,7 +1070,7 @@ class ControllerApis(
|
|||
EndpointType.CONTROLLER,
|
||||
clusterId,
|
||||
() => registrationsPublisher.describeClusterControllers(request.context.listenerName()),
|
||||
() => raftManager.leaderAndEpoch.leaderId().orElse(-1)
|
||||
() => raftManager.client.leaderAndEpoch.leaderId().orElse(-1)
|
||||
)
|
||||
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
|
||||
new DescribeClusterResponse(response.setThrottleTimeMs(requestThrottleMs)))
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package kafka.server
|
||||
|
||||
import kafka.raft.RaftManager
|
||||
import kafka.utils.Logging
|
||||
import org.apache.kafka.clients._
|
||||
import org.apache.kafka.common.metrics.Metrics
|
||||
|
@ -28,6 +27,7 @@ import org.apache.kafka.common.security.JaasContext
|
|||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||
import org.apache.kafka.common.utils.{LogContext, Time}
|
||||
import org.apache.kafka.common.{Node, Reconfigurable}
|
||||
import org.apache.kafka.raft.RaftManager
|
||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, ControllerRequestCompletionHandler, NodeToControllerChannelManager}
|
||||
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
|
||||
|
||||
|
@ -37,7 +37,7 @@ import java.util.concurrent.LinkedBlockingDeque
|
|||
import java.util.concurrent.atomic.AtomicReference
|
||||
import scala.collection.Seq
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.jdk.OptionConverters.{RichOption, RichOptionalInt}
|
||||
import scala.jdk.OptionConverters.{RichOption, RichOptional, RichOptionalInt}
|
||||
|
||||
case class ControllerInformation(
|
||||
node: Option[Node],
|
||||
|
@ -79,10 +79,10 @@ class RaftControllerNodeProvider(
|
|||
val saslMechanism: String
|
||||
) extends ControllerNodeProvider with Logging {
|
||||
|
||||
private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName)
|
||||
private def idToNode(id: Int): Option[Node] = raftManager.client.voterNode(id, listenerName).toScala
|
||||
|
||||
override def getControllerInfo(): ControllerInformation =
|
||||
ControllerInformation(raftManager.leaderAndEpoch.leaderId.toScala.flatMap(idToNode),
|
||||
ControllerInformation(raftManager.client.leaderAndEpoch.leaderId.toScala.flatMap(idToNode),
|
||||
listenerName, securityProtocol, saslMechanism)
|
||||
}
|
||||
|
||||
|
|
|
@ -342,7 +342,7 @@ class SharedServer(
|
|||
throw new RuntimeException("Unable to install metadata publishers.", t)
|
||||
}
|
||||
}
|
||||
_raftManager.register(loader)
|
||||
_raftManager.client.register(loader)
|
||||
debug("Completed SharedServer startup.")
|
||||
started = true
|
||||
} catch {
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package kafka.tools
|
||||
|
||||
import kafka.network.RequestChannel
|
||||
import kafka.raft.RaftManager
|
||||
import kafka.server.ApiRequestHandler
|
||||
import kafka.utils.Logging
|
||||
import org.apache.kafka.common.internals.FatalExitError
|
||||
|
@ -26,6 +25,7 @@ import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumE
|
|||
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage}
|
||||
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BeginQuorumEpochResponse, EndQuorumEpochResponse, FetchResponse, FetchSnapshotResponse, VoteResponse}
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.apache.kafka.raft.RaftManager
|
||||
import org.apache.kafka.server.ApiVersionManager
|
||||
import org.apache.kafka.server.common.RequestLocal
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
|
|||
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
|
||||
import joptsimple.{OptionException, OptionSpec}
|
||||
import kafka.network.SocketServer
|
||||
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager}
|
||||
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager}
|
||||
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool}
|
||||
import kafka.utils.{CoreUtils, Logging}
|
||||
import org.apache.kafka.common.message.ApiMessageType.ListenerType
|
||||
|
@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
|
|||
import org.apache.kafka.common.utils.{Exit, Time, Utils}
|
||||
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
|
||||
import org.apache.kafka.raft.errors.NotLeaderException
|
||||
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient}
|
||||
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient, RaftManager}
|
||||
import org.apache.kafka.security.CredentialProvider
|
||||
import org.apache.kafka.server.SimpleApiVersionManager
|
||||
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
|
||||
|
@ -180,7 +180,7 @@ class TestRaftServer(
|
|||
|
||||
private var claimedEpoch: Option[Int] = None
|
||||
|
||||
raftManager.register(this)
|
||||
raftManager.client.register(this)
|
||||
|
||||
override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch): Unit = {
|
||||
if (newLeaderAndEpoch.isLeader(config.nodeId)) {
|
||||
|
|
|
@ -34,7 +34,7 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
|
|||
def testAllocateProducersIdSentToController(): Unit = {
|
||||
val sourceBroker = cluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer]
|
||||
|
||||
val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
|
||||
val controllerId = sourceBroker.raftManager.client.leaderAndEpoch.leaderId().getAsInt
|
||||
val controllerServer = cluster.controllers.values().stream()
|
||||
.filter(_.config.nodeId == controllerId)
|
||||
.findFirst()
|
||||
|
@ -50,7 +50,7 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
|
|||
def testAllocateProducersIdSentToNonController(): Unit = {
|
||||
val sourceBroker = cluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer]
|
||||
|
||||
val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
|
||||
val controllerId = sourceBroker.raftManager.client.leaderAndEpoch.leaderId().getAsInt
|
||||
val controllerServer = cluster.controllers().values().stream()
|
||||
.filter(_.config.nodeId != controllerId)
|
||||
.findFirst()
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package kafka.server
|
||||
|
||||
import kafka.network.RequestChannel
|
||||
import kafka.raft.RaftManager
|
||||
import kafka.server.QuotaFactory.QuotaManagers
|
||||
import kafka.server.metadata.KRaftMetadataCache
|
||||
import org.apache.kafka.clients.admin.AlterConfigOp
|
||||
|
@ -56,7 +55,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
|
|||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.network.metrics.RequestChannelMetrics
|
||||
import org.apache.kafka.network.Session
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.raft.{QuorumConfig, RaftManager}
|
||||
import org.apache.kafka.server.SimpleApiVersionManager
|
||||
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
|
||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock, RequestLocal}
|
||||
|
|
|
@ -293,7 +293,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
|
|||
}
|
||||
|
||||
private def waitUntilQuorumLeaderElected(controllerServer: ControllerServer, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
|
||||
val (leaderAndEpoch, _) = computeUntilTrue(controllerServer.raftManager.leaderAndEpoch, waitTime = timeout)(_.leaderId().isPresent)
|
||||
val (leaderAndEpoch, _) = computeUntilTrue(controllerServer.raftManager.client.leaderAndEpoch, waitTime = timeout)(_.leaderId().isPresent)
|
||||
leaderAndEpoch.leaderId().orElseThrow(() => new AssertionError(s"Quorum Controller leader not elected after $timeout ms"))
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.kafka.controller;
|
||||
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.network.ListenerName;
|
||||
import org.apache.kafka.common.protocol.ObjectSerializationCache;
|
||||
import org.apache.kafka.common.utils.BufferSupplier;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
|
@ -758,6 +760,11 @@ public final class MockRaftClient implements RaftClient<ApiMessageAndVersion>, A
|
|||
return OptionalInt.of(nodeId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Node> voterNode(int id, ListenerName listenerName) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
public List<RaftClient.Listener<ApiMessageAndVersion>> listeners() {
|
||||
final CompletableFuture<List<RaftClient.Listener<ApiMessageAndVersion>>> future = new CompletableFuture<>();
|
||||
eventQueue.append(() ->
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.kafka.image.publisher;
|
||||
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.network.ListenerName;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.image.FakeSnapshotWriter;
|
||||
import org.apache.kafka.image.MetadataImageTest;
|
||||
|
@ -73,6 +75,11 @@ public class SnapshotEmitterTest {
|
|||
return OptionalInt.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Node> voterNode(int id, ListenerName listenerName) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long prepareAppend(int epoch, List<ApiMessageAndVersion> records) {
|
||||
return 0;
|
||||
|
|
|
@ -3793,6 +3793,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Node> voterNode(int id, ListenerName listenerName) {
|
||||
return partitionState.lastVoterSet().voterNode(id, listenerName);
|
||||
}
|
||||
|
|
|
@ -16,7 +16,9 @@
|
|||
*/
|
||||
package org.apache.kafka.raft;
|
||||
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.errors.ApiException;
|
||||
import org.apache.kafka.common.network.ListenerName;
|
||||
import org.apache.kafka.raft.errors.BufferAllocationException;
|
||||
import org.apache.kafka.raft.errors.NotLeaderException;
|
||||
import org.apache.kafka.server.common.KRaftVersion;
|
||||
|
@ -129,6 +131,15 @@ public interface RaftClient<T> extends AutoCloseable {
|
|||
*/
|
||||
OptionalInt nodeId();
|
||||
|
||||
/**
|
||||
* Returns the node information for a given voter id and listener.
|
||||
*
|
||||
* @param id the id of the voter
|
||||
* @param listenerName the name of the listener
|
||||
* @return the node information if it exists, otherwise {@code Optional.empty()}
|
||||
*/
|
||||
Optional<Node> voterNode(int id, ListenerName listenerName);
|
||||
|
||||
/**
|
||||
* Prepare a list of records to be appended to the log.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.raft;
|
||||
|
||||
import org.apache.kafka.common.protocol.ApiMessage;
|
||||
import org.apache.kafka.common.requests.RequestContext;
|
||||
import org.apache.kafka.common.requests.RequestHeader;
|
||||
import org.apache.kafka.server.common.serialization.RecordSerde;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public interface RaftManager<T> {
|
||||
|
||||
CompletableFuture<ApiMessage> handleRequest(
|
||||
RequestContext context,
|
||||
RequestHeader header,
|
||||
ApiMessage request,
|
||||
long createdTimeMs
|
||||
);
|
||||
|
||||
/**
|
||||
* Returns a Raft client.
|
||||
* <p>
|
||||
* Always returns the same instance. Callers must NOT close it.
|
||||
*/
|
||||
RaftClient<T> client();
|
||||
|
||||
/**
|
||||
* Returns a replicated log.
|
||||
* <p>
|
||||
* Always returns the same instance. Callers must NOT close it.
|
||||
*/
|
||||
ReplicatedLog replicatedLog();
|
||||
|
||||
/**
|
||||
* Returns the record Serde.
|
||||
*/
|
||||
RecordSerde<T> recordSerde();
|
||||
}
|
Loading…
Reference in New Issue