From ddd652c672b503de52b6cb2be20c29c7a5e6816f Mon Sep 17 00:00:00 2001
From: Colin Patrick McCabe
Date: Thu, 16 Mar 2023 15:33:03 -0700
Subject: [PATCH] MINOR: Standardize KRaft logging, thread names, and terminology (#13390)

Standardize KRaft thread names.

- Always use kebab case. That is, "my-thread-name".

- Thread prefixes are just strings, not Option[String] or Optional<String>. If
  you don't want a prefix, use the empty string.

- Thread prefixes end in a dash (except the empty prefix). Then you can
  calculate thread names as $prefix + "my-thread-name".

- Broker-only components get "broker-$id-" as a thread name prefix. For
  example, "broker-1-".

- Controller-only components get "controller-$id-" as a thread name prefix.
  For example, "controller-1-".

- Shared components get "kafka-$id-" as a thread name prefix. For example,
  "kafka-0-".

- Always pass a prefix to KafkaEventQueue, so that threads have names like
  "broker-0-metadata-loader-event-handler" rather than "event-handler". Prior
  to this PR, we had several threads just named "EventHandler", which was not
  helpful for debugging.

- The QuorumController thread name is "quorum-controller-123-event-handler".

- Don't set a thread prefix for replication threads started by ReplicaManager.
  They run only on the broker, and already include the broker ID.

Standardize KRaft slf4j log prefixes.

- Names should be of the form "[ComponentName id=$id] ". So for a
  ControllerServer with ID 123, we will have "[ControllerServer id=123] ".

- For the QuorumController class, use the prefix "[QuorumController id=$id] "
  rather than "[Controller $id] ".
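Concretely, the naming rule above reduces to plain string concatenation: a kebab-case prefix that is either empty or ends in a dash, followed by a kebab-case suffix. The sketch below is illustrative only (ThreadNamingExample and its threadName helper are hypothetical, not part of this patch); KafkaThread is the real utility class the server code uses to spawn named threads:

    import org.apache.kafka.common.utils.KafkaThread;

    public class ThreadNamingExample {
        // Hypothetical helper mirroring the convention: prefix is "" or
        // kebab-case ending in a dash, suffix is kebab-case.
        static String threadName(String prefix, String suffix) {
            return prefix + suffix;
        }

        public static void main(String[] args) {
            // Broker-only component on broker 1:
            // "broker-1-lifecycle-manager-event-handler"
            System.out.println(threadName("broker-1-", "lifecycle-manager-event-handler"));
            // Shared component on node 0:
            // "kafka-0-metadata-loader-event-handler"
            System.out.println(threadName("kafka-0-", "metadata-loader-event-handler"));
            // The empty prefix still yields a valid name: "event-handler"
            System.out.println(threadName("", "event-handler"));

            // This mirrors how KafkaEventQueue names its handler thread
            // (see the KafkaEventQueue hunk below); 'false' means non-daemon.
            KafkaThread thread = new KafkaThread(
                threadName("broker-1-", "event-handler"),
                () -> { },
                false);
            System.out.println(thread.getName());
        }
    }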
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ ... @@ object AlterPartitionManager {
-    threadNamePrefix: Option[String],
+    threadNamePrefix: String,
     brokerEpochSupplier: () => Long,
   ): AlterPartitionManager = {
     val channelManager = BrokerToControllerChannelManager(
@@ -89,7 +89,7 @@ object AlterPartitionManager {
       time = time,
       metrics = metrics,
       config = config,
-      channelName = "alterPartition",
+      channelName = "alter-partition",
       threadNamePrefix = threadNamePrefix,
       retryTimeoutMs = Long.MaxValue
     )
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 9e42dfb4d90..346044c39e6 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -54,11 +54,21 @@ import scala.jdk.CollectionConverters._
 class BrokerLifecycleManager(
   val config: KafkaConfig,
   val time: Time,
-  val threadNamePrefix: Option[String],
+  val threadNamePrefix: String,
   val isZkBroker: Boolean
 ) extends Logging {

-  val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")
+  private def logPrefix(): String = {
+    val builder = new StringBuilder("[BrokerLifecycleManager")
+    builder.append(" id=").append(config.nodeId)
+    if (isZkBroker) {
+      builder.append(" isZkBroker=true")
+    }
+    builder.append("]")
+    builder.toString()
+  }
+
+  val logContext = new LogContext(logPrefix())

   this.logIdent = logContext.logPrefix()
@@ -182,7 +192,7 @@ class BrokerLifecycleManager(
    */
   private[server] val eventQueue = new KafkaEventQueue(time,
     logContext,
-    threadNamePrefix.getOrElse(""),
+    threadNamePrefix + "lifecycle-manager-",
     new ShutdownEvent())

   /**
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 02ff680614c..191e3c45f02 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -64,7 +64,6 @@ class BrokerServer(
   val sharedServer: SharedServer,
   val initialOfflineDirs: Seq[String],
 ) extends KafkaBroker {
-  val threadNamePrefix = sharedServer.threadNamePrefix
   val config = sharedServer.brokerConfig
   val time = sharedServer.time
   def metrics = sharedServer.metrics
@@ -172,7 +171,7 @@ class BrokerServer(

     lifecycleManager = new BrokerLifecycleManager(config,
       time,
-      threadNamePrefix,
+      s"broker-${config.nodeId}-",
       isZkBroker = false)

     /* start scheduler */
@@ -182,7 +181,8 @@ class BrokerServer(
     /* register broker metrics */
     brokerTopicStats = new BrokerTopicStats

-    quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
+
+    quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-")

     logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
@@ -213,7 +213,7 @@ class BrokerServer(
       metrics,
       config,
       channelName = "forwarding",
-      threadNamePrefix,
+      s"broker-${config.nodeId}-",
       retryTimeoutMs = 60000
     )
     clientToControllerChannelManager.start()
@@ -242,7 +242,7 @@ class BrokerServer(
       controllerNodeProvider,
       time = time,
       metrics,
-      threadNamePrefix,
+      s"broker-${config.nodeId}-",
       brokerEpochSupplier = () => lifecycleManager.brokerEpoch
     )
     alterPartitionManager.start()
@@ -261,8 +261,9 @@ class BrokerServer(
       brokerTopicStats = brokerTopicStats,
       isShuttingDown = isShuttingDown,
       zkClient = None,
-      threadNamePrefix = threadNamePrefix,
-      brokerEpochSupplier = () => lifecycleManager.brokerEpoch)
+      threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names.
+      brokerEpochSupplier = () => lifecycleManager.brokerEpoch
+    )

     /* start token manager */
     if (config.tokenAuthEnabled) {
@@ -321,7 +322,7 @@ class BrokerServer(
       metrics,
       config,
       "heartbeat",
-      threadNamePrefix,
+      s"broker-${config.nodeId}-",
       config.brokerSessionTimeoutMs / 2 // KAFKA-14392
     )
     lifecycleManager.start(
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 3c2a6a2acbd..2d259c8a2f4 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -136,7 +136,7 @@ object BrokerToControllerChannelManager {
     metrics: Metrics,
     config: KafkaConfig,
     channelName: String,
-    threadNamePrefix: Option[String],
+    threadNamePrefix: String,
     retryTimeoutMs: Long
   ): BrokerToControllerChannelManager = {
     new BrokerToControllerChannelManagerImpl(
@@ -174,10 +174,10 @@ class BrokerToControllerChannelManagerImpl(
   metrics: Metrics,
   config: KafkaConfig,
   channelName: String,
-  threadNamePrefix: Option[String],
+  threadNamePrefix: String,
   retryTimeoutMs: Long
 ) extends BrokerToControllerChannelManager with Logging {
-  private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
+  private val logContext = new LogContext(s"[BrokerToControllerChannelManager id=${config.brokerId} name=${channelName}] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
   private val requestThread = newRequestThread
@@ -236,10 +236,7 @@ class BrokerToControllerChannelManagerImpl(
         logContext
       )
     }
-    val threadName = threadNamePrefix match {
-      case None => s"BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName"
-      case Some(name) => s"$name:BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName"
-    }
+    val threadName = s"${threadNamePrefix}to-controller-${channelName}-channel-manager"

     val controllerInformation = 
controllerNodeProvider.getControllerInfo() new BrokerToControllerRequestThread( diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 5ba4fc3dcb8..613ac524cd3 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -90,7 +90,6 @@ class ControllerServer( val config = sharedServer.controllerConfig val time = sharedServer.time def metrics = sharedServer.metrics - val threadNamePrefix = sharedServer.threadNamePrefix.getOrElse("") def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager val lock = new ReentrantLock() @@ -131,11 +130,11 @@ class ControllerServer( if (!maybeChangeStatus(SHUTDOWN, STARTING)) return val startupDeadline = Deadline.fromDelay(time, config.serverMaxStartupTimeMs, TimeUnit.MILLISECONDS) try { + this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix() info("Starting controller") config.dynamicConfig.initialize(zkClientOpt = None) maybeChangeStatus(STARTING, STARTED) - this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix() metricsGroup.newGauge("ClusterId", () => clusterId) metricsGroup.newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size) @@ -218,7 +217,7 @@ class ControllerServer( new QuorumController.Builder(config.nodeId, sharedServer.metaProps.clusterId). setTime(time). - setThreadNamePrefix(threadNamePrefix). + setThreadNamePrefix(s"quorum-controller-${config.nodeId}-"). setConfigSchema(configSchema). setRaftClient(raftManager.client). setQuorumFeatures(quorumFeatures). @@ -274,7 +273,7 @@ class ControllerServer( quotaManagers = QuotaFactory.instantiate(config, metrics, time, - threadNamePrefix) + s"controller-${config.nodeId}-") clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas) controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel, authorizer, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index f7b44ce4942..44858e4b764 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1718,7 +1718,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami distinctRoles } - def isKRaftCoResidentMode: Boolean = { + def isKRaftCombinedMode: Boolean = { processRoles == Set(BrokerRole, ControllerRole) } @@ -2280,8 +2280,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami validateControllerQuorumVotersMustContainNodeIdForKRaftController() validateControllerListenerExistsForKRaftController() validateControllerListenerNamesMustAppearInListenersForKRaftController() - } else if (isKRaftCoResidentMode) { - // KRaft colocated broker and controller + } else if (isKRaftCombinedMode) { + // KRaft combined broker and controller validateNonEmptyQuorumVotersForKRaft() validateControlPlaneListenerEmptyForKRaft() validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker() diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index d31ee6db522..92d4274bcb6 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -48,7 +48,6 @@ import scala.jdk.CollectionConverters._ class KafkaRaftServer( 
config: KafkaConfig, time: Time, - threadNamePrefix: Option[String] ) extends Server with Logging { this.logIdent = s"[KafkaRaftServer nodeId=${config.nodeId}] " @@ -71,7 +70,6 @@ class KafkaRaftServer( metaProps, time, metrics, - threadNamePrefix, controllerQuorumVotersFuture, new StandardFaultHandlerFactory(), ) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 9fed5955a7d..5e5995e5a66 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -311,7 +311,7 @@ class KafkaServer( metrics = metrics, config = config, channelName = "forwarding", - threadNamePrefix = threadNamePrefix, + s"zk-broker-${config.nodeId}-", retryTimeoutMs = config.requestTimeoutMs.longValue ) clientToControllerChannelManager.start() @@ -348,7 +348,7 @@ class KafkaServer( controllerNodeProvider, time = time, metrics = metrics, - threadNamePrefix = threadNamePrefix, + s"zk-broker-${config.nodeId}-", brokerEpochSupplier = brokerEpochSupplier ) } else { @@ -379,7 +379,7 @@ class KafkaServer( logger.info("Starting up additional components for ZooKeeper migration") lifecycleManager = new BrokerLifecycleManager(config, time, - threadNamePrefix, + s"zk-broker-${config.nodeId}-", isZkBroker = true) // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller @@ -406,7 +406,7 @@ class KafkaServer( metrics = metrics, config = config, channelName = "quorum", - threadNamePrefix = threadNamePrefix, + s"zk-broker-${config.nodeId}-", retryTimeoutMs = config.requestTimeoutMs.longValue ) diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 554207d8c21..cb362248d6a 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -88,7 +88,6 @@ class SharedServer( val metaProps: MetaProperties, val time: Time, private val _metrics: Metrics, - val threadNamePrefix: Option[String], val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]], val faultHandlerFactory: FaultHandlerFactory ) extends Logging { @@ -243,7 +242,7 @@ class SharedServer( KafkaRaftServer.MetadataTopicId, time, metrics, - threadNamePrefix, + Some(s"kafka-${sharedServerConfig.nodeId}-raft"), // No dash expected at the end controllerQuorumVotersFuture, raftManagerFaultHandler ) @@ -252,7 +251,7 @@ class SharedServer( val loaderBuilder = new MetadataLoader.Builder(). setNodeId(metaProps.nodeId). setTime(time). - setThreadNamePrefix(threadNamePrefix.getOrElse("")). + setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-"). setFaultHandler(metadataLoaderFaultHandler). setHighWaterMarkAccessor(() => raftManager.client.highWatermark()) if (brokerMetrics != null) { @@ -270,6 +269,7 @@ class SharedServer( setMaxBytesSinceLastSnapshot(sharedServerConfig.metadataSnapshotMaxNewRecordBytes). setMaxTimeSinceLastSnapshotNs(TimeUnit.MILLISECONDS.toNanos(sharedServerConfig.metadataSnapshotMaxIntervalMs)). setDisabledReason(snapshotsDiabledReason). + setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-"). 
build()
     raftManager.register(loader)
     try {
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index e5702cf95fc..b053eaef251 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -68,13 +68,13 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
     private final ClusterConfig clusterConfig;
     private final AtomicReference<KafkaClusterTestKit> clusterReference;
     private final AtomicReference<EmbeddedZookeeper> zkReference;
-    private final boolean isCoResident;
+    private final boolean isCombined;

-    public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCoResident) {
+    public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCombined) {
         this.clusterConfig = clusterConfig;
         this.clusterReference = new AtomicReference<>();
         this.zkReference = new AtomicReference<>();
-        this.isCoResident = isCoResident;
+        this.isCombined = isCombined;
     }

     @Override
@@ -82,7 +82,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
         String clusterDesc = clusterConfig.nameTags().entrySet().stream()
             .map(Object::toString)
             .collect(Collectors.joining(", "));
-        return String.format("[%d] Type=Raft-%s, %s", invocationIndex, isCoResident ? "CoReside" : "Distributed", clusterDesc);
+        return String.format("[%d] Type=Raft-%s, %s", invocationIndex, isCombined ? "Combined" : "Isolated", clusterDesc);
     }

     @Override
@@ -92,7 +92,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
             (BeforeTestExecutionCallback) context -> {
                 TestKitNodes nodes = new TestKitNodes.Builder().
                     setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
-                    setCoResident(isCoResident).
+                    setCombined(isCombined).
                     setNumBrokerNodes(clusterConfig.numBrokers()).
                     setNumControllerNodes(clusterConfig.numControllers()).build();
                 nodes.brokerNodes().forEach((brokerId, brokerNode) -> {
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 38287e40053..e09ee49402f 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -218,19 +218,15 @@ public class KafkaClusterTestKit implements AutoCloseable {
             baseDirectory = TestUtils.tempDirectory();
             nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
             executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
-                ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false));
             for (ControllerNode node : nodes.controllerNodes().values()) {
                 setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
                 BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
                     fromVersion(nodes.bootstrapMetadataVersion(), "testkit");
-                String threadNamePrefix = (nodes.brokerNodes().containsKey(node.id())) ?
-                    String.format("colocated%d", node.id()) :
-                    String.format("controller%d", node.id());
                 SharedServer sharedServer = new SharedServer(createNodeConfig(node),
                     MetaProperties.apply(nodes.clusterId().toString(), node.id()),
                     Time.SYSTEM,
                     new Metrics(),
-                    Option.apply(threadNamePrefix),
                     connectFutureManager.future,
                     faultHandlerFactory);
                 ControllerServer controller = null;
@@ -261,7 +257,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     MetaProperties.apply(nodes.clusterId().toString(), id),
                     Time.SYSTEM,
                     new Metrics(),
-                    Option.apply(String.format("broker%d_", id)),
                     connectFutureManager.future,
                     faultHandlerFactory));
                 BrokerServer broker = null;
@@ -304,7 +299,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
     }

     private String listeners(int node) {
-        if (nodes.isCoResidentNode(node)) {
+        if (nodes.isCombined(node)) {
             return "EXTERNAL://localhost:0,CONTROLLER://localhost:0";
         }
         if (nodes.controllerNodes().containsKey(node)) {
@@ -314,7 +309,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
     }

     private String roles(int node) {
-        if (nodes.isCoResidentNode(node)) {
+        if (nodes.isCombined(node)) {
             return "broker,controller";
         }
         if (nodes.controllerNodes().containsKey(node)) {
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java
index 14692ccc962..5bc36854494 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -33,7 +33,7 @@ import java.util.TreeMap;

 public class TestKitNodes {
     public static class Builder {
-        private boolean coResident = false;
+        private boolean combined = false;
         private Uuid clusterId = null;
         private MetadataVersion bootstrapMetadataVersion = null;
         private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
@@ -49,8 +49,8 @@ public class TestKitNodes {
             return this;
         }

-        public Builder setCoResident(boolean coResident) {
-            this.coResident = coResident;
+        public Builder setCombined(boolean combined) {
+            this.combined = combined;
             return this;
         }

@@ -127,7 +127,7 @@ public class TestKitNodes {
         }

         private int startControllerId() {
-            if (coResident) {
+            if (combined) {
                 return startBrokerId();
             }
             return startBrokerId() + 3000;
@@ -139,7 +139,7 @@ public class TestKitNodes {
     private final NavigableMap<Integer, ControllerNode> controllerNodes;
     private final NavigableMap<Integer, BrokerNode> brokerNodes;

-    public boolean isCoResidentNode(int node) {
+    public boolean isCombined(int node) {
         return controllerNodes.containsKey(node) && brokerNodes.containsKey(node);
     }
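The renamed flag reads naturally at call sites, as the KRaftClusterTest hunks below show. For reference, a hypothetical end-to-end usage sketch (CombinedModeExample is not part of this patch; the format()/startup() sequence follows existing testkit usage):

    import kafka.testkit.KafkaClusterTestKit;
    import kafka.testkit.TestKitNodes;

    public class CombinedModeExample {
        public static void main(String[] args) throws Exception {
            // "Combined" means each node carries both the broker and the
            // controller role, so broker and controller IDs coincide.
            TestKitNodes nodes = new TestKitNodes.Builder().
                setCombined(true).
                setNumBrokerNodes(1).
                setNumControllerNodes(1).
                build();
            try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
                cluster.format();    // format metadata directories before startup
                cluster.startup();
            }
        }
    }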
setConfigProp("authorizer.class.name", classOf[FakeConfigurableAuthorizer].getName). build() diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 0b15e75f382..113df27147b 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -98,7 +98,6 @@ class KRaftQuorumImplementation( new MetaProperties(clusterId, config.nodeId), Time.SYSTEM, new Metrics(), - Option("Broker%02d_".format(config.nodeId)), controllerQuorumVotersFuture, faultHandlerFactory) var broker: BrokerServer = null @@ -316,7 +315,6 @@ abstract class QuorumTestHarness extends Logging { metaProperties, Time.SYSTEM, new Metrics(), - Option("Controller_" + testInfo.getDisplayName), controllerQuorumVotersFuture, faultHandlerFactory) var controllerServer: ControllerServer = null diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index fc4dd51a098..64332d999b3 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -112,8 +112,8 @@ class KafkaTest { } @Test - def testColocatedRoleNodeIdValidation(): Unit = { - // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters + def testCombinedRoleNodeIdValidation(): Unit = { + // Ensure that validation is happening at startup to check that combined processes use their node.id as a voter in controller.quorum.voters val propertiesFile = new Properties propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker") propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1") @@ -127,6 +127,16 @@ class KafkaTest { KafkaConfig.fromProps(propertiesFile) } + @Test + def testIsKRaftCombinedMode(): Unit = { + val propertiesFile = new Properties + propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker") + propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1") + propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092") + val config = KafkaConfig.fromProps(propertiesFile) + assertTrue(config.isKRaftCombinedMode) + } + @Test def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = { // Ensure that validation is happening at startup to check that if process.roles is set controller.quorum.voters is not empty diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala index 304c987d3ae..9d93cb85941 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala @@ -98,14 +98,14 @@ class BrokerLifecycleManagerTest { @Test def testCreateAndClose(): Unit = { val context = new BrokerLifecycleManagerTestContext(configProperties) - val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false) + val manager = new BrokerLifecycleManager(context.config, context.time, "create-and-close-", isZkBroker = false) manager.close() } @Test def testCreateStartAndClose(): Unit = { val context = new BrokerLifecycleManagerTestContext(configProperties) - val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false) + val manager = new BrokerLifecycleManager(context.config, context.time, 
"create-start-and-close-", isZkBroker = false) assertEquals(BrokerState.NOT_RUNNING, manager.state) manager.start(() => context.highestMetadataOffset.get(), context.mockChannelManager, context.clusterId, context.advertisedListeners, @@ -120,7 +120,7 @@ class BrokerLifecycleManagerTest { @Test def testSuccessfulRegistration(): Unit = { val context = new BrokerLifecycleManagerTestContext(configProperties) - val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false) + val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false) val controllerNode = new Node(3000, "localhost", 8021) context.controllerNodeProvider.node.set(controllerNode) context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse( @@ -140,7 +140,7 @@ class BrokerLifecycleManagerTest { def testRegistrationTimeout(): Unit = { val context = new BrokerLifecycleManagerTestContext(configProperties) val controllerNode = new Node(3000, "localhost", 8021) - val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false) + val manager = new BrokerLifecycleManager(context.config, context.time, "registration-timeout-", isZkBroker = false) context.controllerNodeProvider.node.set(controllerNode) def newDuplicateRegistrationResponse(): Unit = { context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse( @@ -181,7 +181,7 @@ class BrokerLifecycleManagerTest { @Test def testControlledShutdown(): Unit = { val context = new BrokerLifecycleManagerTestContext(configProperties) - val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false) + val manager = new BrokerLifecycleManager(context.config, context.time, "controlled-shutdown-", isZkBroker = false) val controllerNode = new Node(3000, "localhost", 8021) context.controllerNodeProvider.node.set(controllerNode) context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse( diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala index 09c74d4e155..25594c2de13 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala @@ -69,7 +69,7 @@ class BrokerRegistrationRequestTest { new Metrics(), clusterInstance.anyControllerSocketServer().config, "heartbeat", - Some("heartbeat"), + "test-heartbeat-", 10000 ) } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 71a1dbc4f72..fd000da9e6f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -316,10 +316,10 @@ public final class QuorumController implements Controller { } if (threadNamePrefix == null) { - threadNamePrefix = String.format("Node%d_", nodeId); + threadNamePrefix = String.format("quorum-controller-%d-", nodeId); } if (logContext == null) { - logContext = new LogContext(String.format("[Controller %d] ", nodeId)); + logContext = new LogContext(String.format("[QuorumController id=%d] ", nodeId)); } if (controllerMetrics == null) { controllerMetrics = (ControllerMetrics) Class.forName( @@ -328,7 +328,7 @@ public final class QuorumController implements Controller { KafkaEventQueue queue = null; try { - queue = new 
KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
+            queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
             return new QuorumController(
                 fatalFaultHandler,
                 logContext,
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index d204ac32ed9..ac34d1fa4a5 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -64,9 +64,9 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>, AutoCloseable {
     public static class Builder {
         private int nodeId = -1;
+        private String threadNamePrefix = "";
         private Time time = Time.SYSTEM;
         private LogContext logContext = null;
-        private String threadNamePrefix = "";
         private FaultHandler faultHandler = (m, e) -> new FaultHandlerException(m, e);
         private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() {
             private volatile long lastAppliedOffset = -1L;
@@ -97,13 +97,13 @@ public class MetadataLoader implements RaftClient.Listener
             return this;
         }

-        public Builder setTime(Time time) {
-            this.time = time;
+        public Builder setThreadNamePrefix(String threadNamePrefix) {
+            this.threadNamePrefix = threadNamePrefix;
             return this;
         }

-        public Builder setThreadNamePrefix(String threadNamePrefix) {
-            this.threadNamePrefix = threadNamePrefix;
+        public Builder setTime(Time time) {
+            this.time = time;
             return this;
         }

@@ -124,7 +124,7 @@ public class MetadataLoader implements RaftClient.Listener
         public MetadataLoader build() {
             if (logContext == null) {
-                logContext = new LogContext("[MetadataLoader " + nodeId + "] ");
+                logContext = new LogContext("[MetadataLoader id=" + nodeId + "] ");
             }
             if (highWaterMarkAccessor == null) {
                 throw new RuntimeException("You must set the high water mark accessor.");
@@ -132,6 +132,7 @@ public class MetadataLoader implements RaftClient.Listener
             return new MetadataLoader(
                 time,
                 logContext,
+                nodeId,
                 threadNamePrefix,
                 faultHandler,
                 metrics,
@@ -197,6 +198,7 @@ public class MetadataLoader implements RaftClient.Listener
     private MetadataLoader(
         Time time,
         LogContext logContext,
+        int nodeId,
         String threadNamePrefix,
         FaultHandler faultHandler,
         MetadataLoaderMetrics metrics,
@@ -210,7 +212,9 @@ public class MetadataLoader implements RaftClient.Listener
         this.uninitializedPublishers = new LinkedHashMap<>();
         this.publishers = new LinkedHashMap<>();
         this.image = MetadataImage.EMPTY;
-        this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix, new ShutdownEvent());
+        this.eventQueue = new KafkaEventQueue(time, logContext,
+            threadNamePrefix + "metadata-loader-",
+            new ShutdownEvent());
     }

     private boolean stillNeedToCatchUp(long offset) {
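With the reordered setters, the loader's thread prefix is supplied the same way as its node ID. A hypothetical wiring sketch (MetadataLoaderWiringExample is not part of this patch, and the stubbed accessor stands in for raftManager.client.highWatermark(), which SharedServer passes in practice):

    import java.util.OptionalLong;
    import org.apache.kafka.image.loader.MetadataLoader;

    public class MetadataLoaderWiringExample {
        public static void main(String[] args) throws Exception {
            // A minimal sketch, assuming node 0 and a stand-in
            // high-watermark accessor.
            try (MetadataLoader loader = new MetadataLoader.Builder().
                    setNodeId(0).
                    setThreadNamePrefix("kafka-0-").  // shared component => "kafka-$id-"
                    setHighWaterMarkAccessor(OptionalLong::empty).
                    build()) {
                // The event queue thread created inside the loader is named
                // "kafka-0-metadata-loader-event-handler".
            }
        }
    }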
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
index 989f6299401..7435efeafd2 100644
--- a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
@@ -45,6 +45,7 @@ public class SnapshotGenerator implements MetadataPublisher {
         private long maxBytesSinceLastSnapshot = 100 * 1024L * 1024L;
         private long maxTimeSinceLastSnapshotNs = TimeUnit.DAYS.toNanos(1);
         private AtomicReference<String> disabledReason = null;
+        private String threadNamePrefix = "";

         public Builder(Emitter emitter) {
             this.emitter = emitter;
@@ -80,6 +81,11 @@ public class SnapshotGenerator implements MetadataPublisher {
             return this;
         }

+        public Builder setThreadNamePrefix(String threadNamePrefix) {
+            this.threadNamePrefix = threadNamePrefix;
+            return this;
+        }
+
         public SnapshotGenerator build() {
             if (disabledReason == null) {
                 disabledReason = new AtomicReference<>();
@@ -91,7 +97,8 @@ public class SnapshotGenerator implements MetadataPublisher {
                 faultHandler,
                 maxBytesSinceLastSnapshot,
                 maxTimeSinceLastSnapshotNs,
-                disabledReason
+                disabledReason,
+                threadNamePrefix
             );
         }
     }
@@ -174,7 +181,8 @@ public class SnapshotGenerator implements MetadataPublisher {
         FaultHandler faultHandler,
         long maxBytesSinceLastSnapshot,
         long maxTimeSinceLastSnapshotNs,
-        AtomicReference<String> disabledReason
+        AtomicReference<String> disabledReason,
+        String threadNamePrefix
     ) {
         this.nodeId = nodeId;
         this.time = time;
@@ -182,10 +190,10 @@ public class SnapshotGenerator implements MetadataPublisher {
         this.faultHandler = faultHandler;
         this.maxBytesSinceLastSnapshot = maxBytesSinceLastSnapshot;
         this.maxTimeSinceLastSnapshotNs = maxTimeSinceLastSnapshotNs;
-        LogContext logContext = new LogContext("[SnapshotGenerator " + nodeId + "] ");
+        LogContext logContext = new LogContext("[SnapshotGenerator id=" + nodeId + "] ");
         this.log = logContext.logger(SnapshotGenerator.class);
         this.disabledReason = disabledReason;
-        this.eventQueue = new KafkaEventQueue(time, logContext, "SnapshotGenerator" + nodeId);
+        this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix + "snapshot-generator-");
         resetSnapshotCounters();
         log.debug("Starting SnapshotGenerator.");
     }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index c6b1c0fc268..acfc1ad79ee 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -89,11 +89,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         this.zkMigrationClient = zkMigrationClient;
         this.propagator = propagator;
         this.time = Time.SYSTEM;
-        LogContext logContext = new LogContext(String.format("[KRaftMigrationDriver nodeId=%d] ", nodeId));
+        LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
         this.log = logContext.logger(KRaftMigrationDriver.class);
         this.migrationState = MigrationDriverState.UNINITIALIZED;
         this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
-        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "kraft-migration");
+        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
         this.image = MetadataImage.EMPTY;
         this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
         this.initialZkLoadHandler = initialZkLoadHandler;
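All of the LogContext changes above converge on one shape, "[ComponentName id=$id] ", so grepping a combined-process log for "id=5" now surfaces every component of node 5. A small illustrative sketch (LogPrefixExample is hypothetical; LogContext.logger() is the real API used throughout the patch):

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    public class LogPrefixExample {
        public static void main(String[] args) {
            // The prefix is prepended verbatim to every message logged
            // through a logger obtained from this LogContext.
            LogContext logContext = new LogContext("[KRaftMigrationDriver id=5] ");
            Logger log = logContext.logger(LogPrefixExample.class);
            log.info("Migration driver starting");
            // => [KRaftMigrationDriver id=5] Migration driver starting
        }
    }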
"EventHandler", + this.eventHandlerThread = new KafkaThread(threadNamePrefix + "event-handler", this.eventHandler, false); this.shuttingDown = false; this.interrupted = false; diff --git a/tests/kafkatest/sanity_checks/test_bounce.py b/tests/kafkatest/sanity_checks/test_bounce.py index 5c9cd7f2759..fdefc9606f3 100644 --- a/tests/kafkatest/sanity_checks/test_bounce.py +++ b/tests/kafkatest/sanity_checks/test_bounce.py @@ -36,7 +36,7 @@ class TestBounce(Test): raise Exception("Illegal %s value provided for the test: %s" % (quorum_size_arg_name, quorum_size)) self.topic = "topic" self.zk = ZookeeperService(test_context, num_nodes=quorum_size) if quorum.for_test(test_context) == quorum.zk else None - num_kafka_nodes = quorum_size if quorum.for_test(test_context) == quorum.colocated_kraft else 1 + num_kafka_nodes = quorum_size if quorum.for_test(test_context) == quorum.combined_kraft else 1 self.kafka = KafkaService(test_context, num_nodes=num_kafka_nodes, zk=self.zk, topics={self.topic: {"partitions": 1, "replication-factor": 1}}, controller_num_nodes_override=quorum_size) @@ -53,7 +53,7 @@ class TestBounce(Test): # ZooKeeper and KRaft, quorum size = 1 @cluster(num_nodes=4) @matrix(metadata_quorum=quorum.all, quorum_size=[1]) - # Remote and Co-located KRaft, quorum size = 3 + # Isolated and Combined KRaft, quorum size = 3 @cluster(num_nodes=6) @matrix(metadata_quorum=quorum.all_kraft, quorum_size=[3]) def test_simple_run(self, metadata_quorum, quorum_size): @@ -73,6 +73,6 @@ class TestBounce(Test): assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages) if first_time: self.producer.stop() - if self.kafka.quorum_info.using_kraft and self.kafka.remote_controller_quorum: - self.kafka.remote_controller_quorum.restart_cluster() + if self.kafka.quorum_info.using_kraft and self.kafka.isolated_controller_quorum: + self.kafka.isolated_controller_quorum.restart_cluster() self.kafka.restart_cluster() diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 064793a8c70..b40c68b7d74 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -77,11 +77,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): in process.roles (0 when using Zookeeper) controller_quorum : KafkaService None when using ZooKeeper, otherwise the Kafka service for the - co-located case or the remote controller quorum service - instance for the remote case - remote_controller_quorum : KafkaService - None for the co-located case or when using ZooKeeper, otherwise - the remote controller quorum service instance + combined case or the isolated controller quorum service + instance for the isolated case + isolated_controller_quorum : KafkaService + None for the combined case or when using ZooKeeper, otherwise + the isolated controller quorum service instance Kafka Security Protocols ------------------------ @@ -106,12 +106,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): KRaft Quorums ------------ - Set metadata_quorum accordingly (to COLOCATED_KRAFT or REMOTE_KRAFT). + Set metadata_quorum accordingly (to COMBINED_KRAFT or ISOLATED_KRAFT). Do not instantiate a ZookeeperService instance. - Starting Kafka will cause any remote controller quorum to + Starting Kafka will cause any isolated controller quorum to automatically start first. 
Explicitly stopping Kafka does not stop - any remote controller quorum, but Ducktape will stop both when + any isolated controller quorum, but Ducktape will stop both when tearing down the test (it will stop Kafka first). KRaft Security Protocols @@ -119,12 +119,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): The broker-to-controller and inter-controller security protocols will both initially be set to the inter-broker security protocol. The broker-to-controller and inter-controller security protocols - must be identical for the co-located case (an exception will be + must be identical for the combined case (an exception will be thrown when trying to start the service if they are not identical). The broker-to-controller and inter-controller security protocols - can differ in the remote case. + can differ in the isolated case. - Set these attributes for the co-located case. Changes take effect + Set these attributes for the combined case. Changes take effect when starting each node: controller_security_protocol : str @@ -136,10 +136,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): intercontroller_sasl_mechanism : str default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL - Set the same attributes for the remote case (changes take effect + Set the same attributes for the isolated case (changes take effect when starting each quorum node), but you must first obtain the - service instance for the remote quorum via one of the - 'controller_quorum' or 'remote_controller_quorum' attributes as + service instance for the isolated quorum via one of the + 'controller_quorum' or 'isolated_controller_quorum' attributes as defined above. """ @@ -200,7 +200,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): zk_client_secure=False, listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None, extra_kafka_opts="", tls_version=None, - remote_kafka=None, + isolated_kafka=None, controller_num_nodes_override=0, allow_zk_with_kraft=False, quorum_info_provider=None @@ -211,7 +211,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): 1) Zookeeper quorum: The number of brokers is defined by this parameter. The broker.id values will be 1..num_nodes. - 2) Co-located KRaft quorum: + 2) Combined KRaft quorum: The number of nodes having a broker role is defined by this parameter. The node.id values will be 1..num_nodes The number of nodes having a controller role will by default be 1, 3, or 5 depending on num_nodes @@ -231,10 +231,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): broker having node.id=1: broker.roles=broker+controller broker having node.id=2: broker.roles=broker broker having node.id=3: broker.roles=broker - 3) Remote KRaft quorum when instantiating the broker service: + 3) Isolated KRaft quorum when instantiating the broker service: The number of nodes, all of which will have broker.roles=broker, is defined by this parameter. The node.id values will be 1..num_nodes - 4) Remote KRaft quorum when instantiating the controller service: + 4) Isolated KRaft quorum when instantiating the controller service: The number of nodes, all of which will have broker.roles=controller, is defined by this parameter. 
The node.id values will be 3001..(3000 + num_nodes) The value passed in is determined by the broker service when that is instantiated, and it uses the @@ -260,21 +260,21 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): :param dict per_node_server_prop_overrides: overrides for kafka.properties file keyed by 1-based node number e.g: {1: [["config1", "true"], ["config2", "1000"]], 2: [["config1", "false"], ["config2", "0"]]} :param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable - :param KafkaService remote_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper - :param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and remote_kafka is not None; ignored otherwise + :param KafkaService isolated_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper + :param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise :param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper :param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context """ self.zk = zk - self.remote_kafka = remote_kafka + self.isolated_kafka = isolated_kafka self.allow_zk_with_kraft = allow_zk_with_kraft if quorum_info_provider is None: self.quorum_info = quorum.ServiceQuorumInfo.from_test_context(self, context) else: self.quorum_info = quorum_info_provider(self) self.controller_quorum = None # will define below if necessary - self.remote_controller_quorum = None # will define below if necessary + self.isolated_controller_quorum = None # will define below if necessary self.configured_for_zk_migration = False if num_nodes < 1: @@ -287,44 +287,44 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): num_nodes_broker_role = num_nodes if self.quorum_info.has_controllers: self.num_nodes_controller_role = self.num_kraft_controllers(num_nodes_broker_role, controller_num_nodes_override) - if self.remote_kafka: - raise Exception("Must not specify remote Kafka service with co-located Controller quorum") + if self.isolated_kafka: + raise Exception("Must not specify isolated Kafka service with combined Controller quorum") else: self.num_nodes_controller_role = num_nodes - if not self.remote_kafka: - raise Exception("Must specify remote Kafka service when instantiating remote Controller service (should not happen)") + if not self.isolated_kafka: + raise Exception("Must specify isolated Kafka service when instantiating isolated Controller service (should not happen)") # Initially use the inter-broker security protocol for both # broker-to-controller and inter-controller communication. Both can be explicitly changed later if desired. - # Note, however, that the two must the same if the controller quorum is co-located with the - # brokers. Different security protocols for the two are only supported with a remote controller quorum. + # Note, however, that the two must the same if the controller quorum is combined with the + # brokers. Different security protocols for the two are only supported with a isolated controller quorum. 
self.controller_security_protocol = interbroker_security_protocol self.controller_sasl_mechanism = interbroker_sasl_mechanism self.intercontroller_security_protocol = interbroker_security_protocol self.intercontroller_sasl_mechanism = interbroker_sasl_mechanism # Ducktape tears down services in the reverse order in which they are created, - # so create a service for the remote controller quorum (if we need one) first, before + # so create a service for the isolated controller quorum (if we need one) first, before # invoking Service.__init__(), so that Ducktape will tear down the quorum last; otherwise # Ducktape will tear down the controller quorum first, which could lead to problems in # Kafka and delays in tearing it down (and who knows what else -- it's simply better - # to correctly tear down Kafka first, before tearing down the remote controller). + # to correctly tear down Kafka first, before tearing down the isolated controller). if self.quorum_info.has_controllers: self.controller_quorum = self else: - num_remote_controller_nodes = self.num_kraft_controllers(num_nodes, controller_num_nodes_override) - self.remote_controller_quorum = KafkaService( - context, num_remote_controller_nodes, self.zk, security_protocol=self.controller_security_protocol, + num_isolated_controller_nodes = self.num_kraft_controllers(num_nodes, controller_num_nodes_override) + self.isolated_controller_quorum = KafkaService( + context, num_isolated_controller_nodes, self.zk, security_protocol=self.controller_security_protocol, interbroker_security_protocol=self.intercontroller_security_protocol, client_sasl_mechanism=self.controller_sasl_mechanism, interbroker_sasl_mechanism=self.intercontroller_sasl_mechanism, authorizer_class_name=authorizer_class_name, version=version, jmx_object_names=jmx_object_names, jmx_attributes=jmx_attributes, listener_security_config=listener_security_config, extra_kafka_opts=extra_kafka_opts, tls_version=tls_version, - remote_kafka=self, allow_zk_with_kraft=self.allow_zk_with_kraft, + isolated_kafka=self, allow_zk_with_kraft=self.allow_zk_with_kraft, server_prop_overrides=server_prop_overrides ) - self.controller_quorum = self.remote_controller_quorum + self.controller_quorum = self.isolated_controller_quorum Service.__init__(self, context, num_nodes) JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), @@ -429,7 +429,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): node.config = KafkaConfig(**kraft_broker_plus_zk_configs) else: node.config = KafkaConfig(**kraft_broker_configs) - self.colocated_nodes_started = 0 + self.combined_nodes_started = 0 self.nodes_to_start = self.nodes def reconfigure_zk_for_migration(self, kraft_quorum): @@ -459,16 +459,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.server_prop_overrides.extend(props) del self.port_mappings[kraft_quorum.controller_listener_names] - # Set the quorum info to remote KRaft - self.quorum_info = quorum.ServiceQuorumInfo(quorum.remote_kraft, self) - self.remote_controller_quorum = kraft_quorum + # Set the quorum info to isolated KRaft + self.quorum_info = quorum.ServiceQuorumInfo(quorum.isolated_kraft, self) + self.isolated_controller_quorum = kraft_quorum self.controller_quorum = kraft_quorum def num_kraft_controllers(self, num_nodes_broker_role, controller_num_nodes_override): if controller_num_nodes_override < 0: raise Exception("controller_num_nodes_override must not be negative: %i" % 
controller_num_nodes_override) - if controller_num_nodes_override > num_nodes_broker_role and self.quorum_info.quorum_type == quorum.colocated_kraft: - raise Exception("controller_num_nodes_override must not exceed the service's node count in the co-located case: %i > %i" % + if controller_num_nodes_override > num_nodes_broker_role and self.quorum_info.quorum_type == quorum.combined_kraft: + raise Exception("controller_num_nodes_override must not exceed the service's node count in the combined case: %i > %i" % (controller_num_nodes_override, num_nodes_broker_role)) if controller_num_nodes_override: return controller_num_nodes_override @@ -511,7 +511,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @property def security_config(self): if not self._security_config: - # we will later change the security protocols to PLAINTEXT if this is a remote KRaft controller case since + # we will later change the security protocols to PLAINTEXT if this is an isolated KRaft controller case since # those security protocols are irrelevant there and we don't want to falsely indicate the use of SASL or TLS security_protocol_to_use=self.security_protocol interbroker_security_protocol_to_use=self.interbroker_security_protocol @@ -525,7 +525,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): if self.quorum_info.has_controllers: if self.intercontroller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS: serves_intercontroller_sasl_mechanism = self.intercontroller_sasl_mechanism - uses_controller_sasl_mechanism = self.intercontroller_sasl_mechanism # won't change from above in co-located case + uses_controller_sasl_mechanism = self.intercontroller_sasl_mechanism # won't change from above in combined case if self.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS: serves_controller_sasl_mechanism = self.controller_sasl_mechanism # determine if KRaft uses TLS @@ -534,10 +534,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): # KRaft broker only kraft_tls = self.controller_quorum.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS if self.quorum_info.has_controllers: - # remote or co-located KRaft controller + # isolated or combined KRaft controller kraft_tls = self.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS \ or self.intercontroller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS - # clear irrelevant security protocols of SASL/TLS implications for remote controller quorum case + # clear irrelevant security protocols of SASL/TLS implications for the isolated controller quorum case if self.quorum_info.has_controllers and not self.quorum_info.has_brokers: security_protocol_to_use=SecurityConfig.PLAINTEXT interbroker_security_protocol_to_use=SecurityConfig.PLAINTEXT @@ -556,7 +556,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self._security_config.properties['security.protocol'] = self.security_protocol self._security_config.properties['sasl.mechanism'] = self.client_sasl_mechanism # Ensure we have the right inter-broker security protocol because it may have been mutated - # since we cached our security config (ignore if this is a remote KRaft controller quorum case; the + # since we cached our security config (ignore if this is an isolated KRaft controller quorum case; the # inter-broker security protocol is not used there). 
if (self.quorum_info.using_zk or self.quorum_info.has_brokers): # in case inter-broker SASL mechanism has changed without changing the inter-broker security protocol @@ -587,7 +587,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): has_sasl = self.security_config.has_sasl if has_sasl: if self.minikdc is None: - other_service = self.remote_kafka if self.remote_kafka else self.controller_quorum if self.quorum_info.using_kraft else None + other_service = self.isolated_kafka if self.isolated_kafka else self.controller_quorum if self.quorum_info.using_kraft else None if not other_service or not other_service.minikdc: nodes_for_kdc = self.nodes.copy() if other_service and other_service != self: @@ -598,8 +598,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.minikdc = None if self.quorum_info.using_kraft: self.controller_quorum.minikdc = None - if self.remote_kafka: - self.remote_kafka.minikdc = None + if self.isolated_kafka: + self.isolated_kafka.minikdc = None def alive(self, node): return len(self.pids(node)) > 0 @@ -621,7 +621,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): # This is not supported because both the broker and the controller take the first entry from # controller.listener.names and the value from sasl.mechanism.controller.protocol; # they share a single config, so they must both see/use identical values. - raise Exception("Co-located KRaft Brokers (%s/%s) and Controllers (%s/%s) cannot talk to Controllers via different security protocols" % + raise Exception("Combined KRaft Brokers (%s/%s) and Controllers (%s/%s) cannot talk to Controllers via different security protocols" % (self.controller_security_protocol, self.controller_sasl_mechanism, self.intercontroller_security_protocol, self.intercontroller_sasl_mechanism)) if self.quorum_info.using_zk or self.quorum_info.has_brokers: @@ -629,7 +629,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.interbroker_listener.open = True # we have to wait to decide whether to open the controller port(s) # because it could be dependent on the particular node in the - # co-located case where the number of controllers could be less + # combined case where the number of controllers could be less # than the number of nodes in the service self.start_minikdc_if_necessary(add_principals) @@ -641,8 +641,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): if self.quorum_info.using_zk: self._ensure_zk_chroot() - if self.remote_controller_quorum: - self.remote_controller_quorum.start() + if self.isolated_controller_quorum: + self.isolated_controller_quorum.start() Service.start(self, **kwargs) @@ -729,7 +729,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): if not port.name in controller_listener_names: advertised_listeners.append(port.advertised_listener(node)) protocol_map.append(port.listener_security_protocol()) - controller_sec_protocol = self.remote_controller_quorum.controller_security_protocol if self.remote_controller_quorum \ + controller_sec_protocol = self.isolated_controller_quorum.controller_security_protocol if self.isolated_controller_quorum \ else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \ else None if controller_sec_protocol: @@ -826,7 +826,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): for controller_listener in self.controller_listener_name_list(node): if 
self.node_quorum_info.has_controller_role: self.open_port(controller_listener) - else: # co-located case where node doesn't have a controller + else: # combined case where node doesn't have a controller self.close_port(controller_listener) self.security_config.setup_node(node) @@ -845,9 +845,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]]) # define controller.listener.names self.controller_listener_names = ','.join(self.controller_listener_name_list(node)) - # define sasl.mechanism.controller.protocol to match remote quorum if one exists - if self.remote_controller_quorum: - self.controller_sasl_mechanism = self.remote_controller_quorum.controller_sasl_mechanism + # define sasl.mechanism.controller.protocol to match the isolated quorum if one exists + if self.isolated_controller_quorum: + self.controller_sasl_mechanism = self.isolated_controller_quorum.controller_sasl_mechanism prop_file = self.prop_file(node) self.logger.info("kafka.properties:") @@ -866,7 +866,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.logger.debug("Attempting to start KafkaService %s on %s with command: %s" %\ ("concurrently" if self.concurrent_start else "serially", str(node.account), cmd)) if self.node_quorum_info.has_controller_role and self.node_quorum_info.has_broker_role: - self.colocated_nodes_started += 1 + self.combined_nodes_started += 1 if self.concurrent_start: node.account.ssh(cmd) # and then don't wait for the startup message else: @@ -937,31 +937,31 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def stop_node(self, node, clean_shutdown=True, timeout_sec=60): pids = self.pids(node) - cluster_has_colocated_controllers = self.quorum_info.has_brokers and self.quorum_info.has_controllers - force_sigkill_due_to_too_few_colocated_controllers =\ - clean_shutdown and cluster_has_colocated_controllers\ - and self.colocated_nodes_started < self.controllers_required_for_quorum() - if force_sigkill_due_to_too_few_colocated_controllers: - self.logger.info("Forcing node to stop via SIGKILL due to too few co-located KRaft controllers: %i/%i" %\ - (self.colocated_nodes_started, self.num_nodes_controller_role)) + cluster_has_combined_controllers = self.quorum_info.has_brokers and self.quorum_info.has_controllers + force_sigkill_due_to_too_few_combined_controllers =\ + clean_shutdown and cluster_has_combined_controllers\ + and self.combined_nodes_started < self.controllers_required_for_quorum() + if force_sigkill_due_to_too_few_combined_controllers: + self.logger.info("Forcing node to stop via SIGKILL due to too few combined KRaft controllers: %i/%i" %\ + (self.combined_nodes_started, self.num_nodes_controller_role)) - sig = signal.SIGTERM if clean_shutdown and not force_sigkill_due_to_too_few_colocated_controllers else signal.SIGKILL + sig = signal.SIGTERM if clean_shutdown and not force_sigkill_due_to_too_few_combined_controllers else signal.SIGKILL for pid in pids: node.account.signal(pid, sig, allow_fail=False) node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node) - node_has_colocated_controllers = node_quorum_info.has_controller_role and node_quorum_info.has_broker_role - if pids and node_has_colocated_controllers: - self.colocated_nodes_started -= 1 + node_has_combined_controllers = node_quorum_info.has_controller_role and node_quorum_info.has_broker_role + if pids and node_has_combined_controllers: + self.combined_nodes_started -= 1 try: 
wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=timeout_sec, err_msg="Kafka node failed to stop in %d seconds" % timeout_sec) except Exception: - if node_has_colocated_controllers: + if node_has_combined_controllers: # it didn't stop - self.colocated_nodes_started += 1 + self.combined_nodes_started += 1 self.thread_dump(node) raise @@ -1501,9 +1501,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): return missing def restart_cluster(self, clean_shutdown=True, timeout_sec=60, after_each_broker_restart=None, *args): - # We do not restart the remote controller quorum if it exists. + # We do not restart the isolated controller quorum if it exists. # This is not widely used -- it typically appears in rolling upgrade tests -- - # so we will let tests explicitly decide if/when to restart any remote controller quorum. + # so we will let tests explicitly decide if/when to restart any isolated controller quorum. for node in self.nodes: self.restart_node(node, clean_shutdown=clean_shutdown, timeout_sec=timeout_sec) if after_each_broker_restart is not None: diff --git a/tests/kafkatest/services/kafka/quorum.py b/tests/kafkatest/services/kafka/quorum.py index 499085b2a2e..724d7d0de12 100644 --- a/tests/kafkatest/services/kafka/quorum.py +++ b/tests/kafkatest/services/kafka/quorum.py @@ -15,20 +15,20 @@ # the types of metadata quorums we support zk = 'ZK' # ZooKeeper, used before/during the KIP-500 bridge release(s) -colocated_kraft = 'COLOCATED_KRAFT' # co-located Controllers in KRaft mode, used during/after the KIP-500 bridge release(s) -remote_kraft = 'REMOTE_KRAFT' # separate Controllers in KRaft mode, used during/after the KIP-500 bridge release(s) +combined_kraft = 'COMBINED_KRAFT' # combined Controllers in KRaft mode, used during/after the KIP-500 bridge release(s) +isolated_kraft = 'ISOLATED_KRAFT' # isolated Controllers in KRaft mode, used during/after the KIP-500 bridge release(s) # How we will parameterize tests that exercise all quorum styles -# [“ZK”, “REMOTE_KRAFT”, "COLOCATED_KRAFT"] during the KIP-500 bridge release(s) -# [“REMOTE_KRAFT”, "COLOCATED_KRAFT”] after the KIP-500 bridge release(s) +# ["ZK", "ISOLATED_KRAFT", "COMBINED_KRAFT"] during the KIP-500 bridge release(s) +# ["ISOLATED_KRAFT", "COMBINED_KRAFT"] after the KIP-500 bridge release(s) all = [zk, isolated_kraft, combined_kraft] # How we will parameterize tests that exercise all KRaft quorum styles -all_kraft = [remote_kraft, colocated_kraft] +all_kraft = [isolated_kraft, combined_kraft] # How we will parameterize tests that are unrelated to upgrades: # [“ZK”] before the KIP-500 bridge release(s) -# [“ZK”, “REMOTE_KRAFT”] during the KIP-500 bridge release(s) and in preview releases -# [“REMOTE_KRAFT”] after the KIP-500 bridge release(s) -all_non_upgrade = [zk, remote_kraft] +# ["ZK", "ISOLATED_KRAFT"] during the KIP-500 bridge release(s) and in preview releases +# ["ISOLATED_KRAFT"] after the KIP-500 bridge release(s) +all_non_upgrade = [zk, isolated_kraft] def for_test(test_context): # A test uses ZooKeeper if it doesn't specify a metadata quorum or if it explicitly specifies ZooKeeper @@ -44,13 +44,13 @@ class ServiceQuorumInfo: Exposes quorum-related information for a KafkaService Kafka can use either ZooKeeper or a KRaft (Kafka Raft) Controller quorum for - its metadata. 
KRaft Controllers can either be combined with Kafka in + the same JVM or isolated in separate JVMs. The choice is made via the 'metadata_quorum' parameter defined for the system test: if it is not explicitly defined, or if it is set to 'ZK', then ZooKeeper - is used. If it is explicitly set to 'COLOCATED_KRAFT' then KRaft - controllers will be co-located with the brokers; the value - `REMOTE_KRAFT` indicates remote controllers. + is used. If it is explicitly set to 'COMBINED_KRAFT' then KRaft + controllers will be combined with the brokers; the value + `ISOLATED_KRAFT` indicates isolated controllers. Attributes ---------- @@ -59,7 +59,7 @@ class ServiceQuorumInfo: The service for which this instance exposes quorum-related information quorum_type : str - COLOCATED_KRAFT, REMOTE_KRAFT, or ZK + COMBINED_KRAFT, ISOLATED_KRAFT, or ZK using_zk : bool True iff quorum_type==ZK using_kraft : bool @@ -67,22 +67,22 @@ class ServiceQuorumInfo: has_brokers : bool Whether there is at least one node with process.roles containing 'broker'. True iff using_kraft and the Kafka - service doesn't itself have a remote Kafka service (meaning - it is not a remote controller quorum). + service doesn't itself have an isolated Kafka service (meaning + it is not an isolated controller quorum). has_controllers : bool Whether there is at least one node with process.roles containing 'controller'. True iff quorum_type == - COLOCATED_KRAFT or the Kafka service itself has a remote Kafka - service (meaning it is a remote controller quorum). + COMBINED_KRAFT or the Kafka service itself has an isolated Kafka + service (meaning it is an isolated controller quorum). has_brokers_and_controllers : - True iff quorum_type==COLOCATED_KRAFT + True iff quorum_type==COMBINED_KRAFT """ def __init__(self, quorum_type, kafka): """ :param quorum_type : str - The type of quorum being used. Either "ZK", "COLOCATED_KRAFT", or "REMOTE_KRAFT" + The type of quorum being used. 
Either "ZK", "COMBINED_KRAFT", or "ISOLATED_KRAFT" :param context : TestContext The test context within which the this instance and the given Kafka service is being instantiated @@ -90,16 +90,16 @@ class ServiceQuorumInfo: if quorum_type != zk and kafka.zk and not kafka.allow_zk_with_kraft: raise Exception("Cannot use ZooKeeper while specifying a KRaft metadata quorum unless explicitly allowing it") - if kafka.remote_kafka and quorum_type != remote_kraft: - raise Exception("Cannot specify a remote Kafka service unless using a remote KRaft metadata quorum (should not happen)") + if kafka.isolated_kafka and quorum_type != isolated_kraft: + raise Exception("Cannot specify an isolated Kafka service unless using an isolated KRaft metadata quorum (should not happen)") self.kafka = kafka self.quorum_type = quorum_type self.using_zk = quorum_type == zk self.using_kraft = not self.using_zk - self.has_brokers = self.using_kraft and not kafka.remote_kafka - self.has_controllers = quorum_type == colocated_kraft or kafka.remote_kafka - self.has_brokers_and_controllers = quorum_type == colocated_kraft + self.has_brokers = self.using_kraft and not kafka.isolated_kafka + self.has_controllers = quorum_type == combined_kraft or kafka.isolated_kafka + self.has_brokers_and_controllers = quorum_type == combined_kraft @staticmethod def from_test_context(kafka, context): @@ -127,12 +127,12 @@ class NodeQuorumInfo: belongs has_broker_role : bool True iff using_kraft and the Kafka service doesn't itself have - a remote Kafka service (meaning it is not a remote controller) + an isolated Kafka service (meaning it is not an isolated controller) has_controller_role : bool - True iff quorum_type==COLOCATED_KRAFT and the node is one of + True iff quorum_type==COMBINED_KRAFT and the node is one of the first N in the cluster where N is the number of nodes - that have a controller role; or the Kafka service itself has a - remote Kafka service (meaning it is a remote controller + that have a controller role; or the Kafka service itself has an + isolated Kafka service (meaning it is an isolated controller quorum). has_combined_broker_and_controller_roles : True iff has_broker_role==True and has_controller_role==true @@ -145,7 +145,7 @@ class NodeQuorumInfo: belongs :param node : Node The particular node for which this information applies. - In the co-located case, whether or not a node's broker's + In the combined case, whether or not a node's broker's process.roles contains 'controller' may vary based on the particular node if the number of controller nodes is less than the number of nodes in the service. 
diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py b/tests/kafkatest/tests/core/kraft_upgrade_test.py index cf9ea47e4f5..df28795075a 100644 --- a/tests/kafkatest/tests/core/kraft_upgrade_test.py +++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py @@ -18,7 +18,7 @@ from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.kafka import KafkaService -from kafkatest.services.kafka.quorum import remote_kraft, colocated_kraft +from kafkatest.services.kafka.quorum import isolated_kraft, combined_kraft from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int @@ -109,14 +109,14 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest): assert self.kafka.check_protocol_errors(self) @cluster(num_nodes=5) - @parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=colocated_kraft) - @parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=colocated_kraft) - def test_colocated_upgrade(self, from_kafka_version, metadata_quorum): + @parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=combined_kraft) + @parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=combined_kraft) + def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum): self.run_upgrade(from_kafka_version) @cluster(num_nodes=8) - @parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=remote_kraft) - @parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=remote_kraft) - def test_non_colocated_upgrade(self, from_kafka_version, metadata_quorum): + @parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=isolated_kraft) + @parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=isolated_kraft) + def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum): self.run_upgrade(from_kafka_version) diff --git a/tests/kafkatest/tests/core/snapshot_test.py b/tests/kafkatest/tests/core/snapshot_test.py index cb62cb93f55..efc43e3b854 100644 --- a/tests/kafkatest/tests/core/snapshot_test.py +++ b/tests/kafkatest/tests/core/snapshot_test.py @@ -71,8 +71,8 @@ class TestSnapshots(ProduceConsumeValidateTest): topic_count = 10 self.topics_created += self.create_n_topics(topic_count) - if self.kafka.remote_controller_quorum: - self.controller_nodes = self.kafka.remote_controller_quorum.nodes + if self.kafka.isolated_controller_quorum: + self.controller_nodes = self.kafka.isolated_controller_quorum.nodes else: self.controller_nodes = self.kafka.nodes[:self.kafka.num_nodes_controller_role] @@ -145,7 +145,7 @@ class TestSnapshots(ProduceConsumeValidateTest): @cluster(num_nodes=9) @matrix(metadata_quorum=quorum.all_kraft) - def test_broker(self, metadata_quorum=quorum.colocated_kraft): + def test_broker(self, metadata_quorum=quorum.combined_kraft): """ Test the ability of a broker to consume metadata snapshots and to recover the cluster metadata state using them @@ -205,7 +205,7 @@ class TestSnapshots(ProduceConsumeValidateTest): @cluster(num_nodes=9) @matrix(metadata_quorum=quorum.all_kraft) - def test_controller(self, metadata_quorum=quorum.colocated_kraft): + def test_controller(self, metadata_quorum=quorum.combined_kraft): """ Test the ability of controllers to consume metadata snapshots and to recover the cluster metadata state using them
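
The renamed constants are consumed by test parameterizations like the ones above. Here is a hypothetical ducktape test showing the pattern; the class and test body are illustrative, while the decorators and the quorum module come from the files touched by this patch.

from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from kafkatest.services.kafka import quorum

class ExampleQuorumStylesTest(Test):
    @cluster(num_nodes=5)
    @matrix(metadata_quorum=quorum.all_kraft)  # one run each for ISOLATED_KRAFT and COMBINED_KRAFT
    def test_metadata_quorum_styles(self, metadata_quorum=quorum.combined_kraft):
        # Each entry in the matrix arrives as the metadata_quorum argument.
        assert metadata_quorum in quorum.all_kraft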