KAFKA-19618 the `record-size` and `throughput`arguments don't work in TestRaftServer (#20379)

The `record-size` and `throughput` arguments don’t work in
`TestRaftServer`. The `recordsPerSec` and `recordSize` values are always
hard-coded.

- Fix `recordsPerSec` and `recordSize` values hard-coded issue
- Add "Required" description to command-line options to make it clear to
users.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Jhen-Yung Hsu 2025-08-22 01:43:52 +08:00 committed by GitHub
parent 0202721b4c
commit eeb6a0d981
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 8 additions and 12 deletions

View File

@ -25,7 +25,6 @@ import kafka.network.SocketServer
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager} import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager}
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool} import kafka.server.{KafkaConfig, KafkaRequestHandlerPool}
import kafka.utils.{CoreUtils, Logging} import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
@ -115,8 +114,8 @@ class TestRaftServer(
workloadGenerator = new RaftWorkloadGenerator( workloadGenerator = new RaftWorkloadGenerator(
raftManager, raftManager,
time, time,
recordsPerSec = 20000, recordsPerSec = throughput,
recordSize = 256 recordSize = recordSize
) )
val requestHandler = new TestRaftRequestHandler( val requestHandler = new TestRaftRequestHandler(
@ -428,7 +427,7 @@ object TestRaftServer extends Logging {
} }
private class TestRaftServerOptions(args: Array[String]) extends CommandDefaultOptions(args) { private class TestRaftServerOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val configOpt: OptionSpec[String] = parser.accepts("config", "Required configured file") val configOpt: OptionSpec[String] = parser.accepts("config", "REQUIRED: The configured file")
.withRequiredArg .withRequiredArg
.describedAs("filename") .describedAs("filename")
.ofType(classOf[String]) .ofType(classOf[String])
@ -446,12 +445,14 @@ object TestRaftServer extends Logging {
.ofType(classOf[Int]) .ofType(classOf[Int])
.defaultsTo(256) .defaultsTo(256)
val directoryId: OptionSpec[String] = parser.accepts("replica-directory-id", "The directory id of the replica") val directoryId: OptionSpec[String] = parser.accepts("replica-directory-id", "REQUIRED: The directory id of the replica")
.withRequiredArg .withRequiredArg
.describedAs("directory id") .describedAs("directory id")
.ofType(classOf[String]) .ofType(classOf[String])
options = parser.parse(args : _*) options = parser.parse(args : _*)
def checkArgs(): Unit = CommandLineUtils.checkRequiredArgs(parser, options, configOpt, directoryId)
} }
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
@ -459,16 +460,11 @@ object TestRaftServer extends Logging {
try { try {
CommandLineUtils.maybePrintHelpOrVersion(opts, CommandLineUtils.maybePrintHelpOrVersion(opts,
"Standalone raft server for performance testing") "Standalone raft server for performance testing")
opts.checkArgs()
val configFile = opts.options.valueOf(opts.configOpt) val configFile = opts.options.valueOf(opts.configOpt)
if (configFile == null) {
throw new InvalidConfigurationException("Missing configuration file. Should specify with '--config'")
}
val directoryIdAsString = opts.options.valueOf(opts.directoryId) val directoryIdAsString = opts.options.valueOf(opts.directoryId)
if (directoryIdAsString == null) {
throw new InvalidConfigurationException("Missing replica directory id. Should specify with --replica-directory-id")
}
val serverProps = Utils.loadProps(configFile) val serverProps = Utils.loadProps(configFile)
// KafkaConfig requires either `process.roles` or `zookeeper.connect`. Neither are // KafkaConfig requires either `process.roles` or `zookeeper.connect`. Neither are