diff --git a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala deleted file mode 100644 index 3b4950948b3..00000000000 --- a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.metrics - -import java.nio.file.{Files, Path, Paths} -import org.apache.kafka.common.utils.Time -import org.slf4j.Logger - -import java.nio.charset.StandardCharsets -import scala.jdk.CollectionConverters._ - -/** - * Retrieves Linux /proc/self/io metrics. - */ -class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logger) { - import LinuxIoMetricsCollector._ - private var lastUpdateMs = -1L - private var cachedReadBytes = 0L - private var cachedWriteBytes = 0L - val path: Path = Paths.get(procRoot, "self", "io") - - def readBytes(): Long = this.synchronized { - val curMs = time.milliseconds() - if (curMs != lastUpdateMs) { - updateValues(curMs) - } - cachedReadBytes - } - - def writeBytes(): Long = this.synchronized { - val curMs = time.milliseconds() - if (curMs != lastUpdateMs) { - updateValues(curMs) - } - cachedWriteBytes - } - - /** - * Read /proc/self/io. - * - * Generally, each line in this file contains a prefix followed by a colon and a number. - * - * For example, it might contain this: - * rchar: 4052 - * wchar: 0 - * syscr: 13 - * syscw: 0 - * read_bytes: 0 - * write_bytes: 0 - * cancelled_write_bytes: 0 - */ - private def updateValues(now: Long): Boolean = this.synchronized { - try { - cachedReadBytes = -1 - cachedWriteBytes = -1 - val lines = Files.readAllLines(path, StandardCharsets.UTF_8).asScala - lines.foreach(line => { - if (line.startsWith(READ_BYTES_PREFIX)) { - cachedReadBytes = line.substring(READ_BYTES_PREFIX.length).toLong - } else if (line.startsWith(WRITE_BYTES_PREFIX)) { - cachedWriteBytes = line.substring(WRITE_BYTES_PREFIX.length).toLong - } - }) - lastUpdateMs = now - true - } catch { - case t: Throwable => - logger.warn("Unable to update IO metrics", t) - false - } - } - - def usable(): Boolean = { - if (path.toFile.exists()) { - updateValues(time.milliseconds()) - } else { - logger.debug(s"disabling IO metrics collection because $path does not exist.") - false - } - } -} - -object LinuxIoMetricsCollector { - private val READ_BYTES_PREFIX = "read_bytes: " - private val WRITE_BYTES_PREFIX = "write_bytes: " -} diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 58a033ac638..7c1f694b080 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.metrics.LinuxIoMetricsCollector import kafka.migration.MigrationPropagator import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.KafkaRaftManager @@ -48,7 +47,7 @@ import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG} import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.config.ConfigType -import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} +import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} import org.apache.kafka.server.util.{Deadline, FutureUtils} @@ -154,7 +153,7 @@ class ControllerServer( metricsGroup.newGauge("ClusterId", () => clusterId) metricsGroup.newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size) - linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying) + linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time) if (linuxIoMetricsCollector.usable()) { metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes()) metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes()) diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala index b88a56e378d..9e1ee3d6941 100644 --- a/core/src/main/scala/kafka/server/KafkaBroker.scala +++ b/core/src/main/scala/kafka/server/KafkaBroker.scala @@ -20,7 +20,6 @@ package kafka.server import com.yammer.metrics.core.MetricName import kafka.log.LogManager import kafka.log.remote.RemoteLogManager -import kafka.metrics.LinuxIoMetricsCollector import kafka.network.SocketServer import kafka.utils.Logging import org.apache.kafka.common.ClusterResource @@ -34,7 +33,7 @@ import org.apache.kafka.metadata.BrokerState import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.NodeToControllerChannelManager import org.apache.kafka.server.authorizer.Authorizer -import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} +import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.util.Scheduler import java.time.Duration @@ -115,7 +114,7 @@ trait KafkaBroker extends Logging { metricsGroup.newGauge("ClusterId", () => clusterId) metricsGroup.newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size) - private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying) + private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM) if (linuxIoMetricsCollector.usable()) { metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes()) diff --git a/core/src/test/scala/kafka/metrics/LinuxIoMetricsCollectorTest.scala b/core/src/test/scala/kafka/metrics/LinuxIoMetricsCollectorTest.scala deleted file mode 100644 index 5ef2bb0c580..00000000000 --- a/core/src/test/scala/kafka/metrics/LinuxIoMetricsCollectorTest.scala +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.metrics - -import java.nio.charset.StandardCharsets -import java.nio.file.Files -import kafka.utils.Logging -import org.apache.kafka.server.util.MockTime -import org.apache.kafka.test.TestUtils -import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} -import org.junit.jupiter.api.{Test, Timeout} - -@Timeout(120) -class LinuxIoMetricsCollectorTest extends Logging { - - class TestDirectory { - val baseDir = TestUtils.tempDirectory() - val selfDir = Files.createDirectories(baseDir.toPath.resolve("self")) - - def writeProcFile(readBytes: Long, writeBytes: Long) = { - val bld = new StringBuilder() - bld.append("rchar: 0%n".format()) - bld.append("wchar: 0%n".format()) - bld.append("syschr: 0%n".format()) - bld.append("syscw: 0%n".format()) - bld.append("read_bytes: %d%n".format(readBytes)) - bld.append("write_bytes: %d%n".format(writeBytes)) - bld.append("cancelled_write_bytes: 0%n".format()) - Files.write(selfDir.resolve("io"), bld.toString().getBytes(StandardCharsets.UTF_8)) - } - } - - @Test - def testReadProcFile(): Unit = { - val testDirectory = new TestDirectory() - val time = new MockTime(100, 1000) - testDirectory.writeProcFile(123L, 456L) - val collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath, - time, logger.underlying) - - // Test that we can read the values we wrote. - assertTrue(collector.usable()) - assertEquals(123L, collector.readBytes()) - assertEquals(456L, collector.writeBytes()) - testDirectory.writeProcFile(124L, 457L) - - // The previous values should still be cached. - assertEquals(123L, collector.readBytes()) - assertEquals(456L, collector.writeBytes()) - - // Update the time, and the values should be re-read. - time.sleep(1) - assertEquals(124L, collector.readBytes()) - assertEquals(457L, collector.writeBytes()) - } - - @Test - def testUnableToReadNonexistentProcFile(): Unit = { - val testDirectory = new TestDirectory() - val time = new MockTime(100, 1000) - val collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath, - time, logger.underlying) - - // Test that we can't read the file, since it hasn't been written. - assertFalse(collector.usable()) - } -} diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 7c9d0ac0b40..e2355dfe119 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -34,8 +34,7 @@ import org.apache.kafka.common.metrics.JmxReporter import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.migration.ZkMigrationState import org.apache.kafka.server.config.ServerLogConfigs -import org.apache.kafka.server.metrics.KafkaMetricsGroup -import org.apache.kafka.server.metrics.KafkaYammerMetrics +import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.junit.jupiter.api.Timeout import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -110,7 +109,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { def testLinuxIoMetrics(quorum: String): Unit = { // Check if linux-disk-{read,write}-bytes metrics either do or do not exist depending on whether we are or are not // able to collect those metrics on the platform where this test is running. - val usable = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying).usable() + val usable = new LinuxIoMetricsCollector("/proc", Time.SYSTEM).usable() val expectedCount = if (usable) 1 else 0 val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics Set("linux-disk-read-bytes", "linux-disk-write-bytes").foreach(name => diff --git a/server/src/main/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollector.java b/server/src/main/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollector.java new file mode 100644 index 00000000000..a90c9b4341a --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollector.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +/** + * Retrieves Linux /proc/self/io metrics. + */ +public class LinuxIoMetricsCollector { + + private static final Logger LOG = LoggerFactory.getLogger(LinuxIoMetricsCollector.class); + private static final String READ_BYTES_PREFIX = "read_bytes: "; + private static final String WRITE_BYTES_PREFIX = "write_bytes: "; + + private final Time time; + private final Path path; + + private long lastUpdateMs = -1L; + private long cachedReadBytes = 0L; + private long cachedWriteBytes = 0L; + + public LinuxIoMetricsCollector(String procRoot, Time time) { + this.time = time; + path = Paths.get(procRoot, "self", "io"); + } + + public long readBytes() { + synchronized (this) { + long curMs = time.milliseconds(); + if (curMs != lastUpdateMs) { + updateValues(curMs); + } + return cachedReadBytes; + } + } + + public long writeBytes() { + synchronized (this) { + long curMs = time.milliseconds(); + if (curMs != lastUpdateMs) { + updateValues(curMs); + } + return cachedWriteBytes; + } + } + + /** + * Read /proc/self/io. + * Generally, each line in this file contains a prefix followed by a colon and a number. + * For example, it might contain this: + * rchar: 4052 + * wchar: 0 + * syscr: 13 + * syscw: 0 + * read_bytes: 0 + * write_bytes: 0 + * cancelled_write_bytes: 0 + */ + private boolean updateValues(long now) { + synchronized (this) { + try { + cachedReadBytes = -1L; + cachedWriteBytes = -1L; + List lines = Files.readAllLines(path, StandardCharsets.UTF_8); + for (String line : lines) { + if (line.startsWith(READ_BYTES_PREFIX)) { + cachedReadBytes = Long.parseLong(line.substring(READ_BYTES_PREFIX.length())); + } else if (line.startsWith(WRITE_BYTES_PREFIX)) { + cachedWriteBytes = Long.parseLong(line.substring(WRITE_BYTES_PREFIX.length())); + } + } + lastUpdateMs = now; + return true; + } catch (Throwable t) { + LOG.warn("Unable to update IO metrics", t); + return false; + } + } + } + + public boolean usable() { + if (path.toFile().exists()) { + return updateValues(time.milliseconds()); + } else { + LOG.debug("Disabling IO metrics collection because {} does not exist.", path); + return false; + } + } +} diff --git a/server/src/test/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollectorTest.java b/server/src/test/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollectorTest.java new file mode 100644 index 00000000000..8caa9ab664c --- /dev/null +++ b/server/src/test/java/org/apache/kafka/server/metrics/LinuxIoMetricsCollectorTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Timeout(120) +public class LinuxIoMetricsCollectorTest { + + @Test + public void testReadProcFile() throws IOException { + TestDirectory testDirectory = new TestDirectory(); + Time time = new MockTime(0L, 100L, 1000L); + testDirectory.writeProcFile(123L, 456L); + LinuxIoMetricsCollector collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath(), time); + + // Test that we can read the values we wrote. + assertTrue(collector.usable()); + assertEquals(123L, collector.readBytes()); + assertEquals(456L, collector.writeBytes()); + testDirectory.writeProcFile(124L, 457L); + + // The previous values should still be cached. + assertEquals(123L, collector.readBytes()); + assertEquals(456L, collector.writeBytes()); + + // Update the time, and the values should be re-read. + time.sleep(1); + assertEquals(124L, collector.readBytes()); + assertEquals(457L, collector.writeBytes()); + } + + @Test + public void testUnableToReadNonexistentProcFile() throws IOException { + TestDirectory testDirectory = new TestDirectory(); + Time time = new MockTime(0L, 100L, 1000L); + LinuxIoMetricsCollector collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath(), time); + + // Test that we can't read the file, since it hasn't been written. + assertFalse(collector.usable()); + } + + static class TestDirectory { + + public final File baseDir; + private final Path selfDir; + + TestDirectory() throws IOException { + baseDir = TestUtils.tempDirectory(); + selfDir = Files.createDirectories(baseDir.toPath().resolve("self")); + } + + void writeProcFile(long readBytes, long writeBytes) throws IOException { + String bld = "rchar: 0\n" + + "wchar: 0\n" + + "syschr: 0\n" + + "syscw: 0\n" + + "read_bytes: " + readBytes + "\n" + + "write_bytes: " + writeBytes + "\n" + + "cancelled_write_bytes: 0\n"; + Files.write(selfDir.resolve("io"), bld.getBytes(StandardCharsets.UTF_8)); + } + } +}