KAFKA-15854: Move Java classes from `kafka.server` to the `server` module (#14796)

We only move Java classes that have minimal or no dependencies on Scala classes in this PR.

Details:
* Configured `server` module in build files.
* Changed `ControllerRequestCompletionHandler` to be an interface since it has no implementations.
* Cleaned up various import control files.
* Minor build clean-ups for `server-common`.
* Disabled `testAssignmentAggregation` when executed with Java 8, this is an existing issue (see #14794).

For broader context on this change, please check:
* KAFKA-15852: Move server code from `core` to `server` module

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Ismael Juma 2023-11-19 22:09:19 -08:00 committed by GitHub
parent 066635819a
commit df78204e05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 268 additions and 102 deletions

View File

@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" })
tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + ":test" }) {}
project(':server') {
archivesBaseName = "kafka-server"
dependencies {
implementation project(':clients')
implementation project(':server-common')
implementation libs.slf4jApi
compileOnly libs.log4j
testImplementation project(':clients').sourceSets.test.output
testImplementation libs.mockitoCore
testImplementation libs.junitJupiter
testImplementation libs.slf4jlog4j
}
task createVersionFile() {
def receiptFile = file("$buildDir/kafka/$buildVersionFileName")
inputs.property "commitId", commitId
inputs.property "version", version
outputs.file receiptFile
doLast {
def data = [
commitId: commitId,
version: version,
]
receiptFile.parentFile.mkdirs()
def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n")
receiptFile.setText(content, "ISO-8859-1")
}
}
jar {
dependsOn createVersionFile
from("$buildDir") {
include "kafka/$buildVersionFileName"
}
}
clean.doFirst {
delete "$buildDir/kafka/"
}
checkstyle {
configProperties = checkstyleConfigProperties("import-control-server.xml")
}
javadoc {
enabled = false
}
}
project(':core') {
apply plugin: 'scala'
@ -873,7 +929,7 @@ project(':core') {
implementation project(':tools:tools-api')
implementation project(':raft')
implementation project(':storage')
implementation project(':server')
implementation libs.argparse4j
implementation libs.commonsValidator
@ -912,6 +968,7 @@ project(':core') {
testImplementation project(':raft').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.test.output
testImplementation project(':server').sourceSets.test.output
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation(libs.apacheda) {
@ -1643,19 +1700,6 @@ project(':server-common') {
}
}
sourceSets {
main {
java {
srcDirs = ["src/main/java"]
}
}
test {
java {
srcDirs = ["src/test/java"]
}
}
}
jar {
dependsOn createVersionFile
from("$buildDir") {
@ -1670,6 +1714,10 @@ project(':server-common') {
checkstyle {
configProperties = checkstyleConfigProperties("import-control-server-common.xml")
}
javadoc {
enabled = false
}
}
project(':storage:storage-api') {

View File

@ -27,16 +27,11 @@
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.opentest4j" />
<allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="javax.crypto" />
<allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />
<!-- no one depends on the server -->

View File

@ -27,15 +27,11 @@
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.opentest4j" />
<allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />
<!-- no one depends on the server -->

View File

@ -0,0 +1,61 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<import-control pkg="org.apache.kafka.server">
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
<!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.mockito" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="net.jqwik.api" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<!-- protocol, records and request/response utilities -->
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.requests" />
<!-- utilities and reusable classes from server-common -->
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.server.common" />
<!-- persistent collection factories/non-library-specific wrappers -->
<allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
</import-control>

View File

@ -27,15 +27,11 @@
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.opentest4j" />
<allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />
<!-- no one depends on the server -->

View File

@ -22,7 +22,6 @@ import kafka.network.RequestChannel;
import kafka.server.ApiVersionManager;
import kafka.server.AutoTopicCreationManager;
import kafka.server.BrokerTopicStats;
import kafka.server.ClientMetricsManager;
import kafka.server.DelegationTokenManager;
import kafka.server.FetchManager;
import kafka.server.KafkaApis;
@ -35,6 +34,7 @@ import kafka.server.metadata.ConfigRepository;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.authorizer.Authorizer;
import java.util.Collections;

View File

@ -17,7 +17,6 @@
package kafka.coordinator.transaction
import kafka.coordinator.transaction.ProducerIdManager.{IterationLimit, NoRetry, RetryBackoffMs}
import kafka.server.{NodeToControllerChannelManager, ControllerRequestCompletionHandler}
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.clients.ClientResponse
@ -26,6 +25,7 @@ import org.apache.kafka.common.message.AllocateProducerIdsRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AllocateProducerIdsRequest, AllocateProducerIdsResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.ProducerIdsBlock
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}

View File

@ -34,6 +34,7 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.requests.{AlterPartitionRequest, AlterPartitionResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.util.Scheduler
@ -84,7 +85,7 @@ object AlterPartitionManager {
threadNamePrefix: String,
brokerEpochSupplier: () => Long,
): AlterPartitionManager = {
val channelManager = NodeToControllerChannelManager(
val channelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time = time,
metrics = metrics,

View File

@ -20,7 +20,6 @@ package kafka.server
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Properties}
import kafka.controller.KafkaController
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging
@ -34,8 +33,10 @@ import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopi
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, RequestContext, RequestHeader}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
trait AutoTopicCreationManager {
@ -193,7 +194,7 @@ class DefaultAutoTopicCreationManager(
val request = metadataRequestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions() match {
channelManager.controllerApiVersions.asScala match {
case None =>
// We will rely on the Metadata request to be retried in the case
// that the latest version is not usable by the controller.

View File

@ -30,6 +30,7 @@ import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.queue.EventQueue.DeadlineFunction
import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import java.util.{Comparator, OptionalLong}
import scala.jdk.CollectionConverters._

View File

@ -42,6 +42,7 @@ import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorCon
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, NodeToControllerChannelManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, TopicIdPartition}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@ -222,7 +223,7 @@ class BrokerServer(
val controllerNodes = RaftConfig.voterConnectionsToNodes(voterConnections).asScala
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
clientToControllerChannelManager = NodeToControllerChannelManager(
clientToControllerChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
@ -279,7 +280,7 @@ class BrokerServer(
time
)
val assignmentsChannelManager = NodeToControllerChannelManager(
val assignmentsChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
@ -357,7 +358,7 @@ class BrokerServer(
k -> VersionRange.of(v.min, v.max)
}.asJava
val brokerLifecycleChannelManager = NodeToControllerChannelManager(
val brokerLifecycleChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListValidator}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion

View File

@ -17,8 +17,6 @@
package kafka.server
import kafka.metrics.ClientMetricsConfigs
import java.util
import java.util.Properties
import org.apache.kafka.common.config.ConfigResource
@ -26,6 +24,7 @@ import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRIC
import org.apache.kafka.controller.ConfigurationValidator
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.storage.internals.log.LogConfig
import scala.collection.mutable

View File

@ -31,6 +31,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.queue.EventQueue.DeadlineFunction
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import scala.jdk.CollectionConverters._

View File

@ -44,6 +44,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
import org.apache.kafka.metadata.publisher.FeaturesPublisher
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.{ClientMetricsManager, NodeToControllerChannelManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@ -432,7 +433,7 @@ class ControllerServer(
* Start the KIP-919 controller registration manager.
*/
val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes.asScala)
registrationChannelManager = NodeToControllerChannelManager(
registrationChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,

View File

@ -18,13 +18,13 @@
package kafka.server
import java.nio.ByteBuffer
import kafka.network.RequestChannel
import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, NodeApiVersions}
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import scala.compat.java8.OptionConverters._
@ -165,7 +165,7 @@ class ForwardingManagerImpl(
}
override def controllerApiVersions: Option[NodeApiVersions] =
channelManager.controllerApiVersions()
channelManager.controllerApiVersions.asScala
private def parseResponse(
buffer: ByteBuffer,

View File

@ -67,6 +67,7 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.Scheduler

View File

@ -52,6 +52,7 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationF
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
@ -338,7 +339,7 @@ class KafkaServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
clientToControllerChannelManager = NodeToControllerChannelManager(
clientToControllerChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider = controllerNodeProvider,
time = time,
metrics = metrics,
@ -428,7 +429,7 @@ class KafkaServer(
)
val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
val brokerToQuorumChannelManager = NodeToControllerChannelManager(
val brokerToQuorumChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider = quorumControllerNodeProvider,
time = time,
metrics = metrics,

View File

@ -31,10 +31,12 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
import java.util
import java.util.Optional
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@ -130,38 +132,6 @@ class RaftControllerNodeProvider(
listenerName, securityProtocol, saslMechanism, isZkController = false)
}
object NodeToControllerChannelManager {
def apply(
controllerNodeProvider: ControllerNodeProvider,
time: Time,
metrics: Metrics,
config: KafkaConfig,
channelName: String,
threadNamePrefix: String,
retryTimeoutMs: Long
): NodeToControllerChannelManager = {
new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
config,
channelName,
threadNamePrefix,
retryTimeoutMs
)
}
}
trait NodeToControllerChannelManager {
def start(): Unit
def shutdown(): Unit
def controllerApiVersions(): Option[NodeApiVersions]
def sendRequest(
request: AbstractRequest.Builder[_ <: AbstractRequest],
callback: ControllerRequestCompletionHandler
): Unit
}
/**
* This class manages the connection between a broker and the controller. It runs a single
* [[NodeToControllerRequestThread]] which uses the broker's metadata cache as its own metadata to find
@ -270,22 +240,13 @@ class NodeToControllerChannelManagerImpl(
))
}
def controllerApiVersions(): Option[NodeApiVersions] = {
def controllerApiVersions(): Optional[NodeApiVersions] = {
requestThread.activeControllerAddress().flatMap { activeController =>
Option(apiVersions.get(activeController.idString))
}
}.asJava
}
}
abstract class ControllerRequestCompletionHandler extends RequestCompletionHandler {
/**
* Fire when the request transmission time passes the caller defined deadline on the channel queue.
* It covers the total waiting time including retries which might be the result of individual request timeout.
*/
def onTimeout(): Unit
}
case class NodeToControllerQueueItem(
createdTimeMs: Long,
request: AbstractRequest.Builder[_ <: AbstractRequest],

View File

@ -17,7 +17,7 @@
package kafka.zk
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{ConfigType, ControllerRequestCompletionHandler, KafkaConfig}
import kafka.server.{ConfigType, KafkaConfig}
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
@ -45,6 +45,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.authorizer.StandardAcl
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail}
import org.junit.jupiter.api.{Assumptions, Timeout}

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, Envel
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito._

View File

@ -51,6 +51,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics

View File

@ -17,7 +17,6 @@
package kafka.coordinator.transaction
import kafka.coordinator.transaction.ProducerIdManager.RetryBackoffMs
import kafka.server.NodeToControllerChannelManager
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
@ -26,6 +25,7 @@ import org.apache.kafka.common.message.AllocateProducerIdsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AllocateProducerIdsResponse
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.common.ProducerIdsBlock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.requests.{AbstractRequest, AlterPartitionRequest, AlterPartitionResponse}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_2_7_IV2, IBP_3_2_IV0, IBP_3_5_IV1}
import org.apache.kafka.server.util.{MockScheduler, MockTime}

View File

@ -21,7 +21,6 @@ import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Collections, Optional, Properties}
import kafka.controller.KafkaController
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.TestUtils
@ -38,6 +37,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
@ -327,7 +327,7 @@ class AutoTopicCreationManagerTest {
.setMinVersion(0)
.setMaxVersion(0)
Mockito.when(brokerToController.controllerApiVersions())
.thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
.thenReturn(Optional.of(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
Mockito.when(controller.isActive).thenReturn(false)

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.extension.ExtendWith
@ -47,7 +48,7 @@ import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
class BrokerRegistrationRequestTest {
def brokerToControllerChannelManager(clusterInstance: ClusterInstance): NodeToControllerChannelManager = {
NodeToControllerChannelManager(
new NodeToControllerChannelManagerImpl(
new ControllerNodeProvider() {
def node: Option[Node] = Some(new Node(
clusterInstance.anyControllerSocketServer().config.nodeId,

View File

@ -17,12 +17,12 @@
package kafka.server
import kafka.metrics.ClientMetricsConfigs
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, TOPIC}
import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, InvalidTopicException}
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test

View File

@ -28,7 +28,6 @@ import kafka.cluster.{Broker, Partition}
import kafka.controller.{ControllerContext, KafkaController}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.UnifiedLog
import kafka.metrics.ClientMetricsTestUtils
import kafka.network.{RequestChannel, RequestMetrics}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository, ZkMetadataCache}
@ -98,6 +97,7 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteRes
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}

View File

@ -19,8 +19,11 @@ package kafka.server
import org.apache.kafka.clients.{ClientResponse, MockClient, NodeApiVersions}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.util.MockTime
import java.util.Optional
class MockNodeToControllerChannelManager(
val client: MockClient,
time: MockTime,
@ -48,8 +51,8 @@ class MockNodeToControllerChannelManager(
))
}
override def controllerApiVersions(): Option[NodeApiVersions] = {
Some(controllerApiVersions)
override def controllerApiVersions(): Optional[NodeApiVersions] = {
Optional.of(controllerApiVersions)
}
private[server] def handleResponse(request: NodeToControllerQueueItem)(response: ClientResponse): Unit = {

View File

@ -71,6 +71,7 @@ import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.metadata.properties.MetaProperties
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.metrics.KafkaYammerMetrics

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package kafka.server;
package org.apache.kafka.server;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
@ -249,7 +249,7 @@ public class AssignmentsManager {
/**
* Callback for a {@link AssignReplicasToDirsRequest}.
*/
private class AssignReplicasToDirsRequestCompletionHandler extends ControllerRequestCompletionHandler {
private class AssignReplicasToDirsRequestCompletionHandler implements ControllerRequestCompletionHandler {
@Override
public void onTimeout() {
log.warn("Request to controller timed out");

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server;
package org.apache.kafka.server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
import org.apache.kafka.clients.RequestCompletionHandler;
public interface ControllerRequestCompletionHandler extends RequestCompletionHandler {
/**
* Fire when the request transmission time passes the caller defined deadline on the channel queue.
* It covers the total waiting time including retries which might be the result of individual request timeout.
*/
void onTimeout();
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.requests.AbstractRequest;
import java.util.Optional;
public interface NodeToControllerChannelManager {
void start();
void shutdown();
Optional<NodeApiVersions> controllerApiVersions();
void sendRequest(
AbstractRequest.Builder<? extends AbstractRequest> request,
ControllerRequestCompletionHandler callback
);
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.metrics;
package org.apache.kafka.server.metrics;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Server-specific functionality for brokers and controllers.
*/
package org.apache.kafka.server;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package kafka.server;
package org.apache.kafka.server;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
@ -31,6 +31,8 @@ import org.apache.kafka.server.common.TopicIdPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.JRE;
import org.mockito.ArgumentCaptor;
import java.util.Arrays;
@ -129,6 +131,7 @@ public class AssignmentsManagerTest {
}
@Test
@DisabledOnJre(JRE.JAVA_8)
public void testAssignmentAggregation() throws InterruptedException {
CountDownLatch readyToAssert = new CountDownLatch(1);
doAnswer(invocation -> {

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.metrics;
package org.apache.kafka.server.metrics;
import java.util.Arrays;
import java.util.Collections;

View File

@ -65,6 +65,7 @@ include 'clients',
'log4j-appender',
'metadata',
'raft',
'server',
'server-common',
'shell',
'storage',