MINOR: Standardize KRaft logging, thread names, and terminology (#13390)

Standardize KRaft thread names.

- Always use kebab case. That is, "my-thread-name".

- Thread prefixes are just strings, not Option[String] or Optional<String>.
  If you don't want a prefix, use the empty string.

- Thread prefixes end in a dash (except the empty prefix). Thread names can then be
  computed as $prefix + "my-thread-name" (see the sketch after this list).

- Broker-only components get "broker-$id-" as a thread name prefix. For example, "broker-1-"

- Controller-only components get "controller-$id-" as a thread name prefix. For example, "controller-1-"

- Shared components get "kafka-$id-" as a thread name prefix. For example, "kafka-0-"

- Always pass a prefix to KafkaEventQueue, so that threads have names like
  "broker-0-metadata-loader-event-handler" rather than "event-handler". Prior to this PR, we had
  several threads just named "EventHandler", which was not helpful for debugging.

- The QuorumController event-handler thread is named "quorum-controller-$id-event-handler".
  For example, "quorum-controller-123-event-handler" for node 123.

- Don't set a thread prefix for replication threads started by ReplicaManager. They run only on the
  broker, and already include the broker ID.
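
As a rough illustration of the naming rules above (a hypothetical sketch, not code from this
commit; the ThreadNameExample object and threadName helper are made-up names):

    // Prefixes already end in a dash (or are empty), so a thread name is plain concatenation.
    object ThreadNameExample {
      def threadName(prefix: String, baseName: String): String = prefix + baseName

      def main(args: Array[String]): Unit = {
        println(threadName("broker-1-", "metadata-loader-event-handler"))
        // prints: broker-1-metadata-loader-event-handler
        println(threadName("", "event-handler")) // empty prefix is allowed; no dash needed
      }
    }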

Standardize KRaft slf4j log prefixes.

- Names should be of the form "[ComponentName id=$id] ". So for a ControllerServer with ID 123, we
  will have "[ControllerServer id=123] " (see the sketch after this list).

- For the QuorumController class, use the prefix "[QuorumController id=$id] " rather than
  "[Controller $id] ", to make it clearer that this is a KRaft controller.

- In BrokerLifecycleManager, add isZkBroker=true to the log prefix for the migration case.
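
A minimal sketch of the log-prefix convention (it assumes kafka-clients is on the classpath for
org.apache.kafka.common.utils.LogContext; the surrounding LogPrefixExample object is made up):

    import org.apache.kafka.common.utils.LogContext

    object LogPrefixExample {
      def main(args: Array[String]): Unit = {
        val id = 123
        // The prefix ends with "] ", so log lines read like "[ControllerServer id=123] Starting controller".
        val logContext = new LogContext(s"[ControllerServer id=$id] ")
        println(logContext.logPrefix())
      }
    }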

Standardize KRaft terminology.

- All synonyms of combined mode (colocated, coresident, etc.) should be replaced by "combined".

- All synonyms of isolated mode (remote, non-colocated, distributed, etc.) should be replaced by
  "isolated".
Author: Colin Patrick McCabe, 2023-03-16 15:33:03 -07:00, committed by GitHub
Commit: ddd652c672 (parent: 5dcdf71dec)
29 changed files with 224 additions and 205 deletions


@@ -81,7 +81,6 @@ object Kafka extends Logging {
       new KafkaRaftServer(
         config,
         Time.SYSTEM,
-        threadNamePrefix = None
       )
     }
   }


@@ -143,7 +143,7 @@ class KafkaRaftManager[T](
   val apiVersions = new ApiVersions()
   private val raftConfig = new RaftConfig(config)
   private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft")
-  private val logContext = new LogContext(s"[RaftManager nodeId=${config.nodeId}] ")
+  private val logContext = new LogContext(s"[RaftManager id=${config.nodeId}] ")
   this.logIdent = logContext.logPrefix()
   private val scheduler = new KafkaScheduler(1, true, threadNamePrefix + "-scheduler")


@@ -81,7 +81,7 @@ object AlterPartitionManager {
     controllerNodeProvider: ControllerNodeProvider,
     time: Time,
     metrics: Metrics,
-    threadNamePrefix: Option[String],
+    threadNamePrefix: String,
     brokerEpochSupplier: () => Long,
   ): AlterPartitionManager = {
     val channelManager = BrokerToControllerChannelManager(
@@ -89,7 +89,7 @@
       time = time,
       metrics = metrics,
       config = config,
-      channelName = "alterPartition",
+      channelName = "alter-partition",
       threadNamePrefix = threadNamePrefix,
       retryTimeoutMs = Long.MaxValue
     )


@@ -54,11 +54,21 @@ import scala.jdk.CollectionConverters._
 class BrokerLifecycleManager(
   val config: KafkaConfig,
   val time: Time,
-  val threadNamePrefix: Option[String],
+  val threadNamePrefix: String,
   val isZkBroker: Boolean
 ) extends Logging {
-  val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")
+  private def logPrefix(): String = {
+    val builder = new StringBuilder("[BrokerLifecycleManager")
+    builder.append(" id=").append(config.nodeId)
+    if (isZkBroker) {
+      builder.append(" isZkBroker=true")
+    }
+    builder.append("]")
+    builder.toString()
+  }
+  val logContext = new LogContext(logPrefix())
   this.logIdent = logContext.logPrefix()
@@ -182,7 +192,7 @@
    */
   private[server] val eventQueue = new KafkaEventQueue(time,
     logContext,
-    threadNamePrefix.getOrElse(""),
+    threadNamePrefix + "lifecycle-manager-",
     new ShutdownEvent())
   /**


@@ -64,7 +64,6 @@ class BrokerServer(
   val sharedServer: SharedServer,
   val initialOfflineDirs: Seq[String],
 ) extends KafkaBroker {
-  val threadNamePrefix = sharedServer.threadNamePrefix
   val config = sharedServer.brokerConfig
   val time = sharedServer.time
   def metrics = sharedServer.metrics
@@ -172,7 +171,7 @@
     lifecycleManager = new BrokerLifecycleManager(config,
       time,
-      threadNamePrefix,
+      s"broker-${config.nodeId}-",
       isZkBroker = false)
     /* start scheduler */
@@ -182,7 +181,8 @@
     /* register broker metrics */
     brokerTopicStats = new BrokerTopicStats
-    quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
+    quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-")
     logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
@@ -213,7 +213,7 @@
       metrics,
       config,
       channelName = "forwarding",
-      threadNamePrefix,
+      s"broker-${config.nodeId}-",
       retryTimeoutMs = 60000
     )
     clientToControllerChannelManager.start()
@@ -242,7 +242,7 @@
       controllerNodeProvider,
       time = time,
       metrics,
-      threadNamePrefix,
+      s"broker-${config.nodeId}-",
       brokerEpochSupplier = () => lifecycleManager.brokerEpoch
     )
     alterPartitionManager.start()
@@ -261,8 +261,9 @@
       brokerTopicStats = brokerTopicStats,
       isShuttingDown = isShuttingDown,
       zkClient = None,
-      threadNamePrefix = threadNamePrefix,
-      brokerEpochSupplier = () => lifecycleManager.brokerEpoch)
+      threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names.
+      brokerEpochSupplier = () => lifecycleManager.brokerEpoch
+    )
     /* start token manager */
     if (config.tokenAuthEnabled) {
@@ -321,7 +322,7 @@
       metrics,
       config,
       "heartbeat",
-      threadNamePrefix,
+      s"broker-${config.nodeId}-",
       config.brokerSessionTimeoutMs / 2 // KAFKA-14392
     )
     lifecycleManager.start(


@@ -136,7 +136,7 @@ object BrokerToControllerChannelManager {
     metrics: Metrics,
     config: KafkaConfig,
     channelName: String,
-    threadNamePrefix: Option[String],
+    threadNamePrefix: String,
     retryTimeoutMs: Long
   ): BrokerToControllerChannelManager = {
     new BrokerToControllerChannelManagerImpl(
@@ -174,10 +174,10 @@
   metrics: Metrics,
   config: KafkaConfig,
   channelName: String,
-  threadNamePrefix: Option[String],
+  threadNamePrefix: String,
   retryTimeoutMs: Long
 ) extends BrokerToControllerChannelManager with Logging {
-  private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
+  private val logContext = new LogContext(s"[BrokerToControllerChannelManager id=${config.brokerId} name=${channelName}] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
   private val requestThread = newRequestThread
@@ -236,10 +236,7 @@
         logContext
       )
     }
-    val threadName = threadNamePrefix match {
-      case None => s"BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName"
-      case Some(name) => s"$name:BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName"
-    }
+    val threadName = s"${threadNamePrefix}to-controller-${channelName}-channel-manager"
     val controllerInformation = controllerNodeProvider.getControllerInfo()
     new BrokerToControllerRequestThread(


@@ -90,7 +90,6 @@ class ControllerServer(
   val config = sharedServer.controllerConfig
   val time = sharedServer.time
   def metrics = sharedServer.metrics
-  val threadNamePrefix = sharedServer.threadNamePrefix.getOrElse("")
   def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager
   val lock = new ReentrantLock()
@@ -131,11 +130,11 @@
     if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
     val startupDeadline = Deadline.fromDelay(time, config.serverMaxStartupTimeMs, TimeUnit.MILLISECONDS)
     try {
+      this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()
       info("Starting controller")
       config.dynamicConfig.initialize(zkClientOpt = None)
       maybeChangeStatus(STARTING, STARTED)
-      this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()
       metricsGroup.newGauge("ClusterId", () => clusterId)
       metricsGroup.newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
@@ -218,7 +217,7 @@
       new QuorumController.Builder(config.nodeId, sharedServer.metaProps.clusterId).
         setTime(time).
-        setThreadNamePrefix(threadNamePrefix).
+        setThreadNamePrefix(s"quorum-controller-${config.nodeId}-").
         setConfigSchema(configSchema).
         setRaftClient(raftManager.client).
         setQuorumFeatures(quorumFeatures).
@@ -274,7 +273,7 @@
       quotaManagers = QuotaFactory.instantiate(config,
         metrics,
         time,
-        threadNamePrefix)
+        s"controller-${config.nodeId}-")
       clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
       controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
         authorizer,


@@ -1718,7 +1718,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     distinctRoles
   }
-  def isKRaftCoResidentMode: Boolean = {
+  def isKRaftCombinedMode: Boolean = {
     processRoles == Set(BrokerRole, ControllerRole)
   }
@@ -2280,8 +2280,8 @@
       validateControllerQuorumVotersMustContainNodeIdForKRaftController()
       validateControllerListenerExistsForKRaftController()
       validateControllerListenerNamesMustAppearInListenersForKRaftController()
-    } else if (isKRaftCoResidentMode) {
-      // KRaft colocated broker and controller
+    } else if (isKRaftCombinedMode) {
+      // KRaft combined broker and controller
       validateNonEmptyQuorumVotersForKRaft()
       validateControlPlaneListenerEmptyForKRaft()
       validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()


@@ -48,7 +48,6 @@ import scala.jdk.CollectionConverters._
 class KafkaRaftServer(
   config: KafkaConfig,
   time: Time,
-  threadNamePrefix: Option[String]
 ) extends Server with Logging {
   this.logIdent = s"[KafkaRaftServer nodeId=${config.nodeId}] "
@@ -71,7 +70,6 @@
     metaProps,
     time,
     metrics,
-    threadNamePrefix,
     controllerQuorumVotersFuture,
     new StandardFaultHandlerFactory(),
   )


@@ -311,7 +311,7 @@ class KafkaServer(
       metrics = metrics,
       config = config,
       channelName = "forwarding",
-      threadNamePrefix = threadNamePrefix,
+      s"zk-broker-${config.nodeId}-",
       retryTimeoutMs = config.requestTimeoutMs.longValue
     )
     clientToControllerChannelManager.start()
@@ -348,7 +348,7 @@
       controllerNodeProvider,
       time = time,
       metrics = metrics,
-      threadNamePrefix = threadNamePrefix,
+      s"zk-broker-${config.nodeId}-",
       brokerEpochSupplier = brokerEpochSupplier
     )
   } else {
@@ -379,7 +379,7 @@
     logger.info("Starting up additional components for ZooKeeper migration")
     lifecycleManager = new BrokerLifecycleManager(config,
       time,
-      threadNamePrefix,
+      s"zk-broker-${config.nodeId}-",
       isZkBroker = true)
     // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
@@ -406,7 +406,7 @@
       metrics = metrics,
       config = config,
       channelName = "quorum",
-      threadNamePrefix = threadNamePrefix,
+      s"zk-broker-${config.nodeId}-",
       retryTimeoutMs = config.requestTimeoutMs.longValue
     )


@@ -88,7 +88,6 @@ class SharedServer(
   val metaProps: MetaProperties,
   val time: Time,
   private val _metrics: Metrics,
-  val threadNamePrefix: Option[String],
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
   val faultHandlerFactory: FaultHandlerFactory
 ) extends Logging {
@@ -243,7 +242,7 @@
         KafkaRaftServer.MetadataTopicId,
         time,
         metrics,
-        threadNamePrefix,
+        Some(s"kafka-${sharedServerConfig.nodeId}-raft"), // No dash expected at the end
         controllerQuorumVotersFuture,
         raftManagerFaultHandler
       )
@@ -252,7 +251,7 @@
       val loaderBuilder = new MetadataLoader.Builder().
         setNodeId(metaProps.nodeId).
         setTime(time).
-        setThreadNamePrefix(threadNamePrefix.getOrElse("")).
+        setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
         setFaultHandler(metadataLoaderFaultHandler).
         setHighWaterMarkAccessor(() => raftManager.client.highWatermark())
       if (brokerMetrics != null) {
@@ -270,6 +269,7 @@
         setMaxBytesSinceLastSnapshot(sharedServerConfig.metadataSnapshotMaxNewRecordBytes).
         setMaxTimeSinceLastSnapshotNs(TimeUnit.MILLISECONDS.toNanos(sharedServerConfig.metadataSnapshotMaxIntervalMs)).
         setDisabledReason(snapshotsDiabledReason).
+        setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
         build()
       raftManager.register(loader)
       try {


@@ -68,13 +68,13 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationContext
     private final ClusterConfig clusterConfig;
     private final AtomicReference<KafkaClusterTestKit> clusterReference;
     private final AtomicReference<EmbeddedZookeeper> zkReference;
-    private final boolean isCoResident;
-    public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCoResident) {
+    private final boolean isCombined;
+    public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCombined) {
         this.clusterConfig = clusterConfig;
         this.clusterReference = new AtomicReference<>();
         this.zkReference = new AtomicReference<>();
-        this.isCoResident = isCoResident;
+        this.isCombined = isCombined;
     }
     @Override
@@ -82,7 +82,7 @@
         String clusterDesc = clusterConfig.nameTags().entrySet().stream()
             .map(Object::toString)
             .collect(Collectors.joining(", "));
-        return String.format("[%d] Type=Raft-%s, %s", invocationIndex, isCoResident ? "CoReside" : "Distributed", clusterDesc);
+        return String.format("[%d] Type=Raft-%s, %s", invocationIndex, isCombined ? "Combined" : "Isolated", clusterDesc);
     }
     @Override
@@ -92,7 +92,7 @@
         (BeforeTestExecutionCallback) context -> {
             TestKitNodes nodes = new TestKitNodes.Builder().
                 setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
-                setCoResident(isCoResident).
+                setCombined(isCombined).
                 setNumBrokerNodes(clusterConfig.numBrokers()).
                 setNumControllerNodes(clusterConfig.numControllers()).build();
             nodes.brokerNodes().forEach((brokerId, brokerNode) -> {


@@ -218,19 +218,15 @@ public class KafkaClusterTestKit implements AutoCloseable {
             baseDirectory = TestUtils.tempDirectory();
             nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
             executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
-                ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false));
             for (ControllerNode node : nodes.controllerNodes().values()) {
                 setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
                 BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
                     fromVersion(nodes.bootstrapMetadataVersion(), "testkit");
-                String threadNamePrefix = (nodes.brokerNodes().containsKey(node.id())) ?
-                    String.format("colocated%d", node.id()) :
-                    String.format("controller%d", node.id());
                 SharedServer sharedServer = new SharedServer(createNodeConfig(node),
                     MetaProperties.apply(nodes.clusterId().toString(), node.id()),
                     Time.SYSTEM,
                     new Metrics(),
-                    Option.apply(threadNamePrefix),
                     connectFutureManager.future,
                     faultHandlerFactory);
                 ControllerServer controller = null;
@@ -261,7 +257,6 @@
                     MetaProperties.apply(nodes.clusterId().toString(), id),
                     Time.SYSTEM,
                     new Metrics(),
-                    Option.apply(String.format("broker%d_", id)),
                     connectFutureManager.future,
                     faultHandlerFactory));
                 BrokerServer broker = null;
@@ -304,7 +299,7 @@
     }
     private String listeners(int node) {
-        if (nodes.isCoResidentNode(node)) {
+        if (nodes.isCombined(node)) {
             return "EXTERNAL://localhost:0,CONTROLLER://localhost:0";
         }
         if (nodes.controllerNodes().containsKey(node)) {
@@ -314,7 +309,7 @@
     }
     private String roles(int node) {
-        if (nodes.isCoResidentNode(node)) {
+        if (nodes.isCombined(node)) {
             return "broker,controller";
         }
         if (nodes.controllerNodes().containsKey(node)) {


@@ -33,7 +33,7 @@ import java.util.TreeMap;
 public class TestKitNodes {
     public static class Builder {
-        private boolean coResident = false;
+        private boolean combined = false;
         private Uuid clusterId = null;
         private MetadataVersion bootstrapMetadataVersion = null;
         private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
@@ -49,8 +49,8 @@
             return this;
         }
-        public Builder setCoResident(boolean coResident) {
-            this.coResident = coResident;
+        public Builder setCombined(boolean combined) {
+            this.combined = combined;
             return this;
         }
@@ -127,7 +127,7 @@
         }
         private int startControllerId() {
-            if (coResident) {
+            if (combined) {
                 return startBrokerId();
             }
             return startBrokerId() + 3000;
@@ -139,7 +139,7 @@
     private final NavigableMap<Integer, ControllerNode> controllerNodes;
     private final NavigableMap<Integer, BrokerNode> brokerNodes;
-    public boolean isCoResidentNode(int node) {
+    public boolean isCombined(int node) {
         return controllerNodes.containsKey(node) && brokerNodes.containsKey(node);
     }


@@ -980,7 +980,7 @@ class KRaftClusterTest {
     val cluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
         setNumBrokerNodes(1).
-        setCoResident(combinedController).
+        setCombined(combinedController).
         setNumControllerNodes(1).build()).
       setConfigProp("client.quota.callback.class", classOf[DummyClientQuotaCallback].getName).
       setConfigProp(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey, "0").
@@ -1022,7 +1022,7 @@
     val cluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
         setNumBrokerNodes(1).
-        setCoResident(combinedMode).
+        setCombined(combinedMode).
         setNumControllerNodes(1).build()).
       setConfigProp("authorizer.class.name", classOf[FakeConfigurableAuthorizer].getName).
       build()


@@ -98,7 +98,6 @@ class KRaftQuorumImplementation(
       new MetaProperties(clusterId, config.nodeId),
       Time.SYSTEM,
       new Metrics(),
-      Option("Broker%02d_".format(config.nodeId)),
       controllerQuorumVotersFuture,
       faultHandlerFactory)
     var broker: BrokerServer = null
@@ -316,7 +315,6 @@ abstract class QuorumTestHarness extends Logging {
       metaProperties,
       Time.SYSTEM,
       new Metrics(),
-      Option("Controller_" + testInfo.getDisplayName),
       controllerQuorumVotersFuture,
       faultHandlerFactory)
     var controllerServer: ControllerServer = null


@@ -112,8 +112,8 @@ class KafkaTest {
   }
   @Test
-  def testColocatedRoleNodeIdValidation(): Unit = {
-    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters
+  def testCombinedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that combined processes use their node.id as a voter in controller.quorum.voters
     val propertiesFile = new Properties
     propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
     propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
@@ -127,6 +127,16 @@
     KafkaConfig.fromProps(propertiesFile)
   }
+  @Test
+  def testIsKRaftCombinedMode(): Unit = {
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    val config = KafkaConfig.fromProps(propertiesFile)
+    assertTrue(config.isKRaftCombinedMode)
+  }
   @Test
   def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
     // Ensure that validation is happening at startup to check that if process.roles is set controller.quorum.voters is not empty


@@ -98,14 +98,14 @@ class BrokerLifecycleManagerTest {
   @Test
   def testCreateAndClose(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
+    val manager = new BrokerLifecycleManager(context.config, context.time, "create-and-close-", isZkBroker = false)
     manager.close()
   }
   @Test
   def testCreateStartAndClose(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
+    val manager = new BrokerLifecycleManager(context.config, context.time, "create-start-and-close-", isZkBroker = false)
     assertEquals(BrokerState.NOT_RUNNING, manager.state)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, context.advertisedListeners,
@@ -120,7 +120,7 @@
   @Test
   def testSuccessfulRegistration(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
+    val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
     context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -140,7 +140,7 @@
   def testRegistrationTimeout(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
     val controllerNode = new Node(3000, "localhost", 8021)
-    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
+    val manager = new BrokerLifecycleManager(context.config, context.time, "registration-timeout-", isZkBroker = false)
     context.controllerNodeProvider.node.set(controllerNode)
     def newDuplicateRegistrationResponse(): Unit = {
       context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -181,7 +181,7 @@
   @Test
   def testControlledShutdown(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
+    val manager = new BrokerLifecycleManager(context.config, context.time, "controlled-shutdown-", isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
     context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(


@@ -69,7 +69,7 @@ class BrokerRegistrationRequestTest {
       new Metrics(),
       clusterInstance.anyControllerSocketServer().config,
       "heartbeat",
-      Some("heartbeat"),
+      "test-heartbeat-",
       10000
     )
   }


@@ -316,10 +316,10 @@ public final class QuorumController implements Controller {
             }
             if (threadNamePrefix == null) {
-                threadNamePrefix = String.format("Node%d_", nodeId);
+                threadNamePrefix = String.format("quorum-controller-%d-", nodeId);
             }
             if (logContext == null) {
-                logContext = new LogContext(String.format("[Controller %d] ", nodeId));
+                logContext = new LogContext(String.format("[QuorumController id=%d] ", nodeId));
             }
             if (controllerMetrics == null) {
                 controllerMetrics = (ControllerMetrics) Class.forName(
@@ -328,7 +328,7 @@
             KafkaEventQueue queue = null;
             try {
-                queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
+                queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
                 return new QuorumController(
                     fatalFaultHandler,
                     logContext,


@@ -64,9 +64,9 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>, AutoCloseable {
     public static class Builder {
         private int nodeId = -1;
-        private String threadNamePrefix = "";
         private Time time = Time.SYSTEM;
         private LogContext logContext = null;
+        private String threadNamePrefix = "";
         private FaultHandler faultHandler = (m, e) -> new FaultHandlerException(m, e);
         private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() {
             private volatile long lastAppliedOffset = -1L;
@@ -97,13 +97,13 @@
             return this;
         }
-        public Builder setTime(Time time) {
-            this.time = time;
+        public Builder setThreadNamePrefix(String threadNamePrefix) {
+            this.threadNamePrefix = threadNamePrefix;
             return this;
         }
-        public Builder setThreadNamePrefix(String threadNamePrefix) {
-            this.threadNamePrefix = threadNamePrefix;
+        public Builder setTime(Time time) {
+            this.time = time;
             return this;
         }
@@ -124,7 +124,7 @@
         public MetadataLoader build() {
             if (logContext == null) {
-                logContext = new LogContext("[MetadataLoader " + nodeId + "] ");
+                logContext = new LogContext("[MetadataLoader id=" + nodeId + "] ");
             }
             if (highWaterMarkAccessor == null) {
                 throw new RuntimeException("You must set the high water mark accessor.");
@@ -132,6 +132,7 @@
             return new MetadataLoader(
                 time,
                 logContext,
+                nodeId,
                 threadNamePrefix,
                 faultHandler,
                 metrics,
@@ -197,6 +198,7 @@
     private MetadataLoader(
         Time time,
         LogContext logContext,
+        int nodeId,
         String threadNamePrefix,
         FaultHandler faultHandler,
         MetadataLoaderMetrics metrics,
@@ -210,7 +212,9 @@
         this.uninitializedPublishers = new LinkedHashMap<>();
         this.publishers = new LinkedHashMap<>();
         this.image = MetadataImage.EMPTY;
-        this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix, new ShutdownEvent());
+        this.eventQueue = new KafkaEventQueue(time, logContext,
+            threadNamePrefix + "metadata-loader-",
+            new ShutdownEvent());
     }
     private boolean stillNeedToCatchUp(long offset) {


@@ -45,6 +45,7 @@ public class SnapshotGenerator implements MetadataPublisher {
         private long maxBytesSinceLastSnapshot = 100 * 1024L * 1024L;
         private long maxTimeSinceLastSnapshotNs = TimeUnit.DAYS.toNanos(1);
         private AtomicReference<String> disabledReason = null;
+        private String threadNamePrefix = "";
         public Builder(Emitter emitter) {
             this.emitter = emitter;
@@ -80,6 +81,11 @@
             return this;
         }
+        public Builder setThreadNamePrefix(String threadNamePrefix) {
+            this.threadNamePrefix = threadNamePrefix;
+            return this;
+        }
         public SnapshotGenerator build() {
             if (disabledReason == null) {
                 disabledReason = new AtomicReference<>();
@@ -91,7 +97,8 @@
                 faultHandler,
                 maxBytesSinceLastSnapshot,
                 maxTimeSinceLastSnapshotNs,
-                disabledReason
+                disabledReason,
+                threadNamePrefix
             );
         }
     }
@@ -174,7 +181,8 @@
         FaultHandler faultHandler,
         long maxBytesSinceLastSnapshot,
         long maxTimeSinceLastSnapshotNs,
-        AtomicReference<String> disabledReason
+        AtomicReference<String> disabledReason,
+        String threadNamePrefix
     ) {
         this.nodeId = nodeId;
         this.time = time;
@@ -182,10 +190,10 @@
         this.faultHandler = faultHandler;
         this.maxBytesSinceLastSnapshot = maxBytesSinceLastSnapshot;
         this.maxTimeSinceLastSnapshotNs = maxTimeSinceLastSnapshotNs;
-        LogContext logContext = new LogContext("[SnapshotGenerator " + nodeId + "] ");
+        LogContext logContext = new LogContext("[SnapshotGenerator id=" + nodeId + "] ");
        this.log = logContext.logger(SnapshotGenerator.class);
        this.disabledReason = disabledReason;
-        this.eventQueue = new KafkaEventQueue(time, logContext, "SnapshotGenerator" + nodeId);
+        this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix + "snapshot-generator-");
        resetSnapshotCounters();
        log.debug("Starting SnapshotGenerator.");
    }


@@ -89,11 +89,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         this.zkMigrationClient = zkMigrationClient;
         this.propagator = propagator;
         this.time = Time.SYSTEM;
-        LogContext logContext = new LogContext(String.format("[KRaftMigrationDriver nodeId=%d] ", nodeId));
+        LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
         this.log = logContext.logger(KRaftMigrationDriver.class);
         this.migrationState = MigrationDriverState.UNINITIALIZED;
         this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
-        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "kraft-migration");
+        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
         this.image = MetadataImage.EMPTY;
         this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
         this.initialZkLoadHandler = initialZkLoadHandler;


@@ -453,7 +453,7 @@ public final class KafkaEventQueue implements EventQueue {
         this.lock = new ReentrantLock();
         this.log = logContext.logger(KafkaEventQueue.class);
         this.eventHandler = new EventHandler();
-        this.eventHandlerThread = new KafkaThread(threadNamePrefix + "EventHandler",
+        this.eventHandlerThread = new KafkaThread(threadNamePrefix + "event-handler",
             this.eventHandler, false);
         this.shuttingDown = false;
         this.interrupted = false;


@@ -36,7 +36,7 @@ class TestBounce(Test):
            raise Exception("Illegal %s value provided for the test: %s" % (quorum_size_arg_name, quorum_size))
        self.topic = "topic"
        self.zk = ZookeeperService(test_context, num_nodes=quorum_size) if quorum.for_test(test_context) == quorum.zk else None
-       num_kafka_nodes = quorum_size if quorum.for_test(test_context) == quorum.colocated_kraft else 1
+       num_kafka_nodes = quorum_size if quorum.for_test(test_context) == quorum.combined_kraft else 1
        self.kafka = KafkaService(test_context, num_nodes=num_kafka_nodes, zk=self.zk,
                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}},
                                  controller_num_nodes_override=quorum_size)
@@ -53,7 +53,7 @@
    # ZooKeeper and KRaft, quorum size = 1
    @cluster(num_nodes=4)
    @matrix(metadata_quorum=quorum.all, quorum_size=[1])
-   # Remote and Co-located KRaft, quorum size = 3
+   # Isolated and Combined KRaft, quorum size = 3
    @cluster(num_nodes=6)
    @matrix(metadata_quorum=quorum.all_kraft, quorum_size=[3])
    def test_simple_run(self, metadata_quorum, quorum_size):
@@ -73,6 +73,6 @@
            assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
            if first_time:
                self.producer.stop()
-               if self.kafka.quorum_info.using_kraft and self.kafka.remote_controller_quorum:
-                   self.kafka.remote_controller_quorum.restart_cluster()
+               if self.kafka.quorum_info.using_kraft and self.kafka.isolated_controller_quorum:
+                   self.kafka.isolated_controller_quorum.restart_cluster()
                self.kafka.restart_cluster()


@@ -77,11 +77,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
        in process.roles (0 when using Zookeeper)
    controller_quorum : KafkaService
        None when using ZooKeeper, otherwise the Kafka service for the
-       co-located case or the remote controller quorum service
-       instance for the remote case
-   remote_controller_quorum : KafkaService
-       None for the co-located case or when using ZooKeeper, otherwise
-       the remote controller quorum service instance
+       combined case or the isolated controller quorum service
+       instance for the isolated case
+   isolated_controller_quorum : KafkaService
+       None for the combined case or when using ZooKeeper, otherwise
+       the isolated controller quorum service instance

    Kafka Security Protocols
    ------------------------
@@ -106,12 +106,12 @@
    KRaft Quorums
    ------------
-   Set metadata_quorum accordingly (to COLOCATED_KRAFT or REMOTE_KRAFT).
+   Set metadata_quorum accordingly (to COMBINED_KRAFT or ISOLATED_KRAFT).
    Do not instantiate a ZookeeperService instance.
-   Starting Kafka will cause any remote controller quorum to
+   Starting Kafka will cause any isolated controller quorum to
    automatically start first. Explicitly stopping Kafka does not stop
-   any remote controller quorum, but Ducktape will stop both when
+   any isolated controller quorum, but Ducktape will stop both when
    tearing down the test (it will stop Kafka first).

    KRaft Security Protocols
@@ -119,12 +119,12 @@
    The broker-to-controller and inter-controller security protocols
    will both initially be set to the inter-broker security protocol.
    The broker-to-controller and inter-controller security protocols
-   must be identical for the co-located case (an exception will be
+   must be identical for the combined case (an exception will be
    thrown when trying to start the service if they are not identical).
    The broker-to-controller and inter-controller security protocols
-   can differ in the remote case.
+   can differ in the isolated case.

-   Set these attributes for the co-located case. Changes take effect
+   Set these attributes for the combined case. Changes take effect
    when starting each node:

    controller_security_protocol : str
@@ -136,10 +136,10 @@
    intercontroller_sasl_mechanism : str
        default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL

-   Set the same attributes for the remote case (changes take effect
+   Set the same attributes for the isolated case (changes take effect
    when starting each quorum node), but you must first obtain the
-   service instance for the remote quorum via one of the
-   'controller_quorum' or 'remote_controller_quorum' attributes as
+   service instance for the isolated quorum via one of the
+   'controller_quorum' or 'isolated_controller_quorum' attributes as
    defined above.
    """
@@ -200,7 +200,7 @@
                 zk_client_secure=False,
                 listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None,
                 extra_kafka_opts="", tls_version=None,
-                remote_kafka=None,
+                isolated_kafka=None,
                 controller_num_nodes_override=0,
                 allow_zk_with_kraft=False,
                 quorum_info_provider=None
@@ -211,7 +211,7 @@
        1) Zookeeper quorum:
            The number of brokers is defined by this parameter.
            The broker.id values will be 1..num_nodes.
-       2) Co-located KRaft quorum:
+       2) Combined KRaft quorum:
            The number of nodes having a broker role is defined by this parameter.
            The node.id values will be 1..num_nodes
            The number of nodes having a controller role will by default be 1, 3, or 5 depending on num_nodes
@@ -231,10 +231,10 @@
            broker having node.id=1: broker.roles=broker+controller
            broker having node.id=2: broker.roles=broker
            broker having node.id=3: broker.roles=broker
-       3) Remote KRaft quorum when instantiating the broker service:
+       3) Isolated KRaft quorum when instantiating the broker service:
            The number of nodes, all of which will have broker.roles=broker, is defined by this parameter.
            The node.id values will be 1..num_nodes
-       4) Remote KRaft quorum when instantiating the controller service:
+       4) Isolated KRaft quorum when instantiating the controller service:
            The number of nodes, all of which will have broker.roles=controller, is defined by this parameter.
            The node.id values will be 3001..(3000 + num_nodes)
            The value passed in is determined by the broker service when that is instantiated, and it uses the
@@ -260,21 +260,21 @@
        :param dict per_node_server_prop_overrides: overrides for kafka.properties file keyed by 1-based node number
            e.g: {1: [["config1", "true"], ["config2", "1000"]], 2: [["config1", "false"], ["config2", "0"]]}
        :param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable
-       :param KafkaService remote_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper
-       :param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and remote_kafka is not None; ignored otherwise
+       :param KafkaService isolated_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper
+       :param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise
        :param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
        :param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
        """
        self.zk = zk
-       self.remote_kafka = remote_kafka
+       self.isolated_kafka = isolated_kafka
        self.allow_zk_with_kraft = allow_zk_with_kraft
        if quorum_info_provider is None:
            self.quorum_info = quorum.ServiceQuorumInfo.from_test_context(self, context)
        else:
            self.quorum_info = quorum_info_provider(self)
        self.controller_quorum = None # will define below if necessary
-       self.remote_controller_quorum = None # will define below if necessary
+       self.isolated_controller_quorum = None # will define below if necessary
        self.configured_for_zk_migration = False

        if num_nodes < 1:
@@ -287,44 +287,44 @@
            num_nodes_broker_role = num_nodes
            if self.quorum_info.has_controllers:
                self.num_nodes_controller_role = self.num_kraft_controllers(num_nodes_broker_role, controller_num_nodes_override)
-               if self.remote_kafka:
-                   raise Exception("Must not specify remote Kafka service with co-located Controller quorum")
+               if self.isolated_kafka:
+                   raise Exception("Must not specify isolated Kafka service with combined Controller quorum")
        else:
            self.num_nodes_controller_role = num_nodes
-           if not self.remote_kafka:
-               raise Exception("Must specify remote Kafka service when instantiating remote Controller service (should not happen)")
+           if not self.isolated_kafka:
+               raise Exception("Must specify isolated Kafka service when instantiating isolated Controller service (should not happen)")

        # Initially use the inter-broker security protocol for both
        # broker-to-controller and inter-controller communication. Both can be explicitly changed later if desired.
-       # Note, however, that the two must the same if the controller quorum is co-located with the
-       # brokers. Different security protocols for the two are only supported with a remote controller quorum.
+       # Note, however, that the two must the same if the controller quorum is combined with the
+       # brokers. Different security protocols for the two are only supported with a isolated controller quorum.
        self.controller_security_protocol = interbroker_security_protocol
        self.controller_sasl_mechanism = interbroker_sasl_mechanism
        self.intercontroller_security_protocol = interbroker_security_protocol
        self.intercontroller_sasl_mechanism = interbroker_sasl_mechanism

        # Ducktape tears down services in the reverse order in which they are created,
-       # so create a service for the remote controller quorum (if we need one) first, before
+       # so create a service for the isolated controller quorum (if we need one) first, before
        # invoking Service.__init__(), so that Ducktape will tear down the quorum last; otherwise
        # Ducktape will tear down the controller quorum first, which could lead to problems in
        # Kafka and delays in tearing it down (and who knows what else -- it's simply better
-       # to correctly tear down Kafka first, before tearing down the remote controller).
+       # to correctly tear down Kafka first, before tearing down the isolated controller).
        if self.quorum_info.has_controllers:
            self.controller_quorum = self
        else:
-           num_remote_controller_nodes = self.num_kraft_controllers(num_nodes, controller_num_nodes_override)
-           self.remote_controller_quorum = KafkaService(
-               context, num_remote_controller_nodes, self.zk, security_protocol=self.controller_security_protocol,
+           num_isolated_controller_nodes = self.num_kraft_controllers(num_nodes, controller_num_nodes_override)
+           self.isolated_controller_quorum = KafkaService(
+               context, num_isolated_controller_nodes, self.zk, security_protocol=self.controller_security_protocol,
                interbroker_security_protocol=self.intercontroller_security_protocol,
                client_sasl_mechanism=self.controller_sasl_mechanism, interbroker_sasl_mechanism=self.intercontroller_sasl_mechanism,
                authorizer_class_name=authorizer_class_name, version=version, jmx_object_names=jmx_object_names,
                jmx_attributes=jmx_attributes,
                listener_security_config=listener_security_config,
                extra_kafka_opts=extra_kafka_opts, tls_version=tls_version,
-               remote_kafka=self, allow_zk_with_kraft=self.allow_zk_with_kraft,
+               isolated_kafka=self, allow_zk_with_kraft=self.allow_zk_with_kraft,
                server_prop_overrides=server_prop_overrides
            )
-           self.controller_quorum = self.remote_controller_quorum
+           self.controller_quorum = self.isolated_controller_quorum

        Service.__init__(self, context, num_nodes)
        JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
@ -429,7 +429,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
node.config = KafkaConfig(**kraft_broker_plus_zk_configs) node.config = KafkaConfig(**kraft_broker_plus_zk_configs)
else: else:
node.config = KafkaConfig(**kraft_broker_configs) node.config = KafkaConfig(**kraft_broker_configs)
self.colocated_nodes_started = 0 self.combined_nodes_started = 0
self.nodes_to_start = self.nodes self.nodes_to_start = self.nodes
def reconfigure_zk_for_migration(self, kraft_quorum): def reconfigure_zk_for_migration(self, kraft_quorum):
@ -459,16 +459,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.server_prop_overrides.extend(props) self.server_prop_overrides.extend(props)
del self.port_mappings[kraft_quorum.controller_listener_names] del self.port_mappings[kraft_quorum.controller_listener_names]
# Set the quorum info to remote KRaft # Set the quorum info to isolated KRaft
self.quorum_info = quorum.ServiceQuorumInfo(quorum.remote_kraft, self) self.quorum_info = quorum.ServiceQuorumInfo(quorum.isolated_kraft, self)
self.remote_controller_quorum = kraft_quorum self.isolated_controller_quorum = kraft_quorum
self.controller_quorum = kraft_quorum self.controller_quorum = kraft_quorum
def num_kraft_controllers(self, num_nodes_broker_role, controller_num_nodes_override): def num_kraft_controllers(self, num_nodes_broker_role, controller_num_nodes_override):
if controller_num_nodes_override < 0: if controller_num_nodes_override < 0:
raise Exception("controller_num_nodes_override must not be negative: %i" % controller_num_nodes_override) raise Exception("controller_num_nodes_override must not be negative: %i" % controller_num_nodes_override)
if controller_num_nodes_override > num_nodes_broker_role and self.quorum_info.quorum_type == quorum.colocated_kraft: if controller_num_nodes_override > num_nodes_broker_role and self.quorum_info.quorum_type == quorum.combined_kraft:
raise Exception("controller_num_nodes_override must not exceed the service's node count in the co-located case: %i > %i" % raise Exception("controller_num_nodes_override must not exceed the service's node count in the combined case: %i > %i" %
(controller_num_nodes_override, num_nodes_broker_role)) (controller_num_nodes_override, num_nodes_broker_role))
if controller_num_nodes_override: if controller_num_nodes_override:
return controller_num_nodes_override return controller_num_nodes_override
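As a worked example (the kafka service instance and the counts are assumed), a combined three-node service with an override of 1 ends up with a single controller, while an override larger than the node count is rejected by the check above:

    assert kafka.num_kraft_controllers(num_nodes_broker_role=3, controller_num_nodes_override=1) == 1
    # controller_num_nodes_override=5 on the same combined service would raise the exception above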
@ -511,7 +511,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
@property @property
def security_config(self): def security_config(self):
if not self._security_config: if not self._security_config:
# we will later change the security protocols to PLAINTEXT if this is a remote KRaft controller case since # we will later change the security protocols to PLAINTEXT if this is an isolated KRaft controller case since
# those security protocols are irrelevant there and we don't want to falsely indicate the use of SASL or TLS # those security protocols are irrelevant there and we don't want to falsely indicate the use of SASL or TLS
security_protocol_to_use=self.security_protocol security_protocol_to_use=self.security_protocol
interbroker_security_protocol_to_use=self.interbroker_security_protocol interbroker_security_protocol_to_use=self.interbroker_security_protocol
@ -525,7 +525,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
if self.quorum_info.has_controllers: if self.quorum_info.has_controllers:
if self.intercontroller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS: if self.intercontroller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
serves_intercontroller_sasl_mechanism = self.intercontroller_sasl_mechanism serves_intercontroller_sasl_mechanism = self.intercontroller_sasl_mechanism
uses_controller_sasl_mechanism = self.intercontroller_sasl_mechanism # won't change from above in co-located case uses_controller_sasl_mechanism = self.intercontroller_sasl_mechanism # won't change from above in combined case
if self.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS: if self.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
serves_controller_sasl_mechanism = self.controller_sasl_mechanism serves_controller_sasl_mechanism = self.controller_sasl_mechanism
# determine if KRaft uses TLS # determine if KRaft uses TLS
@ -534,10 +534,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
# KRaft broker only # KRaft broker only
kraft_tls = self.controller_quorum.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS kraft_tls = self.controller_quorum.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS
if self.quorum_info.has_controllers: if self.quorum_info.has_controllers:
# remote or co-located KRaft controller # isolated or combined KRaft controller
kraft_tls = self.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS \ kraft_tls = self.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS \
or self.intercontroller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS or self.intercontroller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS
# clear irrelevant security protocols of SASL/TLS implications for remote controller quorum case # clear irrelevant security protocols of SASL/TLS implications for the isolated controller quorum case
if self.quorum_info.has_controllers and not self.quorum_info.has_brokers: if self.quorum_info.has_controllers and not self.quorum_info.has_brokers:
security_protocol_to_use=SecurityConfig.PLAINTEXT security_protocol_to_use=SecurityConfig.PLAINTEXT
interbroker_security_protocol_to_use=SecurityConfig.PLAINTEXT interbroker_security_protocol_to_use=SecurityConfig.PLAINTEXT
@ -556,7 +556,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self._security_config.properties['security.protocol'] = self.security_protocol self._security_config.properties['security.protocol'] = self.security_protocol
self._security_config.properties['sasl.mechanism'] = self.client_sasl_mechanism self._security_config.properties['sasl.mechanism'] = self.client_sasl_mechanism
# Ensure we have the right inter-broker security protocol because it may have been mutated # Ensure we have the right inter-broker security protocol because it may have been mutated
# since we cached our security config (ignore if this is a remote KRaft controller quorum case; the # since we cached our security config (ignore if this is an isolated KRaft controller quorum case; the
# inter-broker security protocol is not used there). # inter-broker security protocol is not used there).
if (self.quorum_info.using_zk or self.quorum_info.has_brokers): if (self.quorum_info.using_zk or self.quorum_info.has_brokers):
# in case inter-broker SASL mechanism has changed without changing the inter-broker security protocol # in case inter-broker SASL mechanism has changed without changing the inter-broker security protocol
@ -587,7 +587,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
has_sasl = self.security_config.has_sasl has_sasl = self.security_config.has_sasl
if has_sasl: if has_sasl:
if self.minikdc is None: if self.minikdc is None:
other_service = self.remote_kafka if self.remote_kafka else self.controller_quorum if self.quorum_info.using_kraft else None other_service = self.isolated_kafka if self.isolated_kafka else self.controller_quorum if self.quorum_info.using_kraft else None
if not other_service or not other_service.minikdc: if not other_service or not other_service.minikdc:
nodes_for_kdc = self.nodes.copy() nodes_for_kdc = self.nodes.copy()
if other_service and other_service != self: if other_service and other_service != self:
@ -598,8 +598,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.minikdc = None self.minikdc = None
if self.quorum_info.using_kraft: if self.quorum_info.using_kraft:
self.controller_quorum.minikdc = None self.controller_quorum.minikdc = None
if self.remote_kafka: if self.isolated_kafka:
self.remote_kafka.minikdc = None self.isolated_kafka.minikdc = None
def alive(self, node): def alive(self, node):
return len(self.pids(node)) > 0 return len(self.pids(node)) > 0
@ -621,7 +621,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
# This is not supported because both the broker and the controller take the first entry from # This is not supported because both the broker and the controller take the first entry from
# controller.listener.names and the value from sasl.mechanism.controller.protocol; # controller.listener.names and the value from sasl.mechanism.controller.protocol;
# they share a single config, so they must both see/use identical values. # they share a single config, so they must both see/use identical values.
raise Exception("Co-located KRaft Brokers (%s/%s) and Controllers (%s/%s) cannot talk to Controllers via different security protocols" % raise Exception("Combined KRaft Brokers (%s/%s) and Controllers (%s/%s) cannot talk to Controllers via different security protocols" %
(self.controller_security_protocol, self.controller_sasl_mechanism, (self.controller_security_protocol, self.controller_sasl_mechanism,
self.intercontroller_security_protocol, self.intercontroller_sasl_mechanism)) self.intercontroller_security_protocol, self.intercontroller_sasl_mechanism))
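A hedged sketch of the flip side (kafka is an assumed KafkaService backed by an isolated quorum, and the assignments follow the "can be explicitly changed later" note in the constructor): with isolated controllers the two protocols may legitimately differ, whereas the same assignments on a combined quorum would trip the exception above.

    kafka.controller_quorum.controller_security_protocol = 'SASL_SSL'        # broker -> controller
    kafka.controller_quorum.intercontroller_security_protocol = 'PLAINTEXT'  # controller <-> controller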
if self.quorum_info.using_zk or self.quorum_info.has_brokers: if self.quorum_info.using_zk or self.quorum_info.has_brokers:
@ -629,7 +629,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.interbroker_listener.open = True self.interbroker_listener.open = True
# we have to wait to decide whether to open the controller port(s) # we have to wait to decide whether to open the controller port(s)
# because it could be dependent on the particular node in the # because it could be dependent on the particular node in the
# co-located case where the number of controllers could be less # combined case where the number of controllers could be less
# than the number of nodes in the service # than the number of nodes in the service
self.start_minikdc_if_necessary(add_principals) self.start_minikdc_if_necessary(add_principals)
@ -641,8 +641,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
if self.quorum_info.using_zk: if self.quorum_info.using_zk:
self._ensure_zk_chroot() self._ensure_zk_chroot()
if self.remote_controller_quorum: if self.isolated_controller_quorum:
self.remote_controller_quorum.start() self.isolated_controller_quorum.start()
Service.start(self, **kwargs) Service.start(self, **kwargs)
@ -729,7 +729,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
if not port.name in controller_listener_names: if not port.name in controller_listener_names:
advertised_listeners.append(port.advertised_listener(node)) advertised_listeners.append(port.advertised_listener(node))
protocol_map.append(port.listener_security_protocol()) protocol_map.append(port.listener_security_protocol())
controller_sec_protocol = self.remote_controller_quorum.controller_security_protocol if self.remote_controller_quorum \ controller_sec_protocol = self.isolated_controller_quorum.controller_security_protocol if self.isolated_controller_quorum \
else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \ else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
else None else None
if controller_sec_protocol: if controller_sec_protocol:
@ -826,7 +826,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
for controller_listener in self.controller_listener_name_list(node): for controller_listener in self.controller_listener_name_list(node):
if self.node_quorum_info.has_controller_role: if self.node_quorum_info.has_controller_role:
self.open_port(controller_listener) self.open_port(controller_listener)
else: # co-located case where node doesn't have a controller else: # combined case where node doesn't have a controller
self.close_port(controller_listener) self.close_port(controller_listener)
self.security_config.setup_node(node) self.security_config.setup_node(node)
@ -845,9 +845,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]]) for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
# define controller.listener.names # define controller.listener.names
self.controller_listener_names = ','.join(self.controller_listener_name_list(node)) self.controller_listener_names = ','.join(self.controller_listener_name_list(node))
# define sasl.mechanism.controller.protocol to match remote quorum if one exists # define sasl.mechanism.controller.protocol to match the isolated quorum if one exists
if self.remote_controller_quorum: if self.isolated_controller_quorum:
self.controller_sasl_mechanism = self.remote_controller_quorum.controller_sasl_mechanism self.controller_sasl_mechanism = self.isolated_controller_quorum.controller_sasl_mechanism
prop_file = self.prop_file(node) prop_file = self.prop_file(node)
self.logger.info("kafka.properties:") self.logger.info("kafka.properties:")
@ -866,7 +866,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.debug("Attempting to start KafkaService %s on %s with command: %s" %\ self.logger.debug("Attempting to start KafkaService %s on %s with command: %s" %\
("concurrently" if self.concurrent_start else "serially", str(node.account), cmd)) ("concurrently" if self.concurrent_start else "serially", str(node.account), cmd))
if self.node_quorum_info.has_controller_role and self.node_quorum_info.has_broker_role: if self.node_quorum_info.has_controller_role and self.node_quorum_info.has_broker_role:
self.colocated_nodes_started += 1 self.combined_nodes_started += 1
if self.concurrent_start: if self.concurrent_start:
node.account.ssh(cmd) # and then don't wait for the startup message node.account.ssh(cmd) # and then don't wait for the startup message
else: else:
@ -937,31 +937,31 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def stop_node(self, node, clean_shutdown=True, timeout_sec=60): def stop_node(self, node, clean_shutdown=True, timeout_sec=60):
pids = self.pids(node) pids = self.pids(node)
cluster_has_colocated_controllers = self.quorum_info.has_brokers and self.quorum_info.has_controllers cluster_has_combined_controllers = self.quorum_info.has_brokers and self.quorum_info.has_controllers
force_sigkill_due_to_too_few_colocated_controllers =\ force_sigkill_due_to_too_few_combined_controllers =\
clean_shutdown and cluster_has_colocated_controllers\ clean_shutdown and cluster_has_combined_controllers\
and self.colocated_nodes_started < self.controllers_required_for_quorum() and self.combined_nodes_started < self.controllers_required_for_quorum()
if force_sigkill_due_to_too_few_colocated_controllers: if force_sigkill_due_to_too_few_combined_controllers:
self.logger.info("Forcing node to stop via SIGKILL due to too few co-located KRaft controllers: %i/%i" %\ self.logger.info("Forcing node to stop via SIGKILL due to too few combined KRaft controllers: %i/%i" %\
(self.colocated_nodes_started, self.num_nodes_controller_role)) (self.combined_nodes_started, self.num_nodes_controller_role))
sig = signal.SIGTERM if clean_shutdown and not force_sigkill_due_to_too_few_colocated_controllers else signal.SIGKILL sig = signal.SIGTERM if clean_shutdown and not force_sigkill_due_to_too_few_combined_controllers else signal.SIGKILL
for pid in pids: for pid in pids:
node.account.signal(pid, sig, allow_fail=False) node.account.signal(pid, sig, allow_fail=False)
node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node) node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
node_has_colocated_controllers = node_quorum_info.has_controller_role and node_quorum_info.has_broker_role node_has_combined_controllers = node_quorum_info.has_controller_role and node_quorum_info.has_broker_role
if pids and node_has_colocated_controllers: if pids and node_has_combined_controllers:
self.colocated_nodes_started -= 1 self.combined_nodes_started -= 1
try: try:
wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=timeout_sec, wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=timeout_sec,
err_msg="Kafka node failed to stop in %d seconds" % timeout_sec) err_msg="Kafka node failed to stop in %d seconds" % timeout_sec)
except Exception: except Exception:
if node_has_colocated_controllers: if node_has_combined_controllers:
# it didn't stop # it didn't stop
self.colocated_nodes_started += 1 self.combined_nodes_started += 1
self.thread_dump(node) self.thread_dump(node)
raise raise
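The shutdown decision above can be read in isolation as follows; the counts are assumed for illustration, and controllers_required_for_quorum() is taken here to be a majority of the combined controllers:

    import signal

    clean_shutdown = True
    cluster_has_combined_controllers = True
    combined_nodes_started = 1
    controllers_required_for_quorum = 2   # assumed majority of a 3-controller quorum

    force_sigkill = (clean_shutdown and cluster_has_combined_controllers
                     and combined_nodes_started < controllers_required_for_quorum)
    sig = signal.SIGTERM if clean_shutdown and not force_sigkill else signal.SIGKILL
    assert sig == signal.SIGKILL  # a clean stop would drop below quorum, so SIGKILL is forced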
@ -1501,9 +1501,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
return missing return missing
def restart_cluster(self, clean_shutdown=True, timeout_sec=60, after_each_broker_restart=None, *args): def restart_cluster(self, clean_shutdown=True, timeout_sec=60, after_each_broker_restart=None, *args):
# We do not restart the remote controller quorum if it exists. # We do not restart the isolated controller quorum if it exists.
# This is not widely used -- it typically appears in rolling upgrade tests -- # This is not widely used -- it typically appears in rolling upgrade tests --
# so we will let tests explicitly decide if/when to restart any remote controller quorum. # so we will let tests explicitly decide if/when to restart any isolated controller quorum.
for node in self.nodes: for node in self.nodes:
self.restart_node(node, clean_shutdown=clean_shutdown, timeout_sec=timeout_sec) self.restart_node(node, clean_shutdown=clean_shutdown, timeout_sec=timeout_sec)
if after_each_broker_restart is not None: if after_each_broker_restart is not None:
View File
@ -15,20 +15,20 @@
# the types of metadata quorums we support # the types of metadata quorums we support
zk = 'ZK' # ZooKeeper, used before/during the KIP-500 bridge release(s) zk = 'ZK' # ZooKeeper, used before/during the KIP-500 bridge release(s)
colocated_kraft = 'COLOCATED_KRAFT' # co-located Controllers in KRaft mode, used during/after the KIP-500 bridge release(s) combined_kraft = 'COMBINED_KRAFT' # combined Controllers in KRaft mode, used during/after the KIP-500 bridge release(s)
remote_kraft = 'REMOTE_KRAFT' # separate Controllers in KRaft mode, used during/after the KIP-500 bridge release(s) isolated_kraft = 'ISOLATED_KRAFT' # isolated Controllers in KRaft mode, used during/after the KIP-500 bridge release(s)
# How we will parameterize tests that exercise all quorum styles # How we will parameterize tests that exercise all quorum styles
# [“ZK”, “REMOTE_KRAFT”, "COLOCATED_KRAFT"] during the KIP-500 bridge release(s) # [“ZK”, “ISOLATED_KRAFT”, "COMBINED_KRAFT"] during the KIP-500 bridge release(s)
# [“REMOTE_KRAFT”, "COLOCATED_KRAFT”] after the KIP-500 bridge release(s) # [“ISOLATED_KRAFT”, "COMBINED_KRAFT”] after the KIP-500 bridge release(s)
all = [zk, remote_kraft, colocated_kraft] all = [zk, isolated_kraft, combined_kraft]
# How we will parameterize tests that exercise all KRaft quorum styles # How we will parameterize tests that exercise all KRaft quorum styles
all_kraft = [remote_kraft, colocated_kraft] all_kraft = [isolated_kraft, combined_kraft]
# How we will parameterize tests that are unrelated to upgrades: # How we will parameterize tests that are unrelated to upgrades:
# [“ZK”] before the KIP-500 bridge release(s) # [“ZK”] before the KIP-500 bridge release(s)
# [“ZK”, “REMOTE_KRAFT”] during the KIP-500 bridge release(s) and in preview releases # [“ZK”, “ISOLATED_KRAFT”] during the KIP-500 bridge release(s) and in preview releases
# [“REMOTE_KRAFT”] after the KIP-500 bridge release(s) # [“ISOLATED_KRAFT”] after the KIP-500 bridge release(s)
all_non_upgrade = [zk, remote_kraft] all_non_upgrade = [zk, isolated_kraft]
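As a usage sketch (the test class and method names are hypothetical), these lists are what system tests hand to ducktape's matrix annotation so that one test body runs once per quorum style:

    from ducktape.mark import matrix
    from kafkatest.services.kafka import quorum
    from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest

    class ExampleQuorumStyleTest(ProduceConsumeValidateTest):
        @matrix(metadata_quorum=quorum.all_non_upgrade)
        def test_something(self, metadata_quorum=quorum.zk):
            pass  # runs once with ZK and once with ISOLATED_KRAFT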
def for_test(test_context): def for_test(test_context):
# A test uses ZooKeeper if it doesn't specify a metadata quorum or if it explicitly specifies ZooKeeper # A test uses ZooKeeper if it doesn't specify a metadata quorum or if it explicitly specifies ZooKeeper
@ -44,13 +44,13 @@ class ServiceQuorumInfo:
Exposes quorum-related information for a KafkaService Exposes quorum-related information for a KafkaService
Kafka can use either ZooKeeper or a KRaft (Kafka Raft) Controller quorum for Kafka can use either ZooKeeper or a KRaft (Kafka Raft) Controller quorum for
its metadata. KRaft Controllers can either be co-located with Kafka in its metadata. KRaft Controllers can either be combined with Kafka in
the same JVM or remote in separate JVMs. The choice is made via the same JVM or isolated in separate JVMs. The choice is made via
the 'metadata_quorum' parameter defined for the system test: if it the 'metadata_quorum' parameter defined for the system test: if it
is not explicitly defined, or if it is set to 'ZK', then ZooKeeper is not explicitly defined, or if it is set to 'ZK', then ZooKeeper
is used. If it is explicitly set to 'COLOCATED_KRAFT' then KRaft is used. If it is explicitly set to 'COMBINED_KRAFT' then KRaft
controllers will be co-located with the brokers; the value controllers will be combined with the brokers; the value
`REMOTE_KRAFT` indicates remote controllers. `ISOLATED_KRAFT` indicates isolated controllers.
Attributes Attributes
---------- ----------
@ -59,7 +59,7 @@ class ServiceQuorumInfo:
The service for which this instance exposes quorum-related The service for which this instance exposes quorum-related
information information
quorum_type : str quorum_type : str
COLOCATED_KRAFT, REMOTE_KRAFT, or ZK COMBINED_KRAFT, ISOLATED_KRAFT, or ZK
using_zk : bool using_zk : bool
True iff quorum_type==ZK True iff quorum_type==ZK
using_kraft : bool using_kraft : bool
@ -67,22 +67,22 @@ class ServiceQuorumInfo:
has_brokers : bool has_brokers : bool
Whether there is at least one node with process.roles Whether there is at least one node with process.roles
containing 'broker'. True iff using_kraft and the Kafka containing 'broker'. True iff using_kraft and the Kafka
service doesn't itself have a remote Kafka service (meaning service doesn't itself have an isolated Kafka service (meaning
it is not a remote controller quorum). it is not an isolated controller quorum).
has_controllers : bool has_controllers : bool
Whether there is at least one node with process.roles Whether there is at least one node with process.roles
containing 'controller'. True iff quorum_type == containing 'controller'. True iff quorum_type ==
COLOCATED_KRAFT or the Kafka service itself has a remote Kafka COMBINED_KRAFT or the Kafka service itself has an isolated Kafka
service (meaning it is a remote controller quorum). service (meaning it is an isolated controller quorum).
has_brokers_and_controllers : has_brokers_and_controllers :
True iff quorum_type==COLOCATED_KRAFT True iff quorum_type==COMBINED_KRAFT
""" """
def __init__(self, quorum_type, kafka): def __init__(self, quorum_type, kafka):
""" """
:param quorum_type : str :param quorum_type : str
The type of quorum being used. Either "ZK", "COLOCATED_KRAFT", or "REMOTE_KRAFT" The type of quorum being used. Either "ZK", "COMBINED_KRAFT", or "ISOLATED_KRAFT"
:param context : TestContext :param context : TestContext
The test context within which this instance and the The test context within which this instance and the
given Kafka service are being instantiated given Kafka service are being instantiated
@ -90,16 +90,16 @@ class ServiceQuorumInfo:
if quorum_type != zk and kafka.zk and not kafka.allow_zk_with_kraft: if quorum_type != zk and kafka.zk and not kafka.allow_zk_with_kraft:
raise Exception("Cannot use ZooKeeper while specifying a KRaft metadata quorum unless explicitly allowing it") raise Exception("Cannot use ZooKeeper while specifying a KRaft metadata quorum unless explicitly allowing it")
if kafka.remote_kafka and quorum_type != remote_kraft: if kafka.isolated_kafka and quorum_type != isolated_kraft:
raise Exception("Cannot specify a remote Kafka service unless using a remote KRaft metadata quorum (should not happen)") raise Exception("Cannot specify an isolated Kafka service unless using an isolated KRaft metadata quorum (should not happen)")
self.kafka = kafka self.kafka = kafka
self.quorum_type = quorum_type self.quorum_type = quorum_type
self.using_zk = quorum_type == zk self.using_zk = quorum_type == zk
self.using_kraft = not self.using_zk self.using_kraft = not self.using_zk
self.has_brokers = self.using_kraft and not kafka.remote_kafka self.has_brokers = self.using_kraft and not kafka.isolated_kafka
self.has_controllers = quorum_type == colocated_kraft or kafka.remote_kafka self.has_controllers = quorum_type == combined_kraft or kafka.isolated_kafka
self.has_brokers_and_controllers = quorum_type == colocated_kraft self.has_brokers_and_controllers = quorum_type == combined_kraft
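A quick sketch of the resulting flags (kafka is assumed to be a combined-mode service with isolated_kafka=None and no ZooKeeper):

    info = ServiceQuorumInfo(combined_kraft, kafka)
    assert info.has_brokers and info.has_controllers and info.has_brokers_and_controllers
    # An isolated broker service would instead report has_brokers=True and has_controllers falsy,
    # and its isolated controller service the reverse.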
@staticmethod @staticmethod
def from_test_context(kafka, context): def from_test_context(kafka, context):
@ -127,12 +127,12 @@ class NodeQuorumInfo:
belongs belongs
has_broker_role : bool has_broker_role : bool
True iff using_kraft and the Kafka service doesn't itself have True iff using_kraft and the Kafka service doesn't itself have
a remote Kafka service (meaning it is not a remote controller) an isolated Kafka service (meaning it is not an isolated controller)
has_controller_role : bool has_controller_role : bool
True iff quorum_type==COLOCATED_KRAFT and the node is one of True iff quorum_type==COMBINED_KRAFT and the node is one of
the first N in the cluster where N is the number of nodes the first N in the cluster where N is the number of nodes
that have a controller role; or the Kafka service itself has a that have a controller role; or the Kafka service itself has an
remote Kafka service (meaning it is a remote controller isolated Kafka service (meaning it is an isolated controller
quorum). quorum).
has_combined_broker_and_controller_roles : has_combined_broker_and_controller_roles :
True iff has_broker_role==True and has_controller_role==True True iff has_broker_role==True and has_controller_role==True
@ -145,7 +145,7 @@ class NodeQuorumInfo:
belongs belongs
:param node : Node :param node : Node
The particular node for which this information applies. The particular node for which this information applies.
In the co-located case, whether or not a node's broker's In the combined case, whether or not a node's broker's
process.roles contains 'controller' may vary based on the process.roles contains 'controller' may vary based on the
particular node if the number of controller nodes is less particular node if the number of controller nodes is less
than the number of nodes in the service. than the number of nodes in the service.
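Assumed usage, mirroring the calls in the Kafka service above (kafka and node are hypothetical handles to a running service and one of its nodes):

    node_info = NodeQuorumInfo(kafka.quorum_info, node)
    if node_info.has_controller_role and node_info.has_broker_role:
        kafka.combined_nodes_started += 1  # this node runs a combined broker+controller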
View File
@ -18,7 +18,7 @@ from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until from ducktape.utils.util import wait_until
from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka.quorum import remote_kraft, colocated_kraft from kafkatest.services.kafka.quorum import isolated_kraft, combined_kraft
from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int from kafkatest.utils import is_int
@ -109,14 +109,14 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
assert self.kafka.check_protocol_errors(self) assert self.kafka.check_protocol_errors(self)
@cluster(num_nodes=5) @cluster(num_nodes=5)
@parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=colocated_kraft) @parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=combined_kraft)
@parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=colocated_kraft) @parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=combined_kraft)
def test_colocated_upgrade(self, from_kafka_version, metadata_quorum): def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum):
self.run_upgrade(from_kafka_version) self.run_upgrade(from_kafka_version)
@cluster(num_nodes=8) @cluster(num_nodes=8)
@parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=remote_kraft) @parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=isolated_kraft)
@parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=remote_kraft) @parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=isolated_kraft)
def test_non_colocated_upgrade(self, from_kafka_version, metadata_quorum): def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum):
self.run_upgrade(from_kafka_version) self.run_upgrade(from_kafka_version)
View File
@ -71,8 +71,8 @@ class TestSnapshots(ProduceConsumeValidateTest):
topic_count = 10 topic_count = 10
self.topics_created += self.create_n_topics(topic_count) self.topics_created += self.create_n_topics(topic_count)
if self.kafka.remote_controller_quorum: if self.kafka.isolated_controller_quorum:
self.controller_nodes = self.kafka.remote_controller_quorum.nodes self.controller_nodes = self.kafka.isolated_controller_quorum.nodes
else: else:
self.controller_nodes = self.kafka.nodes[:self.kafka.num_nodes_controller_role] self.controller_nodes = self.kafka.nodes[:self.kafka.num_nodes_controller_role]
@ -145,7 +145,7 @@ class TestSnapshots(ProduceConsumeValidateTest):
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(metadata_quorum=quorum.all_kraft) @matrix(metadata_quorum=quorum.all_kraft)
def test_broker(self, metadata_quorum=quorum.colocated_kraft): def test_broker(self, metadata_quorum=quorum.combined_kraft):
""" Test the ability of a broker to consume metadata snapshots """ Test the ability of a broker to consume metadata snapshots
and to recover the cluster metadata state using them and to recover the cluster metadata state using them
@ -205,7 +205,7 @@ class TestSnapshots(ProduceConsumeValidateTest):
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(metadata_quorum=quorum.all_kraft) @matrix(metadata_quorum=quorum.all_kraft)
def test_controller(self, metadata_quorum=quorum.colocated_kraft): def test_controller(self, metadata_quorum=quorum.combined_kraft):
""" Test the ability of controllers to consume metadata snapshots """ Test the ability of controllers to consume metadata snapshots
and to recover the cluster metadata state using them and to recover the cluster metadata state using them