KAFKA-14835: Create ControllerMetadataMetricsPublisher (#13438)

Separate the KRaft controller metrics into two groups: metrics managed directly by the
QuorumController, and metrics handled by an external publisher. This separation of concerns makes
the code easier to reason about by clarifying which metrics can be changed where.

The external publisher, ControllerMetadataMetricsPublisher, handles all metrics that relate to the
content of the metadata, for example the number of topics or the number of partitions. It fits into
the MetadataLoader publishing framework as just another publisher. Since
ControllerMetadataMetricsPublisher operates on a MetadataImage, we do not have to keep what is
essentially a second copy of the metadata in memory, as ControllerMetricsManager did, which reduces
memory consumption. Another benefit of operating on the MetadataImage is that we no longer need
special handling for each record type, as ControllerMetricsManager required.

Reviewers: David Arthur <mumrah@gmail.com>
Colin Patrick McCabe authored 2023-03-24 11:26:53 -07:00, committed by GitHub
parent 797c28cb7c
commit ed400e4c0d
20 changed files with 1492 additions and 1474 deletions
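To make the architectural idea in the commit message concrete, here is a small, self-contained Java sketch. The types below (SimpleMetadataImage, SimpleMetadataMetricsPublisher) are hypothetical stand-ins, not the actual Kafka classes; they only illustrate why a publisher that recomputes metrics from an immutable metadata image needs neither a second in-memory copy of the metadata nor per-record-type handling.

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class SimpleMetadataImage {                 // hypothetical stand-in for MetadataImage
    final Map<String, Integer> topicToPartitionCount;
    SimpleMetadataImage(Map<String, Integer> topicToPartitionCount) {
        this.topicToPartitionCount = topicToPartitionCount;
    }
}

final class SimpleMetadataMetricsPublisher {      // hypothetical stand-in for ControllerMetadataMetricsPublisher
    final AtomicInteger globalTopicCount = new AtomicInteger(0);
    final AtomicInteger globalPartitionCount = new AtomicInteger(0);

    // Called with each complete, immutable image; no per-record-type handling is needed.
    void onMetadataUpdate(SimpleMetadataImage image) {
        globalTopicCount.set(image.topicToPartitionCount.size());
        globalPartitionCount.set(image.topicToPartitionCount.values()
            .stream().mapToInt(Integer::intValue).sum());
    }
}

public final class PublisherSketch {
    public static void main(String[] args) {
        SimpleMetadataMetricsPublisher publisher = new SimpleMetadataMetricsPublisher();
        publisher.onMetadataUpdate(new SimpleMetadataImage(Map.of("foo", 3, "bar", 1)));
        System.out.println(publisher.globalTopicCount.get() + " topics, "
            + publisher.globalPartitionCount.get() + " partitions");  // prints "2 topics, 4 partitions"
    }
}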

View File

@@ -225,6 +225,7 @@
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.image.writer" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metadata.authorizer" />

View File

@@ -36,6 +36,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.metrics.{ControllerMetadataMetricsPublisher, QuorumControllerMetrics}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures}
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.KafkaConfigSchema
@@ -49,7 +50,8 @@ import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.util.{Deadline, FutureUtils}
import java.util.OptionalLong
import java.util
import java.util.{Optional, OptionalLong}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.compat.java8.OptionConverters._
@@ -104,6 +106,7 @@ class ControllerServer(
val socketServerFirstBoundPortFuture = new CompletableFuture[Integer]()
var createTopicPolicy: Option[CreateTopicPolicy] = None
var alterConfigPolicy: Option[AlterConfigPolicy] = None
@volatile var quorumControllerMetrics: QuorumControllerMetrics = _
var controller: Controller = _
var quotaManagers: QuotaManagers = _
var clientQuotaMetadataManager: ClientQuotaMetadataManager = _
@@ -111,6 +114,7 @@ class ControllerServer(
var controllerApisHandlerPool: KafkaRequestHandlerPool = _
var migrationSupport: Option[ControllerMigrationSupport] = None
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
val metadataPublishers: util.List[MetadataPublisher] = new util.ArrayList[MetadataPublisher]()
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
@@ -215,6 +219,8 @@ class ControllerServer(
val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)
quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time)
new QuorumController.Builder(config.nodeId, sharedServer.metaProps.clusterId).
setTime(time).
setThreadNamePrefix(s"quorum-controller-${config.nodeId}-").
@@ -227,7 +233,7 @@
TimeUnit.MILLISECONDS)).
setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
setMaxIdleIntervalNs(maxIdleIntervalNs).
setMetrics(sharedServer.controllerMetrics).
setMetrics(quorumControllerMetrics).
setCreateTopicPolicy(createTopicPolicy.asJava).
setAlterConfigPolicy(alterConfigPolicy.asJava).
setConfigurationValidator(new ControllerConfigurationValidator()).
@@ -317,11 +323,9 @@ class ControllerServer(
// register this instance for dynamic config changes to the KafkaConfig
config.dynamicConfig.addReconfigurables(this)
val publishers = new java.util.ArrayList[MetadataPublisher]()
// Set up the dynamic config publisher. This runs even in combined mode, since the broker
// has its own separate dynamic configuration object.
publishers.add(new DynamicConfigPublisher(
metadataPublishers.add(new DynamicConfigPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
immutable.Map[String, ConfigHandler](
@@ -332,24 +336,30 @@ class ControllerServer(
// Set up the client quotas publisher. This will enable controller mutation quotas and any
// other quotas which are applicable.
publishers.add(new DynamicClientQuotaPublisher(
metadataPublishers.add(new DynamicClientQuotaPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
"controller",
clientQuotaMetadataManager))
// Set up the SCRAM publisher.
publishers.add(new ScramPublisher(
metadataPublishers.add(new ScramPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
"controller",
credentialProvider
))
// Set up the metrics publisher.
metadataPublishers.add(new ControllerMetadataMetricsPublisher(
sharedServer.controllerServerMetrics,
sharedServer.metadataPublishingFaultHandler
))
// Install all metadata publishers.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"the controller metadata publishers to be installed",
sharedServer.loader.installPublishers(publishers), startupDeadline, time)
sharedServer.loader.installPublishers(metadataPublishers), startupDeadline, time)
} catch {
case e: Throwable =>
maybeChangeStatus(STARTING, STARTED)
@@ -366,6 +376,8 @@ class ControllerServer(
// Ensure that we're not the Raft leader prior to shutting down our socket server, for a
// smoother transition.
sharedServer.ensureNotRaftLeader()
metadataPublishers.forEach(p => sharedServer.loader.removeAndClosePublisher(p).get())
metadataPublishers.clear()
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
migrationSupport.foreach(_.shutdown(this))
@@ -381,6 +393,8 @@ class ControllerServer(
CoreUtils.swallow(quotaManagers.shutdown(), this)
if (controller != null)
controller.close()
if (quorumControllerMetrics != null)
CoreUtils.swallow(quorumControllerMetrics.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
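The ControllerServer.scala changes above follow a simple lifecycle: collect every publisher into metadataPublishers, install them all into the shared MetadataLoader at startup, and remove and close each one at shutdown. Below is a minimal, hypothetical Java sketch of that lifecycle; Publisher and Loader are simplified stand-ins for MetadataPublisher and MetadataLoader, not the real interfaces.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for org.apache.kafka.image.publisher.MetadataPublisher.
interface Publisher extends AutoCloseable {
    void publish(String imageDescription);
    @Override
    void close();
}

// Simplified stand-in for org.apache.kafka.image.loader.MetadataLoader.
final class Loader {
    private final List<Publisher> installed = new ArrayList<>();
    void installPublishers(List<Publisher> publishers) { installed.addAll(publishers); }
    void removeAndClosePublisher(Publisher publisher) { installed.remove(publisher); publisher.close(); }
    void publish(String imageDescription) { installed.forEach(p -> p.publish(imageDescription)); }
}

public final class PublisherLifecycleSketch {
    public static void main(String[] args) {
        Loader loader = new Loader();
        List<Publisher> metadataPublishers = new ArrayList<>();
        metadataPublishers.add(new Publisher() {
            @Override public void publish(String image) { System.out.println("metrics updated from " + image); }
            @Override public void close() { System.out.println("metrics publisher closed"); }
        });
        loader.installPublishers(metadataPublishers);                 // startup path
        loader.publish("image-1");
        metadataPublishers.forEach(loader::removeAndClosePublisher);  // shutdown path
        metadataPublishers.clear();
    }
}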

View File

@@ -24,7 +24,7 @@ import kafka.server.metadata.BrokerServerMetrics
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.controller.QuorumControllerMetrics
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.image.loader.MetadataLoader
import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
import org.apache.kafka.metadata.MetadataRecordSerde
@@ -34,7 +34,7 @@ import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, Process
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import java.util
import java.util.Collections
import java.util.{Collections, Optional}
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
@@ -101,7 +101,7 @@ class SharedServer(
@volatile var metrics: Metrics = _metrics
@volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerMetrics: BrokerServerMetrics = _
@volatile var controllerMetrics: QuorumControllerMetrics = _
@volatile var controllerServerMetrics: ControllerMetadataMetrics = _
@volatile var loader: MetadataLoader = _
val snapshotsDiabledReason = new AtomicReference[String](null)
@volatile var snapshotEmitter: SnapshotEmitter = _
@@ -165,7 +165,7 @@ class SharedServer(
fatal = sharedServerConfig.processRoles.contains(ControllerRole),
action = () => SharedServer.this.synchronized {
if (brokerMetrics != null) brokerMetrics.metadataLoadErrorCount.getAndIncrement()
if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
snapshotsDiabledReason.compareAndSet(null, "metadata loading fault")
})
@@ -176,7 +176,7 @@ class SharedServer(
name = "controller startup",
fatal = true,
action = () => SharedServer.this.synchronized {
if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
snapshotsDiabledReason.compareAndSet(null, "controller startup fault")
})
@@ -188,7 +188,7 @@ class SharedServer(
fatal = true,
action = () => SharedServer.this.synchronized {
if (brokerMetrics != null) brokerMetrics.metadataApplyErrorCount.getAndIncrement()
if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
snapshotsDiabledReason.compareAndSet(null, "initial broker metadata loading fault")
})
@@ -199,7 +199,7 @@ class SharedServer(
name = "quorum controller",
fatal = true,
action = () => SharedServer.this.synchronized {
if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
snapshotsDiabledReason.compareAndSet(null, "quorum controller fault")
})
@@ -211,7 +211,7 @@ class SharedServer(
fatal = false,
action = () => SharedServer.this.synchronized {
if (brokerMetrics != null) brokerMetrics.metadataApplyErrorCount.getAndIncrement()
if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
// Note: snapshot generation does not need to be disabled for a publishing fault.
})
@@ -232,7 +232,7 @@ class SharedServer(
brokerMetrics = BrokerServerMetrics(metrics)
}
if (sharedServerConfig.processRoles.contains(ControllerRole)) {
controllerMetrics = new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}
raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaProps,
@@ -323,9 +323,9 @@ class SharedServer(
CoreUtils.swallow(raftManager.shutdown(), this)
raftManager = null
}
if (controllerMetrics != null) {
CoreUtils.swallow(controllerMetrics.close(), this)
controllerMetrics = null
if (controllerServerMetrics != null) {
CoreUtils.swallow(controllerServerMetrics.close(), this)
controllerServerMetrics = null
}
if (brokerMetrics != null) {
CoreUtils.swallow(brokerMetrics.close(), this)

View File

@@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
public interface ControllerMetrics extends AutoCloseable {
void setActive(boolean active);
boolean active();
void updateEventQueueTime(long durationMs);
void updateEventQueueProcessingTime(long durationMs);
void setFencedBrokerCount(int brokerCount);
int fencedBrokerCount();
void setActiveBrokerCount(int brokerCount);
int activeBrokerCount();
void setGlobalTopicCount(int topicCount);
int globalTopicCount();
void setGlobalPartitionCount(int partitionCount);
int globalPartitionCount();
void setOfflinePartitionCount(int offlinePartitions);
int offlinePartitionCount();
void setPreferredReplicaImbalanceCount(int replicaImbalances);
int preferredReplicaImbalanceCount();
void incrementMetadataErrorCount();
int metadataErrorCount();
void setLastAppliedRecordOffset(long offset);
long lastAppliedRecordOffset();
void setLastCommittedRecordOffset(long offset);
long lastCommittedRecordOffset();
void setLastAppliedRecordTimestamp(long timestamp);
long lastAppliedRecordTimestamp();
void close();
}

View File

@@ -1,321 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
/**
* Type for updating controller metrics based on metadata records.
*/
final class ControllerMetricsManager {
private final static class PartitionState {
final int leader;
final int preferredReplica;
PartitionState(int leader, int preferredReplica) {
this.leader = leader;
this.preferredReplica = preferredReplica;
}
int leader() {
return leader;
}
int preferredReplica() {
return preferredReplica;
}
}
private final Set<Integer> registeredBrokers = new HashSet<>();
private final Set<Integer> fencedBrokers = new HashSet<>();
private int topicCount = 0;
private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
private final ControllerMetrics controllerMetrics;
ControllerMetricsManager(ControllerMetrics controllerMetrics) {
this.controllerMetrics = controllerMetrics;
}
void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
int i = 1;
for (ApiMessageAndVersion message : messages) {
try {
replay(message.message());
} catch (Exception e) {
String failureMessage = String.format(
"Unable to update controller metrics for %s record, it was %d of %d record(s) " +
"in the batch with baseOffset %d.",
message.message().getClass().getSimpleName(),
i,
messages.size(),
baseOffset
);
throw new IllegalArgumentException(failureMessage, e);
}
i++;
}
}
/**
* Update controller metrics by replaying a metadata record.
*
* This method assumes that the provided ApiMessage is one of the type covered by MetadataRecordType.
*
* @param message a metadata record
*/
@SuppressWarnings("checkstyle:cyclomaticComplexity")
void replay(ApiMessage message) {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
replay((RegisterBrokerRecord) message);
break;
case UNREGISTER_BROKER_RECORD:
replay((UnregisterBrokerRecord) message);
break;
case FENCE_BROKER_RECORD:
replay((FenceBrokerRecord) message);
break;
case UNFENCE_BROKER_RECORD:
replay((UnfenceBrokerRecord) message);
break;
case BROKER_REGISTRATION_CHANGE_RECORD:
replay((BrokerRegistrationChangeRecord) message);
break;
case TOPIC_RECORD:
replay((TopicRecord) message);
break;
case PARTITION_RECORD:
replay((PartitionRecord) message);
break;
case PARTITION_CHANGE_RECORD:
replay((PartitionChangeRecord) message);
break;
case REMOVE_TOPIC_RECORD:
replay((RemoveTopicRecord) message);
break;
case CONFIG_RECORD:
case FEATURE_LEVEL_RECORD:
case CLIENT_QUOTA_RECORD:
case PRODUCER_IDS_RECORD:
case ACCESS_CONTROL_ENTRY_RECORD:
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
case USER_SCRAM_CREDENTIAL_RECORD:
case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
case NO_OP_RECORD:
case ZK_MIGRATION_STATE_RECORD:
// These record types do not affect metrics
break;
default:
throw new RuntimeException("Unhandled record type " + type);
}
}
private void replay(RegisterBrokerRecord record) {
Integer brokerId = record.brokerId();
registeredBrokers.add(brokerId);
if (record.fenced()) {
fencedBrokers.add(brokerId);
} else {
fencedBrokers.remove(brokerId);
}
updateBrokerStateMetrics();
}
private void replay(UnregisterBrokerRecord record) {
Integer brokerId = record.brokerId();
registeredBrokers.remove(brokerId);
fencedBrokers.remove(brokerId);
updateBrokerStateMetrics();
}
private void replay(FenceBrokerRecord record) {
handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE);
}
private void replay(UnfenceBrokerRecord record) {
handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE);
}
private void replay(BrokerRegistrationChangeRecord record) {
BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange
.fromValue(record.fenced())
.orElseThrow(() -> {
return new IllegalArgumentException(
String.format(
"Registration change record for %d has unknown value for fenced field: %x",
record.brokerId(),
record.fenced()
)
);
});
handleFencingChange(record.brokerId(), fencingChange);
}
private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
if (!registeredBrokers.contains(brokerId)) {
throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
}
if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
fencedBrokers.add(brokerId);
updateBrokerStateMetrics();
} else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
fencedBrokers.remove(brokerId);
updateBrokerStateMetrics();
} else {
// The fencingChange value is NONE. In this case the controller doesn't need to update the broker
// state metrics.
}
}
private void updateBrokerStateMetrics() {
controllerMetrics.setFencedBrokerCount(fencedBrokers.size());
Set<Integer> activeBrokers = new HashSet<>(registeredBrokers);
activeBrokers.removeAll(fencedBrokers);
controllerMetrics.setActiveBrokerCount(activeBrokers.size());
}
private void replay(TopicRecord record) {
topicCount++;
controllerMetrics.setGlobalTopicCount(topicCount);
}
private void replay(PartitionRecord record) {
TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
PartitionState partitionState = new PartitionState(record.leader(), record.replicas().get(0));
topicPartitions.put(tp, partitionState);
updateBasedOnPartitionState(tp, partitionState);
updateTopicAndPartitionMetrics();
}
private void replay(PartitionChangeRecord record) {
TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
if (!topicPartitions.containsKey(tp)) {
throw new IllegalArgumentException(String.format("Unknown topic partitions %s", tp));
}
PartitionState partitionState = topicPartitions.computeIfPresent(
tp,
(key, oldValue) -> {
PartitionState newValue = oldValue;
// Update replicas
if (record.replicas() != null) {
newValue = new PartitionState(newValue.leader(), record.replicas().get(0));
}
if (record.leader() != NO_LEADER_CHANGE) {
newValue = new PartitionState(record.leader(), newValue.preferredReplica());
}
return newValue;
}
);
updateBasedOnPartitionState(tp, partitionState);
updateTopicAndPartitionMetrics();
}
private void replay(RemoveTopicRecord record) {
Uuid topicId = record.topicId();
Predicate<TopicIdPartition> matchesTopic = tp -> tp.topicId() == topicId;
topicCount--;
topicPartitions.keySet().removeIf(matchesTopic);
offlineTopicPartitions.removeIf(matchesTopic);
imbalancedTopicPartitions.removeIf(matchesTopic);
updateTopicAndPartitionMetrics();
}
private void updateBasedOnPartitionState(TopicIdPartition tp, PartitionState partitionState) {
if (partitionState.leader() == NO_LEADER) {
offlineTopicPartitions.add(tp);
} else {
offlineTopicPartitions.remove(tp);
}
if (partitionState.leader() == partitionState.preferredReplica()) {
imbalancedTopicPartitions.remove(tp);
} else {
imbalancedTopicPartitions.add(tp);
}
}
private void updateTopicAndPartitionMetrics() {
controllerMetrics.setGlobalTopicCount(topicCount);
controllerMetrics.setGlobalPartitionCount(topicPartitions.size());
controllerMetrics.setOfflinePartitionCount(offlineTopicPartitions.size());
controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedTopicPartitions.size());
}
/**
* Resets the value of all of the metrics.
*
* Resets all of the state tracked by this type and resets all of the related controller metrics.
*/
void reset() {
registeredBrokers.clear();
fencedBrokers.clear();
topicCount = 0;
topicPartitions.clear();
offlineTopicPartitions.clear();
imbalancedTopicPartitions.clear();
updateBrokerStateMetrics();
updateTopicAndPartitionMetrics();
}
}

View File

@@ -77,6 +77,7 @@ import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
@@ -174,7 +175,7 @@ public final class QuorumController implements Controller {
private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
private long sessionTimeoutNs = ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
private ControllerMetrics controllerMetrics = null;
private QuorumControllerMetrics controllerMetrics = null;
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
@@ -258,7 +259,7 @@ public final class QuorumController implements Controller {
return this;
}
public Builder setMetrics(ControllerMetrics controllerMetrics) {
public Builder setMetrics(QuorumControllerMetrics controllerMetrics) {
this.controllerMetrics = controllerMetrics;
return this;
}
@@ -322,8 +323,7 @@ public final class QuorumController implements Controller {
logContext = new LogContext(String.format("[QuorumController id=%d] ", nodeId));
}
if (controllerMetrics == null) {
controllerMetrics = (ControllerMetrics) Class.forName(
"org.apache.kafka.controller.MockControllerMetrics").getConstructor().newInstance();
controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time);
}
KafkaEventQueue queue = null;
@@ -939,8 +939,6 @@ public final class QuorumController implements Controller {
}
}
controllerMetricsManager.replayBatch(batch.baseOffset(), messages);
updateLastCommittedState(
offset,
epoch,
@@ -994,7 +992,6 @@ public final class QuorumController implements Controller {
for (ApiMessageAndVersion message : messages) {
try {
replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
controllerMetricsManager.replay(message.message());
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record " +
"from snapshot %s on standby controller, which was %d of " +
@@ -1435,7 +1432,6 @@ public final class QuorumController implements Controller {
*/
private void resetToEmptyState() {
snapshotRegistry.reset();
controllerMetricsManager.reset();
updateLastCommittedState(-1, -1, -1);
}
@@ -1479,13 +1475,7 @@ public final class QuorumController implements Controller {
/**
* The controller metrics.
*/
private final ControllerMetrics controllerMetrics;
/**
* Manager for updating controller metrics based on the committed metadata.
*/
private final ControllerMetricsManager controllerMetricsManager;
private final QuorumControllerMetrics controllerMetrics;
/**
* A registry for snapshot data. This must be accessed only by the event queue thread.
@@ -1675,7 +1665,7 @@ public final class QuorumController implements Controller {
OptionalLong leaderImbalanceCheckIntervalNs,
OptionalLong maxIdleIntervalNs,
long sessionTimeoutNs,
ControllerMetrics controllerMetrics,
QuorumControllerMetrics controllerMetrics,
Optional<CreateTopicPolicy> createTopicPolicy,
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator configurationValidator,
@@ -1693,7 +1683,6 @@ public final class QuorumController implements Controller {
this.queue = queue;
this.time = time;
this.controllerMetrics = controllerMetrics;
this.controllerMetricsManager = new ControllerMetricsManager(controllerMetrics);
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.purgatory = new ControllerPurgatory();
this.resourceExists = new ConfigResourceExistenceChecker();

View File

@@ -1,319 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public final class QuorumControllerMetrics implements ControllerMetrics {
private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName(
"KafkaController", "ActiveControllerCount");
private final static MetricName EVENT_QUEUE_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueTimeMs");
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private final static MetricName FENCED_BROKER_COUNT = getMetricName(
"KafkaController", "FencedBrokerCount");
private final static MetricName ACTIVE_BROKER_COUNT = getMetricName(
"KafkaController", "ActiveBrokerCount");
private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
"KafkaController", "GlobalTopicCount");
private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
"KafkaController", "GlobalPartitionCount");
private final static MetricName OFFLINE_PARTITION_COUNT = getMetricName(
"KafkaController", "OfflinePartitionsCount");
private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName(
"KafkaController", "PreferredReplicaImbalanceCount");
private final static MetricName METADATA_ERROR_COUNT = getMetricName(
"KafkaController", "MetadataErrorCount");
private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastCommittedRecordOffset");
private final static MetricName LAST_APPLIED_RECORD_TIMESTAMP = getMetricName(
"KafkaController", "LastAppliedRecordTimestamp");
private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName(
"KafkaController", "LastAppliedRecordLagMs");
private final MetricsRegistry registry;
private volatile boolean active;
private volatile int fencedBrokerCount;
private volatile int activeBrokerCount;
private volatile int globalTopicCount;
private volatile int globalPartitionCount;
private volatile int offlinePartitionCount;
private volatile int preferredReplicaImbalanceCount;
private final AtomicInteger metadataErrorCount;
private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Gauge<Integer> activeControllerCount;
private final Gauge<Integer> fencedBrokerCountGauge;
private final Gauge<Integer> activeBrokerCountGauge;
private final Gauge<Integer> globalPartitionCountGauge;
private final Gauge<Integer> globalTopicCountGauge;
private final Gauge<Integer> offlinePartitionCountGauge;
private final Gauge<Integer> preferredReplicaImbalanceCountGauge;
private final Gauge<Integer> metadataErrorCountGauge;
private final Gauge<Long> lastAppliedRecordOffsetGauge;
private final Gauge<Long> lastCommittedRecordOffsetGauge;
private final Gauge<Long> lastAppliedRecordTimestampGauge;
private final Gauge<Long> lastAppliedRecordLagMsGauge;
private final Histogram eventQueueTime;
private final Histogram eventQueueProcessingTime;
public QuorumControllerMetrics(
MetricsRegistry registry,
Time time
) {
this.registry = Objects.requireNonNull(registry);
this.active = false;
this.fencedBrokerCount = 0;
this.activeBrokerCount = 0;
this.globalTopicCount = 0;
this.globalPartitionCount = 0;
this.offlinePartitionCount = 0;
this.preferredReplicaImbalanceCount = 0;
this.metadataErrorCount = new AtomicInteger(0);
this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return active ? 1 : 0;
}
});
this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.fencedBrokerCountGauge = registry.newGauge(FENCED_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return fencedBrokerCount;
}
});
this.activeBrokerCountGauge = registry.newGauge(ACTIVE_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return activeBrokerCount;
}
});
this.globalTopicCountGauge = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return globalTopicCount;
}
});
this.globalPartitionCountGauge = registry.newGauge(GLOBAL_PARTITION_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return globalPartitionCount;
}
});
this.offlinePartitionCountGauge = registry.newGauge(OFFLINE_PARTITION_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return offlinePartitionCount;
}
});
this.preferredReplicaImbalanceCountGauge = registry.newGauge(PREFERRED_REPLICA_IMBALANCE_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return preferredReplicaImbalanceCount;
}
});
this.metadataErrorCountGauge = registry.newGauge(METADATA_ERROR_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return metadataErrorCount.get();
}
});
lastAppliedRecordOffsetGauge = registry.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
return lastAppliedRecordOffset.get();
}
});
lastCommittedRecordOffsetGauge = registry.newGauge(LAST_COMMITTED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
return lastCommittedRecordOffset.get();
}
});
lastAppliedRecordTimestampGauge = registry.newGauge(LAST_APPLIED_RECORD_TIMESTAMP, new Gauge<Long>() {
@Override
public Long value() {
return lastAppliedRecordTimestamp.get();
}
});
lastAppliedRecordLagMsGauge = registry.newGauge(LAST_APPLIED_RECORD_LAG_MS, new Gauge<Long>() {
@Override
public Long value() {
return time.milliseconds() - lastAppliedRecordTimestamp.get();
}
});
}
@Override
public void setActive(boolean active) {
this.active = active;
}
@Override
public boolean active() {
return this.active;
}
@Override
public void updateEventQueueTime(long durationMs) {
eventQueueTime.update(durationMs);
}
@Override
public void updateEventQueueProcessingTime(long durationMs) {
eventQueueProcessingTime.update(durationMs);
}
@Override
public void setFencedBrokerCount(int brokerCount) {
this.fencedBrokerCount = brokerCount;
}
@Override
public int fencedBrokerCount() {
return this.fencedBrokerCount;
}
public void setActiveBrokerCount(int brokerCount) {
this.activeBrokerCount = brokerCount;
}
@Override
public int activeBrokerCount() {
return this.activeBrokerCount;
}
@Override
public void setGlobalTopicCount(int topicCount) {
this.globalTopicCount = topicCount;
}
@Override
public int globalTopicCount() {
return this.globalTopicCount;
}
@Override
public void setGlobalPartitionCount(int partitionCount) {
this.globalPartitionCount = partitionCount;
}
@Override
public int globalPartitionCount() {
return this.globalPartitionCount;
}
@Override
public void setOfflinePartitionCount(int offlinePartitions) {
this.offlinePartitionCount = offlinePartitions;
}
@Override
public int offlinePartitionCount() {
return this.offlinePartitionCount;
}
@Override
public void setPreferredReplicaImbalanceCount(int replicaImbalances) {
this.preferredReplicaImbalanceCount = replicaImbalances;
}
@Override
public int preferredReplicaImbalanceCount() {
return this.preferredReplicaImbalanceCount;
}
@Override
public void incrementMetadataErrorCount() {
this.metadataErrorCount.getAndIncrement();
}
@Override
public int metadataErrorCount() {
return this.metadataErrorCount.get();
}
@Override
public void setLastAppliedRecordOffset(long offset) {
lastAppliedRecordOffset.set(offset);
}
@Override
public long lastAppliedRecordOffset() {
return lastAppliedRecordOffset.get();
}
@Override
public void setLastCommittedRecordOffset(long offset) {
lastCommittedRecordOffset.set(offset);
}
@Override
public long lastCommittedRecordOffset() {
return lastCommittedRecordOffset.get();
}
@Override
public void setLastAppliedRecordTimestamp(long timestamp) {
lastAppliedRecordTimestamp.set(timestamp);
}
@Override
public long lastAppliedRecordTimestamp() {
return lastAppliedRecordTimestamp.get();
}
@Override
public void close() {
Arrays.asList(
ACTIVE_CONTROLLER_COUNT,
FENCED_BROKER_COUNT,
ACTIVE_BROKER_COUNT,
EVENT_QUEUE_TIME_MS,
EVENT_QUEUE_PROCESSING_TIME_MS,
GLOBAL_TOPIC_COUNT,
GLOBAL_PARTITION_COUNT,
OFFLINE_PARTITION_COUNT,
PREFERRED_REPLICA_IMBALANCE_COUNT,
METADATA_ERROR_COUNT,
LAST_APPLIED_RECORD_OFFSET,
LAST_COMMITTED_RECORD_OFFSET,
LAST_APPLIED_RECORD_TIMESTAMP,
LAST_APPLIED_RECORD_LAG_MS
).forEach(registry::removeMetric);
}
private static MetricName getMetricName(String type, String name) {
return KafkaYammerMetrics.getMetricName("kafka.controller", type, name);
}
}

View File

@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
/**
* These are the metrics which are managed by the ControllerServer class. They generally pertain to
* aspects of the metadata, like how many topics or partitions we have.
* All of these except MetadataErrorCount are managed by ControllerMetadataMetricsPublisher.
*
* IMPORTANT: Metrics which are managed by the QuorumController class itself should go in
* @link{org.apache.kafka.controller.metrics.QuorumControllerMetrics}, not here.
*/
public final class ControllerMetadataMetrics implements AutoCloseable {
private final static MetricName FENCED_BROKER_COUNT = getMetricName(
"KafkaController", "FencedBrokerCount");
private final static MetricName ACTIVE_BROKER_COUNT = getMetricName(
"KafkaController", "ActiveBrokerCount");
private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
"KafkaController", "GlobalTopicCount");
private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
"KafkaController", "GlobalPartitionCount");
private final static MetricName OFFLINE_PARTITION_COUNT = getMetricName(
"KafkaController", "OfflinePartitionsCount");
private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName(
"KafkaController", "PreferredReplicaImbalanceCount");
private final static MetricName METADATA_ERROR_COUNT = getMetricName(
"KafkaController", "MetadataErrorCount");
private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
private final AtomicInteger globalTopicCount = new AtomicInteger(0);
private final AtomicInteger globalPartitionCount = new AtomicInteger(0);
private final AtomicInteger offlinePartitionCount = new AtomicInteger(0);
private final AtomicInteger preferredReplicaImbalanceCount = new AtomicInteger(0);
private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
/**
* Create a new ControllerMetadataMetrics object.
*
* @param registry The metrics registry, or Optional.empty if this is a test and we don't have one.
*/
public ControllerMetadataMetrics(Optional<MetricsRegistry> registry) {
this.registry = registry;
registry.ifPresent(r -> r.newGauge(FENCED_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return fencedBrokerCount();
}
}));
registry.ifPresent(r -> r.newGauge(ACTIVE_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return activeBrokerCount();
}
}));
registry.ifPresent(r -> r.newGauge(GLOBAL_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return globalTopicCount();
}
}));
registry.ifPresent(r -> r.newGauge(GLOBAL_PARTITION_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return globalPartitionCount();
}
}));
registry.ifPresent(r -> r.newGauge(OFFLINE_PARTITION_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return offlinePartitionCount();
}
}));
registry.ifPresent(r -> r.newGauge(PREFERRED_REPLICA_IMBALANCE_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return preferredReplicaImbalanceCount();
}
}));
registry.ifPresent(r -> r.newGauge(METADATA_ERROR_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return metadataErrorCount();
}
}));
}
public void setFencedBrokerCount(int brokerCount) {
this.fencedBrokerCount.set(brokerCount);
}
public void addToFencedBrokerCount(int brokerCountDelta) {
this.fencedBrokerCount.addAndGet(brokerCountDelta);
}
public int fencedBrokerCount() {
return this.fencedBrokerCount.get();
}
public void setActiveBrokerCount(int brokerCount) {
this.activeBrokerCount.set(brokerCount);
}
public void addToActiveBrokerCount(int brokerCountDelta) {
this.activeBrokerCount.addAndGet(brokerCountDelta);
}
public int activeBrokerCount() {
return this.activeBrokerCount.get();
}
public void setGlobalTopicCount(int topicCount) {
this.globalTopicCount.set(topicCount);
}
public void addToGlobalTopicCount(int topicCountDelta) {
this.globalTopicCount.addAndGet(topicCountDelta);
}
public int globalTopicCount() {
return this.globalTopicCount.get();
}
public void setGlobalPartitionCount(int partitionCount) {
this.globalPartitionCount.set(partitionCount);
}
public void addToGlobalPartitionCount(int partitionCountDelta) {
this.globalPartitionCount.addAndGet(partitionCountDelta);
}
public int globalPartitionCount() {
return this.globalPartitionCount.get();
}
public void setOfflinePartitionCount(int offlinePartitions) {
this.offlinePartitionCount.set(offlinePartitions);
}
public void addToOfflinePartitionCount(int offlinePartitionsDelta) {
this.offlinePartitionCount.addAndGet(offlinePartitionsDelta);
}
public int offlinePartitionCount() {
return this.offlinePartitionCount.get();
}
public void setPreferredReplicaImbalanceCount(int replicaImbalances) {
this.preferredReplicaImbalanceCount.set(replicaImbalances);
}
public void addToPreferredReplicaImbalanceCount(int replicaImbalancesCount) {
this.preferredReplicaImbalanceCount.addAndGet(replicaImbalancesCount);
}
public int preferredReplicaImbalanceCount() {
return this.preferredReplicaImbalanceCount.get();
}
public void incrementMetadataErrorCount() {
this.metadataErrorCount.getAndIncrement();
}
public int metadataErrorCount() {
return this.metadataErrorCount.get();
}
@Override
public void close() {
registry.ifPresent(r -> Arrays.asList(
FENCED_BROKER_COUNT,
ACTIVE_BROKER_COUNT,
GLOBAL_TOPIC_COUNT,
GLOBAL_PARTITION_COUNT,
OFFLINE_PARTITION_COUNT,
PREFERRED_REPLICA_IMBALANCE_COUNT,
METADATA_ERROR_COUNT
).forEach(r::removeMetric));
}
private static MetricName getMetricName(String type, String name) {
return KafkaYammerMetrics.getMetricName("kafka.controller", type, name);
}
}
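A brief usage sketch of the ControllerMetadataMetrics class shown above. In production, SharedServer constructs it with Optional.of(KafkaYammerMetrics.defaultRegistry()) (see the SharedServer.scala hunk earlier in this diff); with Optional.empty() the Yammer gauges are simply not registered, which is convenient in tests. The example class name is hypothetical and the snippet assumes the new class is on the classpath.

import org.apache.kafka.controller.metrics.ControllerMetadataMetrics;
import java.util.Optional;

public final class ControllerMetadataMetricsExample {
    public static void main(String[] args) {
        // Optional.empty() skips Yammer gauge registration, as in unit tests.
        ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.empty());

        metrics.setGlobalTopicCount(10);     // absolute value, e.g. after loading a snapshot
        metrics.addToGlobalTopicCount(-2);   // incremental change, e.g. after applying a log delta
        System.out.println(metrics.globalTopicCount());   // prints 8

        metrics.close();                     // no-op here; unregisters the gauges when a registry is present
    }
}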

View File

@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.metrics;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicDelta;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.fault.FaultHandler;
import java.util.Map.Entry;
import java.util.Optional;
/**
* This publisher translates metadata updates sent by MetadataLoader into changes to controller
* metrics. Like all MetadataPublisher objects, it only receives notifications about events that
* have been persisted to the metadata log. So on the active controller, it will run slightly
* behind the latest in-memory state which has not yet been fully persisted to the log. This is
* reasonable for metrics, which don't need up-to-the-millisecond update latency.
*
* NOTE: the ZK controller has some special rules for calculating preferredReplicaImbalanceCount
* which we haven't implemented here. Specifically, the ZK controller considers reassigning
* partitions to always have their preferred leader, even if they don't.
* All other metrics should be the same, as far as is possible.
*/
public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
private final ControllerMetadataMetrics metrics;
private final FaultHandler faultHandler;
private MetadataImage prevImage = MetadataImage.EMPTY;
public ControllerMetadataMetricsPublisher(
ControllerMetadataMetrics metrics,
FaultHandler faultHandler
) {
this.metrics = metrics;
this.faultHandler = faultHandler;
}
@Override
public String name() {
return "ControllerMetadataMetricsPublisher";
}
@Override
public void onMetadataUpdate(
MetadataDelta delta,
MetadataImage newImage,
LoaderManifest manifest
) {
switch (manifest.type()) {
case LOG_DELTA:
try {
publishDelta(delta);
} catch (Throwable e) {
faultHandler.handleFault("Failed to publish controller metrics from log delta " +
" ending at offset " + manifest.provenance().lastContainedOffset(), e);
} finally {
prevImage = newImage;
}
break;
case SNAPSHOT:
try {
publishSnapshot(newImage);
} catch (Throwable e) {
faultHandler.handleFault("Failed to publish controller metrics from " +
manifest.provenance().snapshotName(), e);
} finally {
prevImage = newImage;
}
break;
}
}
private void publishDelta(MetadataDelta delta) {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
if (delta.clusterDelta() != null) {
for (Entry<Integer, Optional<BrokerRegistration>> entry :
delta.clusterDelta().changedBrokers().entrySet()) {
changes.handleBrokerChange(prevImage.cluster().brokers().get(entry.getKey()),
entry.getValue().orElse(null));
}
}
if (delta.topicsDelta() != null) {
for (Uuid topicId : delta.topicsDelta().deletedTopicIds()) {
TopicImage prevTopic = prevImage.topics().topicsById().get(topicId);
if (prevTopic == null) {
throw new RuntimeException("Unable to find deleted topic id " + topicId +
" in previous topics image.");
}
changes.handleDeletedTopic(prevTopic);
}
for (Entry<Uuid, TopicDelta> entry : delta.topicsDelta().changedTopics().entrySet()) {
changes.handleTopicChange(prevImage.topics().getTopic(entry.getKey()), entry.getValue());
}
}
changes.apply(metrics);
}
private void publishSnapshot(MetadataImage newImage) {
metrics.setGlobalTopicCount(newImage.topics().topicsById().size());
int fencedBrokers = 0;
int activeBrokers = 0;
for (BrokerRegistration broker : newImage.cluster().brokers().values()) {
if (broker.fenced()) {
fencedBrokers++;
} else {
activeBrokers++;
}
}
metrics.setFencedBrokerCount(fencedBrokers);
metrics.setActiveBrokerCount(activeBrokers);
int totalPartitions = 0;
int offlinePartitions = 0;
int partitionsWithoutPreferredLeader = 0;
for (TopicImage topicImage : newImage.topics().topicsById().values()) {
for (PartitionRegistration partition : topicImage.partitions().values()) {
if (!partition.hasLeader()) {
offlinePartitions++;
}
if (!partition.hasPreferredLeader()) {
partitionsWithoutPreferredLeader++;
}
totalPartitions++;
}
}
metrics.setGlobalPartitionCount(totalPartitions);
metrics.setOfflinePartitionCount(offlinePartitions);
metrics.setPreferredReplicaImbalanceCount(partitionsWithoutPreferredLeader);
}
@Override
public void close() {
metrics.close();
}
}
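The Javadoc above describes the publisher's two update paths: incremental changes for LOG_DELTA manifests and a full recomputation for SNAPSHOT manifests. The following self-contained sketch mirrors that decision with simplified, hypothetical types rather than the real MetadataDelta, MetadataImage, and LoaderManifest classes.

import java.util.concurrent.atomic.AtomicInteger;

public final class DeltaVsSnapshotSketch {
    enum ManifestType { LOG_DELTA, SNAPSHOT }   // hypothetical stand-in for the real manifest type

    static final AtomicInteger globalTopicCount = new AtomicInteger(0);

    static void onMetadataUpdate(ManifestType type, int topicsAddedByDelta, int topicsInImage) {
        switch (type) {
            case LOG_DELTA:
                globalTopicCount.addAndGet(topicsAddedByDelta);  // apply only the incremental change
                break;
            case SNAPSHOT:
                globalTopicCount.set(topicsInImage);             // recompute from the full image
                break;
        }
    }

    public static void main(String[] args) {
        onMetadataUpdate(ManifestType.SNAPSHOT, 0, 5);   // snapshot image contains 5 topics
        onMetadataUpdate(ManifestType.LOG_DELTA, 2, 0);  // a later log delta creates 2 more topics
        System.out.println(globalTopicCount.get());      // prints 7
    }
}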

View File

@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.metrics;
import org.apache.kafka.image.TopicDelta;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import java.util.Map.Entry;
/**
* The ControllerMetricsChanges class is used inside ControllerMetricsPublisher to track the
* metrics changes triggered by a series of deltas.
*/
class ControllerMetricsChanges {
/**
* Calculates the change between two boolean values, expressed as an integer.
*/
static int delta(boolean prev, boolean next) {
if (prev) {
return next ? 0 : -1;
} else {
return next ? 1 : 0;
}
}
private int fencedBrokersChange = 0;
private int activeBrokersChange = 0;
private int globalTopicsChange = 0;
private int globalPartitionsChange = 0;
private int offlinePartitionsChange = 0;
private int partitionsWithoutPreferredLeaderChange = 0;
public int fencedBrokersChange() {
return fencedBrokersChange;
}
public int activeBrokersChange() {
return activeBrokersChange;
}
public int globalTopicsChange() {
return globalTopicsChange;
}
public int globalPartitionsChange() {
return globalPartitionsChange;
}
public int offlinePartitionsChange() {
return offlinePartitionsChange;
}
public int partitionsWithoutPreferredLeaderChange() {
return partitionsWithoutPreferredLeaderChange;
}
void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) {
boolean wasFenced = false;
boolean wasActive = false;
if (prev != null) {
wasFenced = prev.fenced();
wasActive = !prev.fenced();
}
boolean isFenced = false;
boolean isActive = false;
if (next != null) {
isFenced = next.fenced();
isActive = !next.fenced();
}
fencedBrokersChange += delta(wasFenced, isFenced);
activeBrokersChange += delta(wasActive, isActive);
}
void handleDeletedTopic(TopicImage deletedTopic) {
deletedTopic.partitions().values().forEach(prev -> handlePartitionChange(prev, null));
globalTopicsChange--;
}
void handleTopicChange(TopicImage prev, TopicDelta topicDelta) {
if (prev == null) {
globalTopicsChange++;
for (PartitionRegistration nextPartition : topicDelta.partitionChanges().values()) {
handlePartitionChange(null, nextPartition);
}
} else {
for (Entry<Integer, PartitionRegistration> entry : topicDelta.partitionChanges().entrySet()) {
int partitionId = entry.getKey();
PartitionRegistration nextPartition = entry.getValue();
handlePartitionChange(prev.partitions().get(partitionId), nextPartition);
}
}
}
void handlePartitionChange(PartitionRegistration prev, PartitionRegistration next) {
boolean wasPresent = false;
boolean wasOffline = false;
boolean wasWithoutPreferredLeader = false;
if (prev != null) {
wasPresent = true;
wasOffline = !prev.hasLeader();
wasWithoutPreferredLeader = !prev.hasPreferredLeader();
}
boolean isPresent = false;
boolean isOffline = false;
boolean isWithoutPreferredLeader = false;
if (next != null) {
isPresent = true;
isOffline = !next.hasLeader();
isWithoutPreferredLeader = !next.hasPreferredLeader();
}
globalPartitionsChange += delta(wasPresent, isPresent);
offlinePartitionsChange += delta(wasOffline, isOffline);
partitionsWithoutPreferredLeaderChange += delta(wasWithoutPreferredLeader, isWithoutPreferredLeader);
}
/**
* Apply these changes to the metrics object.
*/
void apply(ControllerMetadataMetrics metrics) {
if (fencedBrokersChange != 0) {
metrics.addToFencedBrokerCount(fencedBrokersChange);
}
if (activeBrokersChange != 0) {
metrics.addToActiveBrokerCount(activeBrokersChange);
}
if (globalTopicsChange != 0) {
metrics.addToGlobalTopicCount(globalTopicsChange);
}
if (globalPartitionsChange != 0) {
metrics.addToGlobalPartitionCount(globalPartitionsChange);
}
if (offlinePartitionsChange != 0) {
metrics.addToOfflinePartitionCount(offlinePartitionsChange);
}
if (partitionsWithoutPreferredLeaderChange != 0) {
metrics.addToPreferredReplicaImbalanceCount(partitionsWithoutPreferredLeaderChange);
}
}
}
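The delta(boolean, boolean) helper above encodes how one broker or partition contributes to a count metric when its state changes: -1 if it leaves the counted state, +1 if it enters it, and 0 otherwise. A tiny worked example follows; the delta method is copied from the class above, while the surrounding example class is hypothetical.

public final class BooleanDeltaExample {
    // Same logic as ControllerMetricsChanges.delta above.
    static int delta(boolean prev, boolean next) {
        if (prev) {
            return next ? 0 : -1;
        } else {
            return next ? 1 : 0;
        }
    }

    public static void main(String[] args) {
        // A broker that goes from fenced to unfenced contributes -1 to fencedBrokersChange
        // and +1 to activeBrokersChange; an unchanged broker contributes 0 to both.
        System.out.println(delta(true, false));   // -1
        System.out.println(delta(false, true));   // +1
        System.out.println(delta(true, true));    //  0
    }
}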

View File

@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
/**
* These are the metrics which are managed by the QuorumController class. They generally pertain to
* aspects of the internal operation of the controller, such as the time events spend on the
* controller queue.
*
* IMPORTANT: Metrics which relate to the metadata itself (like number of topics, etc.) should go in
* @link{org.apache.kafka.controller.metrics.ControllerMetadataMetrics}, not here.
*/
public class QuorumControllerMetrics implements AutoCloseable {
private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName(
"KafkaController", "ActiveControllerCount");
private final static MetricName EVENT_QUEUE_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueTimeMs");
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastCommittedRecordOffset");
private final static MetricName LAST_APPLIED_RECORD_TIMESTAMP = getMetricName(
"KafkaController", "LastAppliedRecordTimestamp");
private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName(
"KafkaController", "LastAppliedRecordLagMs");
private final Optional<MetricsRegistry> registry;
private volatile boolean active;
private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private Consumer<Long> newHistogram(MetricName name, boolean biased) {
if (registry.isPresent()) {
Histogram histogram = registry.get().newHistogram(name, biased);
return e -> histogram.update(e);
} else {
return __ -> { };
}
}
public QuorumControllerMetrics(
Optional<MetricsRegistry> registry,
Time time
) {
this.registry = registry;
this.active = false;
registry.ifPresent(r -> r.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return active ? 1 : 0;
}
}));
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
return lastAppliedRecordOffset();
}
}));
registry.ifPresent(r -> r.newGauge(LAST_COMMITTED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
return lastCommittedRecordOffset();
}
}));
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_TIMESTAMP, new Gauge<Long>() {
@Override
public Long value() {
return lastAppliedRecordTimestamp();
}
}));
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_LAG_MS, new Gauge<Long>() {
@Override
public Long value() {
return time.milliseconds() - lastAppliedRecordTimestamp();
}
}));
}
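// The ActiveControllerCount gauge reads this flag: it reports 1 while active and 0 otherwise.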
public void setActive(boolean active) {
this.active = active;
}
public boolean active() {
return this.active;
}
public void updateEventQueueTime(long durationMs) {
eventQueueTimeUpdater.accept(durationMs);
}
public void updateEventQueueProcessingTime(long durationMs) {
eventQueueProcessingTimeUpdater.accept(durationMs);
}
public void setLastAppliedRecordOffset(long offset) {
lastAppliedRecordOffset.set(offset);
}
public long lastAppliedRecordOffset() {
return lastAppliedRecordOffset.get();
}
public void setLastCommittedRecordOffset(long offset) {
lastCommittedRecordOffset.set(offset);
}
public long lastCommittedRecordOffset() {
return lastCommittedRecordOffset.get();
}
public void setLastAppliedRecordTimestamp(long timestamp) {
lastAppliedRecordTimestamp.set(timestamp);
}
public long lastAppliedRecordTimestamp() {
return lastAppliedRecordTimestamp.get();
}
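/**
 * Remove all of this class's metrics from the registry, if a registry was provided.
 */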
@Override
public void close() {
registry.ifPresent(r -> Arrays.asList(
ACTIVE_CONTROLLER_COUNT,
EVENT_QUEUE_TIME_MS,
EVENT_QUEUE_PROCESSING_TIME_MS,
LAST_APPLIED_RECORD_OFFSET,
LAST_COMMITTED_RECORD_OFFSET,
LAST_APPLIED_RECORD_TIMESTAMP,
LAST_APPLIED_RECORD_LAG_MS
).forEach(r::removeMetric));
}
private static MetricName getMetricName(String type, String name) {
return KafkaYammerMetrics.getMetricName("kafka.controller", type, name);
}
}

View File

@ -1,364 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.junit.jupiter.api.Assertions.assertEquals;
final class ControllerMetricsManagerTest {
@Test
public void testActiveBrokerRegistration() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replay(brokerRegistration(1, 1, false));
assertEquals(1, metrics.activeBrokerCount());
assertEquals(0, metrics.fencedBrokerCount());
}
@Test
public void testFenceBrokerRegistration() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replay(brokerRegistration(1, 1, true));
assertEquals(0, metrics.activeBrokerCount());
assertEquals(1, metrics.fencedBrokerCount());
}
@Test
public void testBrokerChangedToActive() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replay(brokerRegistration(1, 1, true));
manager.replay(brokerChange(1, 1, BrokerRegistrationFencingChange.UNFENCE));
assertEquals(1, metrics.activeBrokerCount());
assertEquals(0, metrics.fencedBrokerCount());
}
@Test
public void testBrokerLegacyChangedToActive() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replay(brokerRegistration(1, 1, true));
manager.replay(brokerUnfence(1, 1));
assertEquals(1, metrics.activeBrokerCount());
assertEquals(0, metrics.fencedBrokerCount());
}
@Test
public void testBrokerChangedToFence() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replay(brokerRegistration(1, 1, false));
manager.replay(brokerChange(1, 1, BrokerRegistrationFencingChange.FENCE));
assertEquals(0, metrics.activeBrokerCount());
assertEquals(1, metrics.fencedBrokerCount());
}
@Test
public void testBrokerLegacyChangedToFence() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replay(brokerRegistration(1, 1, false));
manager.replay(brokerFence(1, 1));
assertEquals(0, metrics.activeBrokerCount());
assertEquals(1, metrics.fencedBrokerCount());
}
@Test
public void testBrokerUnchanged() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replay(brokerRegistration(1, 1, true));
manager.replay(brokerChange(1, 1, BrokerRegistrationFencingChange.NONE));
assertEquals(0, metrics.activeBrokerCount());
assertEquals(1, metrics.fencedBrokerCount());
}
@Test
public void testBrokerUnregister() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replay(brokerRegistration(1, 1, true));
manager.replay(brokerRegistration(2, 1, false));
assertEquals(1, metrics.activeBrokerCount());
assertEquals(1, metrics.fencedBrokerCount());
manager.replay(brokerUnregistration(1, 1));
assertEquals(1, metrics.activeBrokerCount());
assertEquals(0, metrics.fencedBrokerCount());
manager.replay(brokerUnregistration(2, 1));
assertEquals(0, metrics.activeBrokerCount());
assertEquals(0, metrics.fencedBrokerCount());
}
@Test
public void testReplayBatch() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replayBatch(
0,
Arrays.asList(
new ApiMessageAndVersion(brokerRegistration(1, 1, true), (short) 0),
new ApiMessageAndVersion(brokerChange(1, 1, BrokerRegistrationFencingChange.UNFENCE), (short) 0)
)
);
assertEquals(1, metrics.activeBrokerCount());
assertEquals(0, metrics.fencedBrokerCount());
}
@Test
public void testTopicCountIncreased() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replay(topicRecord("test"));
assertEquals(1, metrics.globalTopicCount());
}
@Test
public void testTopicCountDecreased() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
Uuid id = Uuid.randomUuid();
manager.replay(topicRecord("test", id));
manager.replay(removeTopicRecord(id));
assertEquals(0, metrics.globalTopicCount());
}
@Test
public void testPartitionCountIncreased() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
Uuid id = Uuid.randomUuid();
manager.replay(topicRecord("test", id));
assertEquals(0, metrics.globalPartitionCount());
manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
assertEquals(1, metrics.globalPartitionCount());
manager.replay(partitionRecord(id, 1, 0, Arrays.asList(0, 1, 2)));
assertEquals(2, metrics.globalPartitionCount());
}
@Test
public void testPartitionCountDecreased() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
Uuid id = Uuid.randomUuid();
manager.replay(topicRecord("test", id));
manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
manager.replay(partitionRecord(id, 1, 0, Arrays.asList(0, 1, 2)));
manager.replay(removeTopicRecord(id));
assertEquals(0, metrics.globalPartitionCount());
}
@Test
public void testOfflinePartition() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
Uuid id = Uuid.randomUuid();
manager.replay(topicRecord("test", id));
manager.replay(partitionRecord(id, 0, NO_LEADER, Arrays.asList(0, 1, 2)));
assertEquals(1, metrics.offlinePartitionCount());
}
@Test
public void testImbalancedPartition() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
Uuid id = Uuid.randomUuid();
manager.replay(topicRecord("test", id));
manager.replay(partitionRecord(id, 0, 1, Arrays.asList(0, 1, 2)));
assertEquals(1, metrics.preferredReplicaImbalanceCount());
}
@Test
public void testPartitionChange() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
Uuid id = Uuid.randomUuid();
manager.replay(topicRecord("test", id));
manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(NO_LEADER), Optional.empty()));
assertEquals(1, metrics.offlinePartitionCount());
manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(1), Optional.empty()));
assertEquals(0, metrics.offlinePartitionCount());
assertEquals(1, metrics.preferredReplicaImbalanceCount());
manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(0), Optional.empty()));
assertEquals(0, metrics.preferredReplicaImbalanceCount());
manager.replay(partitionChangeRecord(id, 0, OptionalInt.empty(), Optional.of(Arrays.asList(1, 2, 0))));
assertEquals(1, metrics.preferredReplicaImbalanceCount());
manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(2), Optional.of(Arrays.asList(2, 0, 1))));
assertEquals(0, metrics.preferredReplicaImbalanceCount());
}
@Test
public void testStartingMetrics() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
assertEquals(0, metrics.activeBrokerCount());
assertEquals(0, metrics.fencedBrokerCount());
assertEquals(0, metrics.globalTopicCount());
assertEquals(0, metrics.globalPartitionCount());
assertEquals(0, metrics.offlinePartitionCount());
assertEquals(0, metrics.preferredReplicaImbalanceCount());
}
@Test
public void testReset() {
ControllerMetrics metrics = new MockControllerMetrics();
ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
manager.replay(brokerRegistration(1, 1, true));
Uuid id = Uuid.randomUuid();
manager.replay(topicRecord("test", id));
manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
manager.reset();
assertEquals(0, metrics.activeBrokerCount());
assertEquals(0, metrics.fencedBrokerCount());
assertEquals(0, metrics.globalTopicCount());
assertEquals(0, metrics.globalPartitionCount());
assertEquals(0, metrics.offlinePartitionCount());
assertEquals(0, metrics.preferredReplicaImbalanceCount());
}
private static RegisterBrokerRecord brokerRegistration(
int brokerId,
long epoch,
boolean fenced
) {
return new RegisterBrokerRecord()
.setBrokerId(brokerId)
.setIncarnationId(Uuid.randomUuid())
.setBrokerEpoch(epoch)
.setFenced(fenced);
}
private static UnregisterBrokerRecord brokerUnregistration(
int brokerId,
long epoch
) {
return new UnregisterBrokerRecord()
.setBrokerId(brokerId)
.setBrokerEpoch(epoch);
}
private static BrokerRegistrationChangeRecord brokerChange(
int brokerId,
long epoch,
BrokerRegistrationFencingChange fencing
) {
return new BrokerRegistrationChangeRecord()
.setBrokerId(brokerId)
.setBrokerEpoch(epoch)
.setFenced(fencing.value());
}
private static UnfenceBrokerRecord brokerUnfence(int brokerId, long epoch) {
return new UnfenceBrokerRecord()
.setId(brokerId)
.setEpoch(epoch);
}
private static FenceBrokerRecord brokerFence(int brokerId, long epoch) {
return new FenceBrokerRecord()
.setId(brokerId)
.setEpoch(epoch);
}
private static TopicRecord topicRecord(String name) {
return new TopicRecord().setName(name).setTopicId(Uuid.randomUuid());
}
private static TopicRecord topicRecord(String name, Uuid id) {
return new TopicRecord().setName(name).setTopicId(id);
}
private static RemoveTopicRecord removeTopicRecord(Uuid id) {
return new RemoveTopicRecord().setTopicId(id);
}
private static PartitionRecord partitionRecord(
Uuid id,
int partition,
int leader,
List<Integer> replicas
) {
return new PartitionRecord()
.setPartitionId(partition)
.setTopicId(id)
.setReplicas(replicas)
.setIsr(replicas)
.setLeader(leader);
}
private static PartitionChangeRecord partitionChangeRecord(
Uuid id,
int partition,
OptionalInt leader,
Optional<List<Integer>> replicas
) {
PartitionChangeRecord record = new PartitionChangeRecord();
leader.ifPresent(record::setLeader);
replicas.ifPresent(record::setReplicas);
replicas.ifPresent(record::setIsr);
return record
.setPartitionId(partition)
.setTopicId(id);
}
}

View File

@ -1,165 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import java.util.concurrent.atomic.AtomicInteger;
public final class MockControllerMetrics implements ControllerMetrics {
private volatile boolean active = false;
private volatile int fencedBrokers = 0;
private volatile int activeBrokers = 0;
private volatile int topics = 0;
private volatile int partitions = 0;
private volatile int offlinePartitions = 0;
private volatile int preferredReplicaImbalances = 0;
private volatile AtomicInteger metadataErrors = new AtomicInteger(0);
private volatile long lastAppliedRecordOffset = 0;
private volatile long lastCommittedRecordOffset = 0;
private volatile long lastAppliedRecordTimestamp = 0;
private volatile boolean closed = false;
@Override
public void setActive(boolean active) {
this.active = active;
}
@Override
public boolean active() {
return this.active;
}
@Override
public void updateEventQueueTime(long durationMs) {
// nothing to do
}
@Override
public void updateEventQueueProcessingTime(long durationMs) {
// nothing to do
}
@Override
public void setFencedBrokerCount(int brokerCount) {
this.fencedBrokers = brokerCount;
}
@Override
public int fencedBrokerCount() {
return this.fencedBrokers;
}
@Override
public void setActiveBrokerCount(int brokerCount) {
this.activeBrokers = brokerCount;
}
@Override
public int activeBrokerCount() {
return activeBrokers;
}
@Override
public void setGlobalTopicCount(int topicCount) {
this.topics = topicCount;
}
@Override
public int globalTopicCount() {
return this.topics;
}
@Override
public void setGlobalPartitionCount(int partitionCount) {
this.partitions = partitionCount;
}
@Override
public int globalPartitionCount() {
return this.partitions;
}
@Override
public void setOfflinePartitionCount(int offlinePartitions) {
this.offlinePartitions = offlinePartitions;
}
@Override
public int offlinePartitionCount() {
return this.offlinePartitions;
}
@Override
public void setPreferredReplicaImbalanceCount(int replicaImbalances) {
this.preferredReplicaImbalances = replicaImbalances;
}
@Override
public int preferredReplicaImbalanceCount() {
return this.preferredReplicaImbalances;
}
@Override
public void incrementMetadataErrorCount() {
this.metadataErrors.getAndIncrement();
}
@Override
public int metadataErrorCount() {
return this.metadataErrors.get();
}
@Override
public void setLastAppliedRecordOffset(long offset) {
lastAppliedRecordOffset = offset;
}
@Override
public long lastAppliedRecordOffset() {
return lastAppliedRecordOffset;
}
@Override
public void setLastCommittedRecordOffset(long offset) {
lastCommittedRecordOffset = offset;
}
@Override
public long lastCommittedRecordOffset() {
return lastCommittedRecordOffset;
}
@Override
public void setLastAppliedRecordTimestamp(long timestamp) {
lastAppliedRecordTimestamp = timestamp;
}
@Override
public long lastAppliedRecordTimestamp() {
return lastAppliedRecordTimestamp;
}
@Override
public void close() {
closed = true;
}
public boolean isClosed() {
return this.closed;
}
}

View File

@ -1,196 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.Set;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class QuorumControllerMetricsTest {
private static final String EXPECTED_GROUP = "kafka.controller";
@Test
public void testKafkaControllerMetricNames() {
String expectedType = "KafkaController";
Set<String> expectedMetricNames = Utils.mkSet(
"ActiveControllerCount",
"FencedBrokerCount",
"ActiveBrokerCount",
"GlobalTopicCount",
"GlobalPartitionCount",
"OfflinePartitionsCount",
"PreferredReplicaImbalanceCount",
"MetadataErrorCount",
"LastAppliedRecordLagMs",
"LastAppliedRecordOffset",
"LastAppliedRecordTimestamp",
"LastCommittedRecordOffset"
);
assertMetricsCreatedAndRemovedUponClose(expectedType, expectedMetricNames);
}
@Test
public void testControllerEventManagerMetricNames() {
String expectedType = "ControllerEventManager";
Set<String> expectedMetricNames = Utils.mkSet(
"EventQueueTimeMs",
"EventQueueProcessingTimeMs");
assertMetricsCreatedAndRemovedUponClose(expectedType, expectedMetricNames);
}
@Test
public void testUpdateEventQueueTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try {
try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry, time)) {
quorumControllerMetrics.updateEventQueueTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueTimeMs"), 1, 1000);
}
} finally {
registry.shutdown();
}
}
@Test
public void testUpdateEventQueueProcessingTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try {
try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry, time)) {
quorumControllerMetrics.updateEventQueueProcessingTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueProcessingTimeMs"), 1, 1000);
}
} finally {
registry.shutdown();
}
}
@Test
public void testLastAppliedRecordMetrics() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
time.sleep(1000);
try {
try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry, time)) {
quorumControllerMetrics.setLastAppliedRecordOffset(100);
quorumControllerMetrics.setLastAppliedRecordTimestamp(500);
quorumControllerMetrics.setLastCommittedRecordOffset(50);
@SuppressWarnings("unchecked")
Gauge<Long> lastAppliedRecordOffset = (Gauge<Long>) registry
.allMetrics()
.get(metricName("KafkaController", "LastAppliedRecordOffset"));
assertEquals(100, lastAppliedRecordOffset.value());
@SuppressWarnings("unchecked")
Gauge<Long> lastAppliedRecordTimestamp = (Gauge<Long>) registry
.allMetrics()
.get(metricName("KafkaController", "LastAppliedRecordTimestamp"));
assertEquals(500, lastAppliedRecordTimestamp.value());
@SuppressWarnings("unchecked")
Gauge<Long> lastAppliedRecordLagMs = (Gauge<Long>) registry
.allMetrics()
.get(metricName("KafkaController", "LastAppliedRecordLagMs"));
assertEquals(time.milliseconds() - 500, lastAppliedRecordLagMs.value());
@SuppressWarnings("unchecked")
Gauge<Long> lastCommittedRecordOffset = (Gauge<Long>) registry
.allMetrics()
.get(metricName("KafkaController", "LastCommittedRecordOffset"));
assertEquals(50, lastCommittedRecordOffset.value());
}
} finally {
registry.shutdown();
}
}
@Test
public void testMetadataErrorCount() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try {
try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry, time)) {
@SuppressWarnings("unchecked")
Gauge<Integer> metadataErrorCount = (Gauge<Integer>) registry
.allMetrics()
.get(metricName("KafkaController", "MetadataErrorCount"));
assertEquals(0, metadataErrorCount.value());
quorumControllerMetrics.incrementMetadataErrorCount();
assertEquals(1, metadataErrorCount.value());
}
} finally {
registry.shutdown();
}
}
private static void assertMetricsCreatedAndRemovedUponClose(String expectedType, Set<String> expectedMetricNames) {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try {
try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry, time)) {
assertMetricsCreated(registry, expectedMetricNames, expectedType);
}
assertMetricsRemoved(registry, expectedMetricNames, expectedType);
} finally {
registry.shutdown();
}
}
private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);
assertEquals(count, histogram.count());
assertEquals(sum, histogram.sum(), .1);
}
private static MetricName metricName(String type, String name) {
String mBeanName = String.format("kafka.controller:type=%s,name=%s", type, name);
return new MetricName("kafka.controller", type, name, null, mBeanName);
}
private static void assertMetricsCreated(MetricsRegistry registry, Set<String> expectedMetricNames, String expectedType) {
assertEquals(registry.allMetrics().keySet().stream()
.filter(k -> k.getType() == expectedType).count(),
expectedMetricNames.size());
expectedMetricNames.forEach(expectedName -> {
MetricName expectMetricName = metricName(expectedType, expectedName);
assertTrue(registry.allMetrics().containsKey(expectMetricName), "Missing metric: " + expectMetricName);
});
registry.allMetrics().forEach((actualMetricName, actualMetric) -> {
if (actualMetricName.getType() == expectedType) {
assertTrue(expectedMetricNames.contains(actualMetricName.getName()), "Unexpected metric: " + actualMetricName);
}
});
}
private static void assertMetricsRemoved(MetricsRegistry registry, Set<String> expectedMetricNames, String expectedType) {
expectedMetricNames.forEach(expectedName -> {
MetricName expectMetricName = metricName(expectedType, expectedName);
assertFalse(registry.allMetrics().containsKey(expectMetricName), "Found metric: " + expectMetricName);
});
}
}

View File

@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -49,6 +50,7 @@ import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TimeoutException;
@ -83,6 +85,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
@ -130,6 +133,20 @@ public class QuorumControllerTest {
static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap");
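// Metrics subclass that records whether close() was called, so the test below can verify
// that the controller closes its metrics on shutdown.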
static class MockControllerMetrics extends QuorumControllerMetrics {
final AtomicBoolean closed = new AtomicBoolean(false);
MockControllerMetrics() {
super(Optional.empty(), Time.SYSTEM);
}
@Override
public void close() {
super.close();
closed.set(true);
}
}
/**
* Test creating a new QuorumController and closing it.
*/
@ -146,7 +163,7 @@ public class QuorumControllerTest {
build()
) {
}
assertTrue(metrics.isClosed(), "metrics were not closed");
assertTrue(metrics.closed.get(), "metrics were not closed");
}
/**

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.metrics;
import java.util.Optional;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.FeaturesImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.ProducerIdsImage;
import org.apache.kafka.image.ScramImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.image.writer.ImageReWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.junit.jupiter.api.Test;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NON_PREFERRED_LEADER;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NORMAL;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.OFFLINE;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakePartitionRegistration;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakeTopicImage;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakeTopicsImage;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ControllerMetadataMetricsPublisherTest {
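// Bundles a fault handler, an unregistered ControllerMetadataMetrics instance, and the publisher
// under test; close() rethrows the first fault recorded while the test ran.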
static class TestEnv implements AutoCloseable {
MockFaultHandler faultHandler =
new MockFaultHandler("ControllerMetadataMetricsPublisher");
ControllerMetadataMetrics metrics =
new ControllerMetadataMetrics(Optional.empty());
ControllerMetadataMetricsPublisher publisher =
new ControllerMetadataMetricsPublisher(metrics, faultHandler);
@Override
public void close() {
publisher.close();
faultHandler.maybeRethrowFirstException();
}
}
@Test
public void testMetricsBeforePublishing() {
try (TestEnv env = new TestEnv()) {
assertEquals(0, env.metrics.activeBrokerCount());
assertEquals(0, env.metrics.globalTopicCount());
assertEquals(0, env.metrics.globalPartitionCount());
assertEquals(0, env.metrics.offlinePartitionCount());
assertEquals(0, env.metrics.preferredReplicaImbalanceCount());
assertEquals(0, env.metrics.metadataErrorCount());
}
}
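// Wrap the given TopicsImage in an otherwise-empty MetadataImage, so only topic metadata is in play.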
static MetadataImage fakeImageFromTopicsImage(TopicsImage topicsImage) {
return new MetadataImage(
MetadataProvenance.EMPTY,
FeaturesImage.EMPTY,
ClusterImage.EMPTY,
topicsImage,
ConfigurationsImage.EMPTY,
ClientQuotasImage.EMPTY,
ProducerIdsImage.EMPTY,
AclsImage.EMPTY,
ScramImage.EMPTY);
}
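// IMAGE1 holds three topics and seven partitions in total: "foo" with one healthy partition,
// "bar" with two healthy partitions plus one without its preferred leader, and "quux" with
// three offline partitions.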
static final TopicsImage TOPICS_IMAGE1;
static final MetadataImage IMAGE1;
static {
TOPICS_IMAGE1 = fakeTopicsImage(
fakeTopicImage("foo",
Uuid.fromString("JKNp6fQaT-icHxh654ok-w"),
fakePartitionRegistration(NORMAL)),
fakeTopicImage("bar",
Uuid.fromString("pEMSdUVWTXaFQUzLTznFSw"),
fakePartitionRegistration(NORMAL),
fakePartitionRegistration(NORMAL),
fakePartitionRegistration(NON_PREFERRED_LEADER)),
fakeTopicImage("quux",
Uuid.fromString("zkUT4lyyRke6VIaTw6RQWg"),
fakePartitionRegistration(OFFLINE),
fakePartitionRegistration(OFFLINE),
fakePartitionRegistration(OFFLINE))
);
IMAGE1 = fakeImageFromTopicsImage(TOPICS_IMAGE1);
}
@Test
public void testPublish() {
try (TestEnv env = new TestEnv()) {
assertEquals(0, env.metrics.activeBrokerCount());
assertEquals(0, env.metrics.globalTopicCount());
assertEquals(0, env.metrics.globalPartitionCount());
assertEquals(0, env.metrics.offlinePartitionCount());
assertEquals(0, env.metrics.preferredReplicaImbalanceCount());
assertEquals(0, env.metrics.metadataErrorCount());
}
}
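// Build a minimal SnapshotManifest or LogDeltaManifest to hand to onMetadataUpdate().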
static LoaderManifest fakeManifest(boolean isSnapshot) {
if (isSnapshot) {
return new SnapshotManifest(MetadataProvenance.EMPTY, 0);
} else {
return new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 0, 0, 0);
}
}
@Test
public void testLoadSnapshot() {
try (TestEnv env = new TestEnv()) {
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
ImageReWriter writer = new ImageReWriter(delta);
IMAGE1.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(delta.image().features().metadataVersion()).
build());
env.publisher.onMetadataUpdate(delta, IMAGE1, fakeManifest(true));
assertEquals(0, env.metrics.activeBrokerCount());
assertEquals(3, env.metrics.globalTopicCount());
assertEquals(7, env.metrics.globalPartitionCount());
assertEquals(3, env.metrics.offlinePartitionCount());
assertEquals(4, env.metrics.preferredReplicaImbalanceCount());
assertEquals(0, env.metrics.metadataErrorCount());
}
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ControllerMetadataMetricsTest {
@Test
public void testMetricNames() {
MetricsRegistry registry = new MetricsRegistry();
try {
try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) {
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller:",
new HashSet<>(Arrays.asList(
"kafka.controller:type=KafkaController,name=ActiveBrokerCount",
"kafka.controller:type=KafkaController,name=FencedBrokerCount",
"kafka.controller:type=KafkaController,name=GlobalPartitionCount",
"kafka.controller:type=KafkaController,name=GlobalTopicCount",
"kafka.controller:type=KafkaController,name=MetadataErrorCount",
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"
)));
}
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller:",
Collections.emptySet());
} finally {
registry.shutdown();
}
}
@Test
public void testMetadataErrorCount() {
MetricsRegistry registry = new MetricsRegistry();
try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) {
@SuppressWarnings("unchecked")
Gauge<Integer> metadataErrorCount = (Gauge<Integer>) registry
.allMetrics()
.get(metricName("KafkaController", "MetadataErrorCount"));
assertEquals(0, metadataErrorCount.value());
metrics.incrementMetadataErrorCount();
assertEquals(1, metadataErrorCount.value());
} finally {
registry.shutdown();
}
}
private static MetricName metricName(String type, String name) {
String mBeanName = String.format("kafka.controller:type=%s,name=%s", type, name);
return new MetricName("kafka.controller", type, name, null, mBeanName);
}
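// Drive a single integer gauge through its setter and adder, checking both the value returned by
// ControllerMetadataMetrics and the value reported by the registered Yammer gauge.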
private void testIntGaugeMetric(
Function<ControllerMetadataMetrics, Integer> metricsGetter,
Function<MetricsRegistry, Integer> registryGetter,
BiConsumer<ControllerMetadataMetrics, Integer> setter,
BiConsumer<ControllerMetadataMetrics, Integer> incrementer
) {
MetricsRegistry registry = new MetricsRegistry();
try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) {
assertEquals(0, metricsGetter.apply(metrics));
assertEquals(0, registryGetter.apply(registry));
setter.accept(metrics, 123);
assertEquals(123, metricsGetter.apply(metrics));
assertEquals(123, registryGetter.apply(registry));
incrementer.accept(metrics, 123);
assertEquals(246, metricsGetter.apply(metrics));
assertEquals(246, registryGetter.apply(registry));
incrementer.accept(metrics, -246);
assertEquals(0, metricsGetter.apply(metrics));
assertEquals(0, registryGetter.apply(registry));
} finally {
registry.shutdown();
}
}
@SuppressWarnings("unchecked") // suppress warning about Gauge typecast
@Test
public void testFencedBrokerMetric() {
testIntGaugeMetric(
m -> m.fencedBrokerCount(),
registry -> ((Gauge<Integer>) registry.allMetrics().
get(metricName("KafkaController", "FencedBrokerCount"))).value(),
(m, v) -> m.setFencedBrokerCount(v),
(m, v) -> m.addToFencedBrokerCount(v)
);
}
@SuppressWarnings("unchecked") // suppress warning about Gauge typecast
@Test
public void testActiveBrokerCountMetric() {
testIntGaugeMetric(
m -> m.activeBrokerCount(),
registry -> ((Gauge<Integer>) registry.allMetrics().
get(metricName("KafkaController", "ActiveBrokerCount"))).value(),
(m, v) -> m.setActiveBrokerCount(v),
(m, v) -> m.addToActiveBrokerCount(v)
);
}
@SuppressWarnings("unchecked") // suppress warning about Gauge typecast
@Test
public void testGlobalTopicCountMetric() {
testIntGaugeMetric(
m -> m.globalTopicCount(),
registry -> ((Gauge<Integer>) registry.allMetrics().
get(metricName("KafkaController", "GlobalTopicCount"))).value(),
(m, v) -> m.setGlobalTopicCount(v),
(m, v) -> m.addToGlobalTopicCount(v)
);
}
@SuppressWarnings("unchecked") // suppress warning about Gauge typecast
@Test
public void testGlobalPartitionCountMetric() {
testIntGaugeMetric(
m -> m.globalPartitionCount(),
registry -> ((Gauge<Integer>) registry.allMetrics().
get(metricName("KafkaController", "GlobalPartitionCount"))).value(),
(m, v) -> m.setGlobalPartitionCount(v),
(m, v) -> m.addToGlobalPartitionCount(v)
);
}
@SuppressWarnings("unchecked") // suppress warning about Gauge typecast
@Test
public void testOfflinePartitionCountMetric() {
testIntGaugeMetric(
m -> m.offlinePartitionCount(),
registry -> ((Gauge<Integer>) registry.allMetrics().
get(metricName("KafkaController", "OfflinePartitionsCount"))).value(),
(m, v) -> m.setOfflinePartitionCount(v),
(m, v) -> m.addToOfflinePartitionCount(v)
);
}
@SuppressWarnings("unchecked") // suppress warning about Gauge typecast
@Test
public void testPreferredReplicaImbalanceCountMetric() {
testIntGaugeMetric(
m -> m.preferredReplicaImbalanceCount(),
registry -> ((Gauge<Integer>) registry.allMetrics().
get(metricName("KafkaController", "PreferredReplicaImbalanceCount"))).value(),
(m, v) -> m.setPreferredReplicaImbalanceCount(v),
(m, v) -> m.addToPreferredReplicaImbalanceCount(v)
);
}
}

View File

@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.metrics;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.image.TopicDelta;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import org.junit.jupiter.api.Test;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NORMAL;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NON_PREFERRED_LEADER;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.OFFLINE;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakePartitionRegistration;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ControllerMetricsChangesTest {
@Test
public void testDelta() {
assertEquals(0, ControllerMetricsChanges.delta(false, false));
assertEquals(1, ControllerMetricsChanges.delta(false, true));
assertEquals(-1, ControllerMetricsChanges.delta(true, false));
assertEquals(0, ControllerMetricsChanges.delta(true, true));
}
private static BrokerRegistration brokerRegistration(
int brokerId,
boolean fenced
) {
return new BrokerRegistration(brokerId,
100L,
Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"),
Collections.emptyList(),
Collections.emptyMap(),
Optional.empty(),
fenced,
false);
}
@Test
public void testInitialValues() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
assertEquals(0, changes.fencedBrokersChange());
assertEquals(0, changes.activeBrokersChange());
assertEquals(0, changes.globalTopicsChange());
assertEquals(0, changes.globalPartitionsChange());
assertEquals(0, changes.offlinePartitionsChange());
assertEquals(0, changes.partitionsWithoutPreferredLeaderChange());
}
@Test
public void testHandleNewUnfencedBroker() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleBrokerChange(null, brokerRegistration(1, false));
assertEquals(0, changes.fencedBrokersChange());
assertEquals(1, changes.activeBrokersChange());
}
@Test
public void testHandleNewFencedBroker() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleBrokerChange(null, brokerRegistration(1, true));
assertEquals(1, changes.fencedBrokersChange());
assertEquals(0, changes.activeBrokersChange());
}
@Test
public void testHandleBrokerFencing() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleBrokerChange(brokerRegistration(1, false), brokerRegistration(1, true));
assertEquals(1, changes.fencedBrokersChange());
assertEquals(-1, changes.activeBrokersChange());
}
@Test
public void testHandleBrokerUnfencing() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleBrokerChange(brokerRegistration(1, true), brokerRegistration(1, false));
assertEquals(-1, changes.fencedBrokersChange());
assertEquals(1, changes.activeBrokersChange());
}
@Test
public void testHandleDeletedTopic() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
Map<Integer, PartitionRegistration> partitions = new HashMap<>();
partitions.put(0, fakePartitionRegistration(NORMAL));
partitions.put(1, fakePartitionRegistration(NORMAL));
partitions.put(2, fakePartitionRegistration(NON_PREFERRED_LEADER));
partitions.put(3, fakePartitionRegistration(NON_PREFERRED_LEADER));
partitions.put(4, fakePartitionRegistration(OFFLINE));
TopicImage topicImage = new TopicImage("foo",
Uuid.fromString("wXtW6pQbTS2CL6PjdRCqVw"),
partitions);
changes.handleDeletedTopic(topicImage);
assertEquals(-1, changes.globalTopicsChange());
assertEquals(-5, changes.globalPartitionsChange());
assertEquals(-1, changes.offlinePartitionsChange());
// The offline partition counts as a partition without its preferred leader.
assertEquals(-3, changes.partitionsWithoutPreferredLeaderChange());
}
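// TOPIC_DELTA1 creates topic "foo" with five partitions: three led by their preferred replica
// and two that are not. TOPIC_DELTA2 starts from the resulting image, moves partition 1's
// leadership away from the preferred replica, and adds a sixth partition.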
static final Uuid FOO_ID = Uuid.fromString("wXtW6pQbTS2CL6PjdRCqVw");
static final TopicDelta TOPIC_DELTA1;
static final TopicDelta TOPIC_DELTA2;
static {
TOPIC_DELTA1 = new TopicDelta(new TopicImage("foo", FOO_ID, Collections.emptyMap()));
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 0).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 1).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 2).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NON_PREFERRED_LEADER).
toRecord(FOO_ID, 3).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NON_PREFERRED_LEADER).
toRecord(FOO_ID, 4).message());
TOPIC_DELTA2 = new TopicDelta(TOPIC_DELTA1.apply());
TOPIC_DELTA2.replay(new PartitionChangeRecord().
setPartitionId(1).
setTopicId(FOO_ID).
setLeader(1));
TOPIC_DELTA2.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 5).message());
}
@Test
public void testHandleNewTopic() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleTopicChange(null, TOPIC_DELTA1);
assertEquals(1, changes.globalTopicsChange());
assertEquals(5, changes.globalPartitionsChange());
assertEquals(0, changes.offlinePartitionsChange());
assertEquals(2, changes.partitionsWithoutPreferredLeaderChange());
}
@Test
public void testTopicChange() {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
changes.handleTopicChange(TOPIC_DELTA2.image(), TOPIC_DELTA2);
assertEquals(0, changes.globalTopicsChange());
assertEquals(1, changes.globalPartitionsChange());
assertEquals(0, changes.offlinePartitionsChange());
assertEquals(1, changes.partitionsWithoutPreferredLeaderChange());
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.metrics;
import com.yammer.metrics.core.MetricsRegistry;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ControllerMetricsTestUtils {
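/**
 * Assert that the metrics whose "group:type=...,name=..." string starts with the given prefix
 * exactly match the expected set.
 */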
public static void assertMetricsForTypeEqual(
MetricsRegistry registry,
String expectedPrefix,
Set<String> expected
) {
Set<String> actual = new TreeSet<>();
registry.allMetrics().forEach((name, __) -> {
StringBuilder bld = new StringBuilder();
bld.append(name.getGroup());
bld.append(":type=").append(name.getType());
bld.append(",name=").append(name.getName());
if (bld.toString().startsWith(expectedPrefix)) {
actual.add(bld.toString());
}
});
assertEquals(new TreeSet<>(expected), actual);
}
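// NORMAL: replica 0 (the preferred replica) leads; NON_PREFERRED_LEADER: replica 1 leads;
// OFFLINE: the partition has no leader.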
enum FakePartitionRegistrationType {
NORMAL,
NON_PREFERRED_LEADER,
OFFLINE
}
public static PartitionRegistration fakePartitionRegistration(
FakePartitionRegistrationType type
) {
int leader = 0;
switch (type) {
case NORMAL:
leader = 0;
break;
case NON_PREFERRED_LEADER:
leader = 1;
break;
case OFFLINE:
leader = -1;
break;
}
return new PartitionRegistration(
new int[] {0, 1, 2},
new int[] {0, 1, 2},
new int[] {},
new int[] {},
leader,
LeaderRecoveryState.RECOVERED,
100,
200);
}
public static TopicImage fakeTopicImage(
String topicName,
Uuid topicId,
PartitionRegistration... registrations
) {
Map<Integer, PartitionRegistration> partitions = new HashMap<>();
int i = 0;
for (PartitionRegistration registration : registrations) {
partitions.put(i, registration);
i++;
}
return new TopicImage(topicName, topicId, partitions);
}
public static TopicsImage fakeTopicsImage(
TopicImage... topics
) {
Map<Uuid, TopicImage> topicsById = new HashMap<>();
Map<String, TopicImage> topicsByName = new HashMap<>();
for (TopicImage topic : topics) {
topicsById.put(topic.id(), topic);
topicsByName.put(topic.name(), topic);
}
return new TopicsImage(topicsById, topicsByName);
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class QuorumControllerMetricsTest {
@Test
public void testMetricNames() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
new HashSet<>(Arrays.asList(
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
"kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
"kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
"kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
"kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp"
)));
}
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
Collections.emptySet());
} finally {
registry.shutdown();
}
}
@Test
public void testUpdateEventQueueTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
metrics.updateEventQueueTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueTimeMs"), 1, 1000);
} finally {
registry.shutdown();
}
}
@Test
public void testUpdateEventQueueProcessingTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
metrics.updateEventQueueProcessingTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueProcessingTimeMs"), 1, 1000);
} finally {
registry.shutdown();
}
}
@Test
public void testLastAppliedRecordMetrics() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
time.sleep(1000);
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
metrics.setLastAppliedRecordOffset(100);
metrics.setLastAppliedRecordTimestamp(500);
metrics.setLastCommittedRecordOffset(50);
@SuppressWarnings("unchecked")
Gauge<Long> lastAppliedRecordOffset = (Gauge<Long>) registry
.allMetrics()
.get(metricName("KafkaController", "LastAppliedRecordOffset"));
assertEquals(100, lastAppliedRecordOffset.value());
@SuppressWarnings("unchecked")
Gauge<Long> lastAppliedRecordTimestamp = (Gauge<Long>) registry
.allMetrics()
.get(metricName("KafkaController", "LastAppliedRecordTimestamp"));
assertEquals(500, lastAppliedRecordTimestamp.value());
@SuppressWarnings("unchecked")
Gauge<Long> lastAppliedRecordLagMs = (Gauge<Long>) registry
.allMetrics()
.get(metricName("KafkaController", "LastAppliedRecordLagMs"));
assertEquals(time.milliseconds() - 500, lastAppliedRecordLagMs.value());
@SuppressWarnings("unchecked")
Gauge<Long> lastCommittedRecordOffset = (Gauge<Long>) registry
.allMetrics()
.get(metricName("KafkaController", "LastCommittedRecordOffset"));
assertEquals(50, lastCommittedRecordOffset.value());
} finally {
registry.shutdown();
}
}
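// Check that the histogram registered under the given name recorded `count` samples summing to `sum`.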
private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);
assertEquals(count, histogram.count());
assertEquals(sum, histogram.sum(), .1);
}
private static MetricName metricName(String type, String name) {
String mBeanName = String.format("kafka.controller:type=%s,name=%s", type, name);
return new MetricName("kafka.controller", type, name, null, mBeanName);
}
}