KAFKA-15883: Implement RemoteCopyLagBytes (#14832)

This pull request implements the first metric from the list in KIP-963: Additional metrics in Tiered Storage.

Since each partition of a topic is serviced by its own RLMTask, we need an aggregator object per topic; in this pull request that aggregator is BrokerTopicAggregatedMetric. Because RemoteCopyLagBytes is a gauge, I have also introduced a new GaugeWrapper. The metrics collection system interacts with the BrokerTopicAggregatedMetric through the GaugeWrapper, while the RemoteLogManager interacts with the BrokerTopicAggregatedMetric directly.
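
A simplified sketch of how these pieces fit together (the aggregator is condensed from the diff below; the standalone wiring is illustrative, not code from this commit): each partition's RLMTask reports its lag into the topic-level BrokerTopicAggregatedMetric, and the registered gauge reports the sum across partitions.

import java.util.concurrent.ConcurrentHashMap

// Condensed aggregator, as introduced in this pull request.
class BrokerTopicAggregatedMetric {
  private val partitionMetricValues = new ConcurrentHashMap[Int, Long]()
  def setPartitionMetricValue(partition: Int, value: Long): Unit =
    partitionMetricValues.put(partition, value)
  def removePartition(partition: Int): Unit =
    partitionMetricValues.remove(partition)
  // The value the RemoteCopyLagBytes gauge reports: the sum over all partitions.
  def value(): Long = partitionMetricValues.values().stream().mapToLong(v => v).sum()
}

// Illustrative flow: one aggregator per topic, updated by per-partition RLMTasks.
val lagForTopic = new BrokerTopicAggregatedMetric
lagForTopic.setPartitionMetricValue(0, 100L) // RLMTask for partition 0
lagForTopic.setPartitionMetricValue(1, 150L) // RLMTask for partition 1
assert(lagForTopic.value() == 250L)          // gauge value for the whole topic
lagForTopic.removePartition(1)               // partition reassigned to a follower
assert(lagForTopic.value() == 100L)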

Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Christo Lolov 2023-12-14 01:21:37 +00:00 committed by GitHub
parent a1e985d22f
commit a87e86e015
7 changed files with 307 additions and 8 deletions

core/src/main/java/kafka/log/remote/RemoteLogManager.java

@@ -341,6 +341,8 @@ public class RemoteLogManager implements Closeable {
leaderPartitions.forEach(this::cacheTopicPartitionIds);
followerPartitions.forEach(this::cacheTopicPartitionIds);
followerPartitions.forEach(
topicIdPartition -> brokerTopicStats.topicStats(topicIdPartition.topic()).removeRemoteCopyBytesLag(topicIdPartition.partition()));
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
followerPartitions.forEach(topicIdPartition ->
@@ -373,6 +375,9 @@ public class RemoteLogManager implements Closeable {
LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
task.cancel();
}
brokerTopicStats.topicStats(tp.topic()).removeRemoteCopyBytesLag(tp.partition());
if (stopPartition.deleteRemoteLog()) {
LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
deleteRemoteLogPartition(tpId);
@@ -778,6 +783,11 @@ public class RemoteLogManager implements Closeable {
// are not deleted before they are copied to remote storage.
log.updateHighestOffsetInRemoteStorage(endOffset);
logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size();
String topic = topicIdPartition.topic();
int partition = topicIdPartition.partition();
brokerTopicStats.topicStats(topic).recordRemoteCopyBytesLag(partition, bytesLag);
}
private Path toPathIfExists(File file) {
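
For context on the hunk above: the recorded lag is the total size of segments that still exist only locally minus the size of the active segment, which is not yet a candidate for copying. A worked example with the numbers used in the unit test further down (the values are the test's; the variable names here are just for illustration):

// Sizes mirroring the RemoteLogManagerTest below.
val onlyLocalLogSegmentsSize = 175L // all segments not yet in remote storage
val activeSegmentSize = 100L        // active segment, never uploaded
val bytesLag = onlyLocalLogSegmentsSize - activeSegmentSize
assert(bytesLag == 75L) // the value the test expects for RemoteCopyLagBytes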

core/src/main/scala/kafka/server/KafkaRequestHandler.scala

@@ -21,9 +21,9 @@ import kafka.network._
import kafka.utils._
import kafka.server.KafkaRequestHandler.{threadCurrentRequest, threadRequestChannel}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import com.yammer.metrics.core.Meter
import com.yammer.metrics.core.{Gauge, Meter}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics
@@ -283,8 +283,31 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
meter()
}
case class GaugeWrapper(metricType: String, brokerTopicAggregatedMetric: BrokerTopicAggregatedMetric) {
@volatile private var gaugeObject: Gauge[Long] = _
final private val gaugeLock = new Object
def gauge(): Gauge[Long] = gaugeLock synchronized {
if (gaugeObject == null) {
gaugeObject = metricsGroup.newGauge(metricType, () => brokerTopicAggregatedMetric.value())
}
return gaugeObject
}
def close(): Unit = gaugeLock synchronized {
if (gaugeObject != null) {
metricsGroup.removeMetric(metricType)
brokerTopicAggregatedMetric.close()
gaugeObject = null
}
}
gauge()
}
// an internal map for "lazy initialization" of certain metrics
private val metricTypeMap = new Pool[String, MeterWrapper]()
private val metricGaugeTypeMap = new Pool[String, GaugeWrapper]()
metricTypeMap.putAll(Map(
BrokerTopicStats.MessagesInPerSec -> MeterWrapper(BrokerTopicStats.MessagesInPerSec, "messages"),
BrokerTopicStats.BytesInPerSec -> MeterWrapper(BrokerTopicStats.BytesInPerSec, "bytes"),
@@ -319,11 +342,16 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName, "requests")
).asJava)
metricGaugeTypeMap.putAll(Map(
RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName, new BrokerTopicAggregatedMetric)
).asJava)
})
// used for testing only
def metricMap: Map[String, MeterWrapper] = metricTypeMap.toMap
def metricGaugeMap: Map[String, GaugeWrapper] = metricGaugeTypeMap.toMap
def messagesInRate: Meter = metricTypeMap.get(BrokerTopicStats.MessagesInPerSec).meter()
def bytesInRate: Meter = metricTypeMap.get(BrokerTopicStats.BytesInPerSec).meter()
@@ -368,6 +396,18 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
def recordRemoteCopyBytesLag(partition: Int, bytesLag: Long): Unit = {
val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName).brokerTopicAggregatedMetric
brokerTopicAggregatedMetric.setPartitionMetricValue(partition, bytesLag)
}
def removeRemoteCopyBytesLag(partition: Int): Unit = {
val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName).brokerTopicAggregatedMetric
brokerTopicAggregatedMetric.removePartition(partition)
}
def remoteCopyBytesLag: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value()
def remoteCopyBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName).meter()
def remoteFetchBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName).meter()
@@ -384,9 +424,31 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
val meter = metricTypeMap.get(metricType)
if (meter != null)
meter.close()
val gauge = metricGaugeTypeMap.get(metricType)
if (gauge != null)
gauge.close()
}
def close(): Unit = metricTypeMap.values.foreach(_.close())
def close(): Unit = {
metricTypeMap.values.foreach(_.close())
metricGaugeTypeMap.values.foreach(_.close())
}
}
class BrokerTopicAggregatedMetric() {
private val partitionMetricValues = new ConcurrentHashMap[Int, Long]()
def setPartitionMetricValue(partition: Int, partitionValue: Long): Unit = {
partitionMetricValues.put(partition, partitionValue)
}
def removePartition(partition: Int): Option[Long] = {
Option.apply(partitionMetricValues.remove(partition))
}
def value(): Long = partitionMetricValues.values().stream().mapToLong(v => v).sum()
def close(): Unit = partitionMetricValues.clear()
}
object BrokerTopicStats {
@@ -462,6 +524,7 @@ class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = java.util.Op
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName)
}
}

core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java

@@ -101,6 +101,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
@@ -699,15 +700,105 @@ public class RemoteLogManagerTest {
Partition mockFollowerPartition = mockPartition(followerTopicIdPartition);
// before running tasks, the remote log manager tasks should be all idle
assertEquals(1.0, yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent"));
assertEquals(1.0, (double) yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent"));
remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.singleton(mockFollowerPartition), topicIds);
assertTrue(yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") < 1.0);
assertTrue((double) yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") < 1.0);
// unlock copyLogSegmentData
latch.countDown();
}
private double yammerMetricValue(String name) {
Gauge<Double> gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
@Test
void testRemoteLogManagerRemoteCopyLagBytes() throws Exception {
long oldestSegmentStartOffset = 0L;
long olderSegmentStartOffset = 75L;
long nextSegmentStartOffset = 150L;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
File tempFile = TestUtils.tempFile();
File mockProducerSnapshotIndex = TestUtils.tempFile();
File tempDir = TestUtils.tempDirectory();
// create 3 log segments, with 0, 75 and 150 as log start offset
LogSegment oldestSegment = mock(LogSegment.class);
LogSegment olderSegment = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);
when(oldestSegment.baseOffset()).thenReturn(oldestSegmentStartOffset);
when(olderSegment.baseOffset()).thenReturn(olderSegmentStartOffset);
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
FileRecords oldestFileRecords = mock(FileRecords.class);
when(oldestSegment.log()).thenReturn(oldestFileRecords);
when(oldestFileRecords.file()).thenReturn(tempFile);
when(oldestFileRecords.sizeInBytes()).thenReturn(10);
when(oldestSegment.readNextOffset()).thenReturn(olderSegmentStartOffset);
FileRecords olderFileRecords = mock(FileRecords.class);
when(olderSegment.log()).thenReturn(olderFileRecords);
when(olderFileRecords.file()).thenReturn(tempFile);
when(olderFileRecords.sizeInBytes()).thenReturn(10);
when(olderSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.logStartOffset()).thenReturn(oldestSegmentStartOffset);
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldestSegment, olderSegment, activeSegment)));
ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(250L);
OffsetIndex oldestIdx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get();
TimeIndex oldestTimeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1500).get();
File oldestTxnFile = UnifiedLog.transactionIndexFile(tempDir, oldestSegmentStartOffset, "");
oldestTxnFile.createNewFile();
TransactionIndex oldestTxnIndex = new TransactionIndex(oldestSegmentStartOffset, oldestTxnFile);
when(oldestSegment.timeIndex()).thenReturn(oldestTimeIdx);
when(oldestSegment.offsetIndex()).thenReturn(oldestIdx);
when(oldestSegment.txnIndex()).thenReturn(oldestTxnIndex);
OffsetIndex olderIdx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, olderSegmentStartOffset, ""), olderSegmentStartOffset, 1000).get();
TimeIndex olderTimeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, olderSegmentStartOffset, ""), olderSegmentStartOffset, 1500).get();
File olderTxnFile = UnifiedLog.transactionIndexFile(tempDir, olderSegmentStartOffset, "");
olderTxnFile.createNewFile();
TransactionIndex olderTxnIndex = new TransactionIndex(olderSegmentStartOffset, olderTxnFile);
when(olderSegment.timeIndex()).thenReturn(olderTimeIdx);
when(olderSegment.offsetIndex()).thenReturn(olderIdx);
when(olderSegment.txnIndex()).thenReturn(olderTxnIndex);
CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(ans -> Optional.empty()).doAnswer(ans -> {
// waiting for verification
latch.await();
return Optional.empty();
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(175L, 100L);
when(activeSegment.size()).thenReturn(100);
// before running tasks, the metric should not be registered
assertThrows(NoSuchElementException.class, () -> yammerMetricValue("RemoteCopyLagBytes"));
remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.emptySet(), topicIds);
TestUtils.waitForCondition(
() -> 75 == safeLongYammerMetricValue("RemoteCopyLagBytes"),
String.format("Expected to find 75 for RemoteCopyLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteCopyLagBytes")));
// unlock copyLogSegmentData
latch.countDown();
}
private Object yammerMetricValue(String name) {
Gauge gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
.filter(e -> e.getKey().getMBeanName().contains(name))
.findFirst()
.get()
@@ -715,6 +806,14 @@ public class RemoteLogManagerTest {
return gauge.value();
}
private long safeLongYammerMetricValue(String name) {
try {
return (long) yammerMetricValue(name);
} catch (NoSuchElementException ex) {
return 0L;
}
}
@Test
void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
long oldSegmentStartOffset = 0L;

core/src/test/scala/integration/kafka/api/MetricsTest.scala

@@ -320,16 +320,28 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
assertTrue(metrics.isEmpty, s"$errorMessage: ${metrics.keys}")
}
private def fromNameToBrokerTopicStatsMBean(name: String): String = {
s"kafka.server:type=BrokerTopicMetrics,name=$name"
}
private def verifyRemoteStorageMetrics(shouldContainMetrics: Boolean): Unit = {
val metrics = RemoteStorageMetrics.allMetrics().asScala.filter(name =>
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.find(metric => {
metric._1.getMBeanName().equals(name.getMBeanName)
}).isDefined
).toList
val aggregatedBrokerTopicStats = Set(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName)
val aggregatedBrokerTopicMetrics = aggregatedBrokerTopicStats.filter(name =>
KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.find(metric => {
metric._1.getMBeanName().equals(fromNameToBrokerTopicStatsMBean(name))
}).isDefined
).toList
if (shouldContainMetrics) {
assertEquals(RemoteStorageMetrics.allMetrics().size(), metrics.size, s"Only $metrics appear in the metrics")
assertEquals(aggregatedBrokerTopicStats.size, aggregatedBrokerTopicMetrics.size, s"Only $aggregatedBrokerTopicMetrics appear in the metrics")
} else {
assertEquals(0, metrics.size, s"$metrics should not appear in the metrics")
assertEquals(0, aggregatedBrokerTopicMetrics.size, s"$aggregatedBrokerTopicMetrics should not appear in the metrics")
}
}
}

core/src/test/scala/unit/kafka/server/KafkaRequestHandlerTest.scala

@@ -193,13 +193,25 @@ class KafkaRequestHandlerTest {
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, systemRemoteStorageEnabled.toString)
val brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
brokerTopicStats.topicStats(topic)
val gaugeMetrics = Set(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName)
RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => {
if (systemRemoteStorageEnabled) {
assertTrue(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
if (!gaugeMetrics.contains(metric.getName)) {
assertTrue(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
} else {
assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
}
} else {
assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
}
})
gaugeMetrics.foreach(metricName => {
if (systemRemoteStorageEnabled) {
assertTrue(brokerTopicStats.topicStats(topic).metricGaugeMap.contains(metricName), metricName)
} else {
assertFalse(brokerTopicStats.topicStats(topic).metricGaugeMap.contains(metricName), metricName)
}
})
}
def makeRequest(time: Time, metrics: RequestChannel.Metrics): RequestChannel.Request = {
@@ -213,4 +225,98 @@ class KafkaRequestHandlerTest {
new RequestChannel.Request(0, context, time.nanoseconds(),
mock(classOf[MemoryPool]), ByteBuffer.allocate(0), metrics)
}
def setupBrokerTopicMetrics(systemRemoteStorageEnabled: Boolean = true): BrokerTopicMetrics = {
val topic = "topic"
val props = kafka.utils.TestUtils.createDummyBrokerConfig()
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, systemRemoteStorageEnabled.toString)
new BrokerTopicMetrics(Option.apply(topic), java.util.Optional.of(KafkaConfig.fromProps(props)))
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testSingularCopyLagBytesMetric(systemRemoteStorageEnabled: Boolean): Unit = {
val brokerTopicMetrics = setupBrokerTopicMetrics(systemRemoteStorageEnabled)
if (systemRemoteStorageEnabled) {
brokerTopicMetrics.recordRemoteCopyBytesLag(0, 100);
brokerTopicMetrics.recordRemoteCopyBytesLag(1, 150);
brokerTopicMetrics.recordRemoteCopyBytesLag(2, 250);
assertEquals(500, brokerTopicMetrics.remoteCopyBytesLag)
} else {
assertEquals(None, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName))
}
}
@Test
def testMultipleCopyLagBytesMetrics(): Unit = {
val brokerTopicMetrics = setupBrokerTopicMetrics()
brokerTopicMetrics.recordRemoteCopyBytesLag(0, 1);
brokerTopicMetrics.recordRemoteCopyBytesLag(1, 2);
brokerTopicMetrics.recordRemoteCopyBytesLag(2, 3);
brokerTopicMetrics.recordRemoteCopyBytesLag(0, 4);
brokerTopicMetrics.recordRemoteCopyBytesLag(1, 5);
brokerTopicMetrics.recordRemoteCopyBytesLag(2, 6);
assertEquals(15, brokerTopicMetrics.remoteCopyBytesLag)
}
@Test
def testCopyLagBytesMetricWithPartitionExpansion(): Unit = {
val brokerTopicMetrics = setupBrokerTopicMetrics()
brokerTopicMetrics.recordRemoteCopyBytesLag(0, 1);
brokerTopicMetrics.recordRemoteCopyBytesLag(1, 2);
assertEquals(3, brokerTopicMetrics.remoteCopyBytesLag)
brokerTopicMetrics.recordRemoteCopyBytesLag(2, 3);
assertEquals(6, brokerTopicMetrics.remoteCopyBytesLag)
}
@Test
def testCopyLagBytesMetricWithPartitionShrinking(): Unit = {
val brokerTopicMetrics = setupBrokerTopicMetrics()
brokerTopicMetrics.recordRemoteCopyBytesLag(0, 1);
brokerTopicMetrics.recordRemoteCopyBytesLag(1, 2);
assertEquals(3, brokerTopicMetrics.remoteCopyBytesLag)
brokerTopicMetrics.removeRemoteCopyBytesLag(1);
assertEquals(1, brokerTopicMetrics.remoteCopyBytesLag)
}
@Test
def testCopyLagBytesMetricWithRemovingNonexistentPartitions(): Unit = {
val brokerTopicMetrics = setupBrokerTopicMetrics()
brokerTopicMetrics.recordRemoteCopyBytesLag(0, 1);
brokerTopicMetrics.recordRemoteCopyBytesLag(1, 2);
assertEquals(3, brokerTopicMetrics.remoteCopyBytesLag)
brokerTopicMetrics.removeRemoteCopyBytesLag(3);
assertEquals(3, brokerTopicMetrics.remoteCopyBytesLag)
}
@Test
def testCopyLagBytesMetricClear(): Unit = {
val brokerTopicMetrics = setupBrokerTopicMetrics()
brokerTopicMetrics.recordRemoteCopyBytesLag(0, 1);
brokerTopicMetrics.recordRemoteCopyBytesLag(1, 2);
assertEquals(3, brokerTopicMetrics.remoteCopyBytesLag)
brokerTopicMetrics.close()
assertEquals(0, brokerTopicMetrics.remoteCopyBytesLag)
}
}

core/src/test/scala/unit/kafka/metrics/MetricsTest.scala

@@ -148,6 +148,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
// The broker metrics for all topics should be greedily registered
assertTrue(topicMetrics(None).nonEmpty, "General topic metrics don't exist")
assertEquals(brokers.head.brokerTopicStats.allTopicsStats.metricMap.size, topicMetrics(None).size)
assertEquals(0, brokers.head.brokerTopicStats.allTopicsStats.metricGaugeMap.size)
// topic metrics should be lazily registered
assertTrue(topicMetricGroups(topic).isEmpty, "Topic metrics aren't lazily registered")
TestUtils.generateAndProduceMessages(brokers, topic, nMessages)

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java

@@ -41,6 +41,7 @@ public class RemoteStorageMetrics {
private static final String REMOTE_COPY_REQUESTS_PER_SEC = "RemoteCopyRequestsPerSec";
private static final String FAILED_REMOTE_FETCH_PER_SEC = "RemoteFetchErrorsPerSec";
private static final String FAILED_REMOTE_COPY_PER_SEC = "RemoteCopyErrorsPerSec";
private static final String REMOTE_COPY_LAG_BYTES = "RemoteCopyLagBytes";
private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE = REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT = REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS = Collections.unmodifiableSet(
@@ -58,6 +59,8 @@ public class RemoteStorageMetrics {
"kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_FETCH_PER_SEC);
public final static MetricName FAILED_REMOTE_COPY_PER_SEC_METRIC = getMetricName(
"kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_COPY_PER_SEC);
public final static MetricName REMOTE_COPY_LOG_BYTES_METRIC = getMetricName(
"kafka.server", "BrokerTopicMetrics", REMOTE_COPY_LAG_BYTES);
public final static MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
public final static MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
@@ -67,12 +70,14 @@ public class RemoteStorageMetrics {
public static Set<MetricName> allMetrics() {
Set<MetricName> metrics = new HashSet<>();
metrics.add(REMOTE_COPY_BYTES_PER_SEC_METRIC);
metrics.add(REMOTE_FETCH_BYTES_PER_SEC_METRIC);
metrics.add(REMOTE_FETCH_REQUESTS_PER_SEC_METRIC);
metrics.add(REMOTE_COPY_REQUESTS_PER_SEC_METRIC);
metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
metrics.add(REMOTE_COPY_LOG_BYTES_METRIC);
metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
@@ -82,15 +87,18 @@ public class RemoteStorageMetrics {
public static Set<MetricName> brokerTopicStatsMetrics() {
Set<MetricName> metrics = new HashSet<>();
metrics.add(REMOTE_COPY_BYTES_PER_SEC_METRIC);
metrics.add(REMOTE_FETCH_BYTES_PER_SEC_METRIC);
metrics.add(REMOTE_FETCH_REQUESTS_PER_SEC_METRIC);
metrics.add(REMOTE_COPY_REQUESTS_PER_SEC_METRIC);
metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
metrics.add(REMOTE_COPY_LOG_BYTES_METRIC);
return metrics;
}
private static MetricName getMetricName(String group, String type, String name) {
return KafkaYammerMetrics.getMetricName(group, type, name);
}
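
Once registered, the gauge is exposed under the BrokerTopicMetrics MBean naming used in the integration test above. A minimal sketch of reading it over JMX from within the broker JVM, assuming the standard Yammer JMX reporter and a hypothetical topic name "my-topic":

import java.lang.management.ManagementFactory
import javax.management.ObjectName

// The MBean pattern follows fromNameToBrokerTopicStatsMBean plus the per-topic tag.
val mbeanServer = ManagementFactory.getPlatformMBeanServer
val objectName = new ObjectName(
  "kafka.server:type=BrokerTopicMetrics,name=RemoteCopyLagBytes,topic=my-topic")
// Yammer gauges expose their current value under the "Value" attribute.
val lag = mbeanServer.getAttribute(objectName, "Value").asInstanceOf[Long]
println(s"RemoteCopyLagBytes(my-topic) = $lag bytes")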