KAFKA-15852: Move LinuxIoMetricsCollector to server module (#16178)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2024-06-04 16:42:35 +02:00 committed by GitHub
parent 16359e70d3
commit 55d38efcc5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 210 additions and 192 deletions

View File

@ -1,101 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.metrics
import java.nio.file.{Files, Path, Paths}
import org.apache.kafka.common.utils.Time
import org.slf4j.Logger
import java.nio.charset.StandardCharsets
import scala.jdk.CollectionConverters._
/**
* Retrieves Linux /proc/self/io metrics.
*/
class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logger) {
import LinuxIoMetricsCollector._
private var lastUpdateMs = -1L
private var cachedReadBytes = 0L
private var cachedWriteBytes = 0L
val path: Path = Paths.get(procRoot, "self", "io")
def readBytes(): Long = this.synchronized {
val curMs = time.milliseconds()
if (curMs != lastUpdateMs) {
updateValues(curMs)
}
cachedReadBytes
}
def writeBytes(): Long = this.synchronized {
val curMs = time.milliseconds()
if (curMs != lastUpdateMs) {
updateValues(curMs)
}
cachedWriteBytes
}
/**
* Read /proc/self/io.
*
* Generally, each line in this file contains a prefix followed by a colon and a number.
*
* For example, it might contain this:
* rchar: 4052
* wchar: 0
* syscr: 13
* syscw: 0
* read_bytes: 0
* write_bytes: 0
* cancelled_write_bytes: 0
*/
private def updateValues(now: Long): Boolean = this.synchronized {
try {
cachedReadBytes = -1
cachedWriteBytes = -1
val lines = Files.readAllLines(path, StandardCharsets.UTF_8).asScala
lines.foreach(line => {
if (line.startsWith(READ_BYTES_PREFIX)) {
cachedReadBytes = line.substring(READ_BYTES_PREFIX.length).toLong
} else if (line.startsWith(WRITE_BYTES_PREFIX)) {
cachedWriteBytes = line.substring(WRITE_BYTES_PREFIX.length).toLong
}
})
lastUpdateMs = now
true
} catch {
case t: Throwable =>
logger.warn("Unable to update IO metrics", t)
false
}
}
def usable(): Boolean = {
if (path.toFile.exists()) {
updateValues(time.milliseconds())
} else {
logger.debug(s"disabling IO metrics collection because $path does not exist.")
false
}
}
}
object LinuxIoMetricsCollector {
private val READ_BYTES_PREFIX = "read_bytes: "
private val WRITE_BYTES_PREFIX = "write_bytes: "
}

View File

@ -17,7 +17,6 @@
package kafka.server
import kafka.metrics.LinuxIoMetricsCollector
import kafka.migration.MigrationPropagator
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
@ -48,7 +47,7 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.util.{Deadline, FutureUtils}
@ -154,7 +153,7 @@ class ControllerServer(
metricsGroup.newGauge("ClusterId", () => clusterId)
metricsGroup.newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying)
linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time)
if (linuxIoMetricsCollector.usable()) {
metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())

View File

@ -20,7 +20,6 @@ package kafka.server
import com.yammer.metrics.core.MetricName
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.metrics.LinuxIoMetricsCollector
import kafka.network.SocketServer
import kafka.utils.Logging
import org.apache.kafka.common.ClusterResource
@ -34,7 +33,7 @@ import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.util.Scheduler
import java.time.Duration
@ -115,7 +114,7 @@ trait KafkaBroker extends Logging {
metricsGroup.newGauge("ClusterId", () => clusterId)
metricsGroup.newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying)
private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM)
if (linuxIoMetricsCollector.usable()) {
metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())

View File

@ -1,82 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.metrics
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import kafka.utils.Logging
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{Test, Timeout}
@Timeout(120)
class LinuxIoMetricsCollectorTest extends Logging {
class TestDirectory {
val baseDir = TestUtils.tempDirectory()
val selfDir = Files.createDirectories(baseDir.toPath.resolve("self"))
def writeProcFile(readBytes: Long, writeBytes: Long) = {
val bld = new StringBuilder()
bld.append("rchar: 0%n".format())
bld.append("wchar: 0%n".format())
bld.append("syschr: 0%n".format())
bld.append("syscw: 0%n".format())
bld.append("read_bytes: %d%n".format(readBytes))
bld.append("write_bytes: %d%n".format(writeBytes))
bld.append("cancelled_write_bytes: 0%n".format())
Files.write(selfDir.resolve("io"), bld.toString().getBytes(StandardCharsets.UTF_8))
}
}
@Test
def testReadProcFile(): Unit = {
val testDirectory = new TestDirectory()
val time = new MockTime(100, 1000)
testDirectory.writeProcFile(123L, 456L)
val collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath,
time, logger.underlying)
// Test that we can read the values we wrote.
assertTrue(collector.usable())
assertEquals(123L, collector.readBytes())
assertEquals(456L, collector.writeBytes())
testDirectory.writeProcFile(124L, 457L)
// The previous values should still be cached.
assertEquals(123L, collector.readBytes())
assertEquals(456L, collector.writeBytes())
// Update the time, and the values should be re-read.
time.sleep(1)
assertEquals(124L, collector.readBytes())
assertEquals(457L, collector.writeBytes())
}
@Test
def testUnableToReadNonexistentProcFile(): Unit = {
val testDirectory = new TestDirectory()
val time = new MockTime(100, 1000)
val collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath,
time, logger.underlying)
// Test that we can't read the file, since it hasn't been written.
assertFalse(collector.usable())
}
}

View File

@ -34,8 +34,7 @@ import org.apache.kafka.common.metrics.JmxReporter
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@ -110,7 +109,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
def testLinuxIoMetrics(quorum: String): Unit = {
// Check if linux-disk-{read,write}-bytes metrics either do or do not exist depending on whether we are or are not
// able to collect those metrics on the platform where this test is running.
val usable = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying).usable()
val usable = new LinuxIoMetricsCollector("/proc", Time.SYSTEM).usable()
val expectedCount = if (usable) 1 else 0
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
Set("linux-disk-read-bytes", "linux-disk-write-bytes").foreach(name =>

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.metrics;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
/**
* Retrieves Linux /proc/self/io metrics.
*/
public class LinuxIoMetricsCollector {
private static final Logger LOG = LoggerFactory.getLogger(LinuxIoMetricsCollector.class);
private static final String READ_BYTES_PREFIX = "read_bytes: ";
private static final String WRITE_BYTES_PREFIX = "write_bytes: ";
private final Time time;
private final Path path;
private long lastUpdateMs = -1L;
private long cachedReadBytes = 0L;
private long cachedWriteBytes = 0L;
public LinuxIoMetricsCollector(String procRoot, Time time) {
this.time = time;
path = Paths.get(procRoot, "self", "io");
}
public long readBytes() {
synchronized (this) {
long curMs = time.milliseconds();
if (curMs != lastUpdateMs) {
updateValues(curMs);
}
return cachedReadBytes;
}
}
public long writeBytes() {
synchronized (this) {
long curMs = time.milliseconds();
if (curMs != lastUpdateMs) {
updateValues(curMs);
}
return cachedWriteBytes;
}
}
/**
* Read /proc/self/io.
* Generally, each line in this file contains a prefix followed by a colon and a number.
* For example, it might contain this:
* rchar: 4052
* wchar: 0
* syscr: 13
* syscw: 0
* read_bytes: 0
* write_bytes: 0
* cancelled_write_bytes: 0
*/
private boolean updateValues(long now) {
synchronized (this) {
try {
cachedReadBytes = -1L;
cachedWriteBytes = -1L;
List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
for (String line : lines) {
if (line.startsWith(READ_BYTES_PREFIX)) {
cachedReadBytes = Long.parseLong(line.substring(READ_BYTES_PREFIX.length()));
} else if (line.startsWith(WRITE_BYTES_PREFIX)) {
cachedWriteBytes = Long.parseLong(line.substring(WRITE_BYTES_PREFIX.length()));
}
}
lastUpdateMs = now;
return true;
} catch (Throwable t) {
LOG.warn("Unable to update IO metrics", t);
return false;
}
}
}
public boolean usable() {
if (path.toFile().exists()) {
return updateValues(time.milliseconds());
} else {
LOG.debug("Disabling IO metrics collection because {} does not exist.", path);
return false;
}
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(120)
public class LinuxIoMetricsCollectorTest {
@Test
public void testReadProcFile() throws IOException {
TestDirectory testDirectory = new TestDirectory();
Time time = new MockTime(0L, 100L, 1000L);
testDirectory.writeProcFile(123L, 456L);
LinuxIoMetricsCollector collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath(), time);
// Test that we can read the values we wrote.
assertTrue(collector.usable());
assertEquals(123L, collector.readBytes());
assertEquals(456L, collector.writeBytes());
testDirectory.writeProcFile(124L, 457L);
// The previous values should still be cached.
assertEquals(123L, collector.readBytes());
assertEquals(456L, collector.writeBytes());
// Update the time, and the values should be re-read.
time.sleep(1);
assertEquals(124L, collector.readBytes());
assertEquals(457L, collector.writeBytes());
}
@Test
public void testUnableToReadNonexistentProcFile() throws IOException {
TestDirectory testDirectory = new TestDirectory();
Time time = new MockTime(0L, 100L, 1000L);
LinuxIoMetricsCollector collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath(), time);
// Test that we can't read the file, since it hasn't been written.
assertFalse(collector.usable());
}
static class TestDirectory {
public final File baseDir;
private final Path selfDir;
TestDirectory() throws IOException {
baseDir = TestUtils.tempDirectory();
selfDir = Files.createDirectories(baseDir.toPath().resolve("self"));
}
void writeProcFile(long readBytes, long writeBytes) throws IOException {
String bld = "rchar: 0\n" +
"wchar: 0\n" +
"syschr: 0\n" +
"syscw: 0\n" +
"read_bytes: " + readBytes + "\n" +
"write_bytes: " + writeBytes + "\n" +
"cancelled_write_bytes: 0\n";
Files.write(selfDir.resolve("io"), bld.getBytes(StandardCharsets.UTF_8));
}
}
}