diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml index d5b787ce24f..773635cec8e 100644 --- a/checkstyle/import-control-metadata.xml +++ b/checkstyle/import-control-metadata.xml @@ -107,6 +107,7 @@ + diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala similarity index 89% rename from core/src/main/scala/kafka/raft/RaftManager.scala rename to core/src/main/scala/kafka/raft/KafkaRaftManager.scala index 9e8ea38f8fd..86950e1ce25 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala @@ -20,16 +20,13 @@ import java.io.File import java.net.InetSocketAddress import java.nio.file.Files import java.nio.file.Paths -import java.util.OptionalInt +import java.util.{OptionalInt, Collection => JCollection, Map => JMap} import java.util.concurrent.CompletableFuture -import java.util.{Map => JMap} -import java.util.{Collection => JCollection} import kafka.server.KafkaConfig import kafka.utils.CoreUtils import kafka.utils.Logging import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, MetadataRecoveryStrategy, NetworkClient} import org.apache.kafka.common.KafkaException -import org.apache.kafka.common.Node import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.Uuid import org.apache.kafka.common.metrics.Metrics @@ -40,7 +37,7 @@ import org.apache.kafka.common.requests.RequestHeader import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time, Utils} -import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, MetadataLogConfig, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService} +import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, MetadataLogConfig, QuorumConfig, RaftManager, ReplicatedLog, TimingWheelExpirationService} import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.common.Feature import org.apache.kafka.server.common.serialization.RecordSerde @@ -50,7 +47,6 @@ import org.apache.kafka.server.util.timer.SystemTimer import org.apache.kafka.storage.internals.log.{LogManager, UnifiedLog} import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters._ object KafkaRaftManager { private def createLogDirectory(logDir: File, logDirName: String): File = { @@ -85,29 +81,6 @@ object KafkaRaftManager { } } -trait RaftManager[T] { - def handleRequest( - context: RequestContext, - header: RequestHeader, - request: ApiMessage, - createdTimeMs: Long - ): CompletableFuture[ApiMessage] - - def register( - listener: RaftClient.Listener[T] - ): Unit - - def leaderAndEpoch: LeaderAndEpoch - - def client: RaftClient[T] - - def replicatedLog: ReplicatedLog - - def voterNode(id: Int, listener: ListenerName): Option[Node] - - def recordSerde: RecordSerde[T] -} - class KafkaRaftManager[T]( clusterId: String, config: KafkaConfig, @@ -178,12 +151,6 @@ class KafkaRaftManager[T]( CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this) } - override def register( - listener: RaftClient.Listener[T] - ): Unit = { - client.register(listener) - } - override def handleRequest( context: RequestContext, header: RequestHeader, @@ -292,13 +259,5 @@ class KafkaRaftManager[T]( (controllerListenerName, networkClient) } - override def leaderAndEpoch: LeaderAndEpoch = { - client.leaderAndEpoch - } - - override def voterNode(id: Int, listener: ListenerName): Option[Node] = { - client.voterNode(id, listener).toScala - } - override def recordSerde: RecordSerde[T] = serde } diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index cc9a12c9aa3..ac9a2d9eff1 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -24,7 +24,6 @@ import java.util.Map.Entry import java.util.concurrent.CompletableFuture import java.util.function.Consumer import kafka.network.RequestChannel -import kafka.raft.RaftManager import kafka.server.QuotaFactory.QuotaManagers import kafka.server.logger.RuntimeLoggerManager import kafka.server.metadata.KRaftMetadataCache @@ -55,6 +54,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.raft.RaftManager import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager, ProcessRole} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal} @@ -1070,7 +1070,7 @@ class ControllerApis( EndpointType.CONTROLLER, clusterId, () => registrationsPublisher.describeClusterControllers(request.context.listenerName()), - () => raftManager.leaderAndEpoch.leaderId().orElse(-1) + () => raftManager.client.leaderAndEpoch.leaderId().orElse(-1) ) requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeClusterResponse(response.setThrottleTimeMs(requestThrottleMs))) diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala index cd6b8e1d134..0caa03ec052 100644 --- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.raft.RaftManager import kafka.utils.Logging import org.apache.kafka.clients._ import org.apache.kafka.common.metrics.Metrics @@ -28,6 +27,7 @@ import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{Node, Reconfigurable} +import org.apache.kafka.raft.RaftManager import org.apache.kafka.server.common.{ApiMessageAndVersion, ControllerRequestCompletionHandler, NodeToControllerChannelManager} import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler} @@ -37,7 +37,7 @@ import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.atomic.AtomicReference import scala.collection.Seq import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters.{RichOption, RichOptionalInt} +import scala.jdk.OptionConverters.{RichOption, RichOptional, RichOptionalInt} case class ControllerInformation( node: Option[Node], @@ -79,10 +79,10 @@ class RaftControllerNodeProvider( val saslMechanism: String ) extends ControllerNodeProvider with Logging { - private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName) + private def idToNode(id: Int): Option[Node] = raftManager.client.voterNode(id, listenerName).toScala override def getControllerInfo(): ControllerInformation = - ControllerInformation(raftManager.leaderAndEpoch.leaderId.toScala.flatMap(idToNode), + ControllerInformation(raftManager.client.leaderAndEpoch.leaderId.toScala.flatMap(idToNode), listenerName, securityProtocol, saslMechanism) } diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 03f1c9b929e..aba9035cb7e 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -342,7 +342,7 @@ class SharedServer( throw new RuntimeException("Unable to install metadata publishers.", t) } } - _raftManager.register(loader) + _raftManager.client.register(loader) debug("Completed SharedServer startup.") started = true } catch { diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala index 2e9d8e2bb8a..081fbec3c95 100644 --- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala +++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala @@ -18,7 +18,6 @@ package kafka.tools import kafka.network.RequestChannel -import kafka.raft.RaftManager import kafka.server.ApiRequestHandler import kafka.utils.Logging import org.apache.kafka.common.internals.FatalExitError @@ -26,6 +25,7 @@ import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumE import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage} import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BeginQuorumEpochResponse, EndQuorumEpochResponse, FetchResponse, FetchSnapshotResponse, VoteResponse} import org.apache.kafka.common.utils.Time +import org.apache.kafka.raft.RaftManager import org.apache.kafka.server.ApiVersionManager import org.apache.kafka.server.common.RequestLocal diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 2b3183be80b..48e101443a1 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit} import joptsimple.{OptionException, OptionSpec} import kafka.network.SocketServer -import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager} +import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager} import kafka.server.{KafkaConfig, KafkaRequestHandlerPool} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.message.ApiMessageType.ListenerType @@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.common.{TopicPartition, Uuid, protocol} import org.apache.kafka.raft.errors.NotLeaderException -import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient} +import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient, RaftManager} import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.SimpleApiVersionManager import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion} @@ -180,7 +180,7 @@ class TestRaftServer( private var claimedEpoch: Option[Int] = None - raftManager.register(this) + raftManager.client.register(this) override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch): Unit = { if (newLeaderAndEpoch.isLeader(config.nodeId)) { diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala index aa399985f83..16a82fdca8b 100644 --- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala @@ -34,7 +34,7 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { def testAllocateProducersIdSentToController(): Unit = { val sourceBroker = cluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer] - val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt + val controllerId = sourceBroker.raftManager.client.leaderAndEpoch.leaderId().getAsInt val controllerServer = cluster.controllers.values().stream() .filter(_.config.nodeId == controllerId) .findFirst() @@ -50,7 +50,7 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { def testAllocateProducersIdSentToNonController(): Unit = { val sourceBroker = cluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer] - val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt + val controllerId = sourceBroker.raftManager.client.leaderAndEpoch.leaderId().getAsInt val controllerServer = cluster.controllers().values().stream() .filter(_.config.nodeId != controllerId) .findFirst() diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 901dbe74356..43c7d5aecf4 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -18,7 +18,6 @@ package kafka.server import kafka.network.RequestChannel -import kafka.raft.RaftManager import kafka.server.QuotaFactory.QuotaManagers import kafka.server.metadata.KRaftMetadataCache import org.apache.kafka.clients.admin.AlterConfigOp @@ -56,7 +55,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.network.metrics.RequestChannelMetrics import org.apache.kafka.network.Session -import org.apache.kafka.raft.QuorumConfig +import org.apache.kafka.raft.{QuorumConfig, RaftManager} import org.apache.kafka.server.SimpleApiVersionManager import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer} import org.apache.kafka.server.common.{ApiMessageAndVersion, FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock, RequestLocal} diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala index d678a497b53..f1ba2c7ac5e 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala @@ -293,7 +293,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging { } private def waitUntilQuorumLeaderElected(controllerServer: ControllerServer, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = { - val (leaderAndEpoch, _) = computeUntilTrue(controllerServer.raftManager.leaderAndEpoch, waitTime = timeout)(_.leaderId().isPresent) + val (leaderAndEpoch, _) = computeUntilTrue(controllerServer.raftManager.client.leaderAndEpoch, waitTime = timeout)(_.leaderId().isPresent) leaderAndEpoch.leaderId().orElseThrow(() => new AssertionError(s"Quorum Controller leader not elected after $timeout ms")) } diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java index 21e3a645530..8bf6d9543e7 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java @@ -17,6 +17,8 @@ package org.apache.kafka.controller; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; @@ -758,6 +760,11 @@ public final class MockRaftClient implements RaftClient, A return OptionalInt.of(nodeId); } + @Override + public Optional voterNode(int id, ListenerName listenerName) { + return Optional.empty(); + } + public List> listeners() { final CompletableFuture>> future = new CompletableFuture<>(); eventQueue.append(() -> diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java index cf123078eae..57fe845c98f 100644 --- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.image.publisher; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.image.FakeSnapshotWriter; import org.apache.kafka.image.MetadataImageTest; @@ -73,6 +75,11 @@ public class SnapshotEmitterTest { return OptionalInt.empty(); } + @Override + public Optional voterNode(int id, ListenerName listenerName) { + return Optional.empty(); + } + @Override public long prepareAppend(int epoch, List records) { return 0; diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 343f4db6b02..6dc20026ca7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -3793,6 +3793,7 @@ public final class KafkaRaftClient implements RaftClient { } } + @Override public Optional voterNode(int id, ListenerName listenerName) { return partitionState.lastVoterSet().voterNode(id, listenerName); } diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java index d2d59888178..95ed1905b67 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; import org.apache.kafka.server.common.KRaftVersion; @@ -129,6 +131,15 @@ public interface RaftClient extends AutoCloseable { */ OptionalInt nodeId(); + /** + * Returns the node information for a given voter id and listener. + * + * @param id the id of the voter + * @param listenerName the name of the listener + * @return the node information if it exists, otherwise {@code Optional.empty()} + */ + Optional voterNode(int id, ListenerName listenerName); + /** * Prepare a list of records to be appended to the log. * diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftManager.java b/raft/src/main/java/org/apache/kafka/raft/RaftManager.java new file mode 100644 index 00000000000..1e69c365840 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/RaftManager.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.raft; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.server.common.serialization.RecordSerde; + +import java.util.concurrent.CompletableFuture; + +public interface RaftManager { + + CompletableFuture handleRequest( + RequestContext context, + RequestHeader header, + ApiMessage request, + long createdTimeMs + ); + + /** + * Returns a Raft client. + *

+ * Always returns the same instance. Callers must NOT close it. + */ + RaftClient client(); + + /** + * Returns a replicated log. + *

+ * Always returns the same instance. Callers must NOT close it. + */ + ReplicatedLog replicatedLog(); + + /** + * Returns the record Serde. + */ + RecordSerde recordSerde(); +}