diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index a47b9fd4d47..2b3183be80b 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -25,7 +25,6 @@ import kafka.network.SocketServer import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager} import kafka.server.{KafkaConfig, KafkaRequestHandlerPool} import kafka.utils.{CoreUtils, Logging} -import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing @@ -115,8 +114,8 @@ class TestRaftServer( workloadGenerator = new RaftWorkloadGenerator( raftManager, time, - recordsPerSec = 20000, - recordSize = 256 + recordsPerSec = throughput, + recordSize = recordSize ) val requestHandler = new TestRaftRequestHandler( @@ -428,7 +427,7 @@ object TestRaftServer extends Logging { } private class TestRaftServerOptions(args: Array[String]) extends CommandDefaultOptions(args) { - val configOpt: OptionSpec[String] = parser.accepts("config", "Required configured file") + val configOpt: OptionSpec[String] = parser.accepts("config", "REQUIRED: The configured file") .withRequiredArg .describedAs("filename") .ofType(classOf[String]) @@ -446,12 +445,14 @@ object TestRaftServer extends Logging { .ofType(classOf[Int]) .defaultsTo(256) - val directoryId: OptionSpec[String] = parser.accepts("replica-directory-id", "The directory id of the replica") + val directoryId: OptionSpec[String] = parser.accepts("replica-directory-id", "REQUIRED: The directory id of the replica") .withRequiredArg .describedAs("directory id") .ofType(classOf[String]) options = parser.parse(args : _*) + + def checkArgs(): Unit = CommandLineUtils.checkRequiredArgs(parser, options, configOpt, directoryId) } def main(args: Array[String]): Unit = { @@ -459,16 +460,11 @@ object TestRaftServer extends Logging { try { CommandLineUtils.maybePrintHelpOrVersion(opts, "Standalone raft server for performance testing") + opts.checkArgs() val configFile = opts.options.valueOf(opts.configOpt) - if (configFile == null) { - throw new InvalidConfigurationException("Missing configuration file. Should specify with '--config'") - } - val directoryIdAsString = opts.options.valueOf(opts.directoryId) - if (directoryIdAsString == null) { - throw new InvalidConfigurationException("Missing replica directory id. Should specify with --replica-directory-id") - } + val serverProps = Utils.loadProps(configFile) // KafkaConfig requires either `process.roles` or `zookeeper.connect`. Neither are