mirror of https://github.com/apache/kafka.git
Extract options into main function
This commit is contained in:
parent
bf56000f37
commit
cbb2a9c4c3
|
@ -63,19 +63,22 @@ import joptsimple.OptionSpec;
|
||||||
public class TestLinearWriteSpeed {
|
public class TestLinearWriteSpeed {
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
var option = createOptions(new OptionParser(), args);
|
var parser = new OptionParser();
|
||||||
long bytesToWrite = option.options.valueOf(option.bytesOpt);
|
var option = createOptions(parser);
|
||||||
int bufferSize = option.options.valueOf(option.sizeOpt);
|
OptionSet options = parser.parse(args);
|
||||||
int numFiles = option.options.valueOf(option.filesOpt);
|
CommandLineUtils.checkRequiredArgs(parser, options, option.bytesOpt, option.sizeOpt);
|
||||||
long reportingInterval = option.options.valueOf(option.reportingIntervalOpt);
|
long bytesToWrite = options.valueOf(option.bytesOpt);
|
||||||
String dir = option.options.valueOf(option.dirOpt);
|
int bufferSize = options.valueOf(option.sizeOpt);
|
||||||
long maxThroughputBytes = option.options.valueOf(option.maxThroughputOpt) * 1024L * 1024L;
|
int numFiles = options.valueOf(option.filesOpt);
|
||||||
|
long reportingInterval = options.valueOf(option.reportingIntervalOpt);
|
||||||
|
String dir = options.valueOf(option.dirOpt);
|
||||||
|
long maxThroughputBytes = options.valueOf(option.maxThroughputOpt) * 1024L * 1024L;
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
|
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
|
||||||
int messageSize = option.options.valueOf(option.messageSizeOpt);
|
int messageSize = options.valueOf(option.messageSizeOpt);
|
||||||
long flushInterval = option.options.valueOf(option.flushIntervalOpt);
|
long flushInterval = options.valueOf(option.flushIntervalOpt);
|
||||||
CompressionType compressionType = CompressionType.forName(option.options.valueOf(option.compressionCodecOpt));
|
CompressionType compressionType = CompressionType.forName(options.valueOf(option.compressionCodecOpt));
|
||||||
Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
|
Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
|
||||||
Integer compressionLevel = option.options.valueOf(option.compressionLevelOpt);
|
Integer compressionLevel = options.valueOf(option.compressionLevelOpt);
|
||||||
|
|
||||||
if (compressionLevel != null) setupCompression(compressionType, compressionBuilder, compressionLevel);
|
if (compressionLevel != null) setupCompression(compressionType, compressionBuilder, compressionLevel);
|
||||||
Compression compression = compressionBuilder.build();
|
Compression compression = compressionBuilder.build();
|
||||||
|
@ -95,11 +98,11 @@ public class TestLinearWriteSpeed {
|
||||||
scheduler.startup();
|
scheduler.startup();
|
||||||
|
|
||||||
for (int i = 0; i < numFiles; i++) {
|
for (int i = 0; i < numFiles; i++) {
|
||||||
if (option.options.has(option.mmapOpt)) {
|
if (options.has(option.mmapOpt)) {
|
||||||
writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
|
writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
|
||||||
} else if (option.options.has(option.channelOpt)) {
|
} else if (options.has(option.channelOpt)) {
|
||||||
writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
|
writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
|
||||||
} else if (option.options.has(option.logOpt)) {
|
} else if (options.has(option.logOpt)) {
|
||||||
int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
|
int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
|
||||||
Properties logProperties = new Properties();
|
Properties logProperties = new Properties();
|
||||||
logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
|
logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
|
||||||
|
@ -294,7 +297,6 @@ public class TestLinearWriteSpeed {
|
||||||
private final OptionSpec<Void> channelOpt;
|
private final OptionSpec<Void> channelOpt;
|
||||||
private final OptionSpec<Void> logOpt;
|
private final OptionSpec<Void> logOpt;
|
||||||
private final OptionSpec<Void> mmapOpt;
|
private final OptionSpec<Void> mmapOpt;
|
||||||
private final OptionSet options;
|
|
||||||
|
|
||||||
private Options(
|
private Options(
|
||||||
OptionSpec<String> dirOpt,
|
OptionSpec<String> dirOpt,
|
||||||
|
@ -309,8 +311,7 @@ public class TestLinearWriteSpeed {
|
||||||
OptionSpec<Integer> compressionLevelOpt,
|
OptionSpec<Integer> compressionLevelOpt,
|
||||||
OptionSpec<Void> channelOpt,
|
OptionSpec<Void> channelOpt,
|
||||||
OptionSpec<Void> logOpt,
|
OptionSpec<Void> logOpt,
|
||||||
OptionSpec<Void> mmapOpt,
|
OptionSpec<Void> mmapOpt
|
||||||
OptionSet options
|
|
||||||
) {
|
) {
|
||||||
this.dirOpt = dirOpt;
|
this.dirOpt = dirOpt;
|
||||||
this.bytesOpt = bytesOpt;
|
this.bytesOpt = bytesOpt;
|
||||||
|
@ -325,11 +326,10 @@ public class TestLinearWriteSpeed {
|
||||||
this.channelOpt = channelOpt;
|
this.channelOpt = channelOpt;
|
||||||
this.logOpt = logOpt;
|
this.logOpt = logOpt;
|
||||||
this.mmapOpt = mmapOpt;
|
this.mmapOpt = mmapOpt;
|
||||||
this.options = options;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Options createOptions(OptionParser parser, String[] args) {
|
private static Options createOptions(OptionParser parser) {
|
||||||
OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
|
OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
|
||||||
.withRequiredArg()
|
.withRequiredArg()
|
||||||
.describedAs("path")
|
.describedAs("path")
|
||||||
|
@ -390,8 +390,6 @@ public class TestLinearWriteSpeed {
|
||||||
OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
|
OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
|
||||||
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
|
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
|
||||||
OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to mmap file.");
|
OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to mmap file.");
|
||||||
OptionSet options = parser.parse(args);
|
|
||||||
CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
|
|
||||||
|
|
||||||
return new Options(
|
return new Options(
|
||||||
dirOpt,
|
dirOpt,
|
||||||
|
@ -406,8 +404,7 @@ public class TestLinearWriteSpeed {
|
||||||
compressionLevelOpt,
|
compressionLevelOpt,
|
||||||
channelOpt,
|
channelOpt,
|
||||||
logOpt,
|
logOpt,
|
||||||
mmapOpt,
|
mmapOpt
|
||||||
options
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue