Refactor main function of TestLinearWriteSpeed

This commit is contained in:
Chang-Chi Hsu 2025-09-12 13:34:44 +02:00 committed by Ubuntu
parent 423330ebe7
commit bf56000f37
1 changed files with 153 additions and 81 deletions

View File

@ -63,83 +63,19 @@ import joptsimple.OptionSpec;
public class TestLinearWriteSpeed {
public static void main(String[] args) throws Exception {
OptionParser parser = new OptionParser();
OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
.withRequiredArg()
.describedAs("path")
.ofType(String.class)
.defaultsTo(System.getProperty("java.io.tmpdir"));
OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Long.class);
OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Integer.class);
OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Integer.class)
.defaultsTo(1024);
OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
.withRequiredArg()
.describedAs("num_files")
.ofType(Integer.class)
.defaultsTo(1);
OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
.withRequiredArg()
.describedAs("ms")
.ofType(Long.class)
.defaultsTo(1000L);
OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
.withRequiredArg()
.describedAs("mb")
.ofType(Integer.class)
.defaultsTo(Integer.MAX_VALUE);
OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
.withRequiredArg()
.describedAs("message_count")
.ofType(Long.class)
.defaultsTo(Long.MAX_VALUE);
OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
.withRequiredArg()
.describedAs("codec")
.ofType(String.class)
.defaultsTo(CompressionType.NONE.name);
OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
.withRequiredArg()
.describedAs("level")
.ofType(Integer.class);
OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
OptionSet options = parser.parse(args);
CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
long bytesToWrite = options.valueOf(bytesOpt);
int bufferSize = options.valueOf(sizeOpt);
int numFiles = options.valueOf(filesOpt);
long reportingInterval = options.valueOf(reportingIntervalOpt);
String dir = options.valueOf(dirOpt);
long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
var option = createOptions(new OptionParser(), args);
long bytesToWrite = option.options.valueOf(option.bytesOpt);
int bufferSize = option.options.valueOf(option.sizeOpt);
int numFiles = option.options.valueOf(option.filesOpt);
long reportingInterval = option.options.valueOf(option.reportingIntervalOpt);
String dir = option.options.valueOf(option.dirOpt);
long maxThroughputBytes = option.options.valueOf(option.maxThroughputOpt) * 1024L * 1024L;
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
int messageSize = options.valueOf(messageSizeOpt);
long flushInterval = options.valueOf(flushIntervalOpt);
CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
int messageSize = option.options.valueOf(option.messageSizeOpt);
long flushInterval = option.options.valueOf(option.flushIntervalOpt);
CompressionType compressionType = CompressionType.forName(option.options.valueOf(option.compressionCodecOpt));
Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
Integer compressionLevel = options.valueOf(compressionLevelOpt);
Integer compressionLevel = option.options.valueOf(option.compressionLevelOpt);
if (compressionLevel != null) setupCompression(compressionType, compressionBuilder, compressionLevel);
Compression compression = compressionBuilder.build();
@ -159,17 +95,17 @@ public class TestLinearWriteSpeed {
scheduler.startup();
for (int i = 0; i < numFiles; i++) {
if (options.has(mmapOpt)) {
if (option.options.has(option.mmapOpt)) {
writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
} else if (options.has(channelOpt)) {
} else if (option.options.has(option.channelOpt)) {
writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
} else if (options.has(logOpt)) {
} else if (option.options.has(option.logOpt)) {
int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
Properties logProperties = new Properties();
logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
LogConfig logConfig = new LogConfig(logProperties);
writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet, compression, recordsList);
} else {
System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
Exit.exit(1);
@ -298,9 +234,13 @@ public class TestLinearWriteSpeed {
static class LogWritable implements Writable {
MemoryRecords messages;
UnifiedLog log;
Compression compression;
List<SimpleRecord> recordsList;
public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages) throws IOException {
public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages, Compression compression, List<SimpleRecord> recordsList) throws IOException {
this.messages = messages;
this.compression = compression;
this.recordsList = recordsList;
Utils.delete(dir);
this.log = UnifiedLog.create(
dir,
@ -323,6 +263,7 @@ public class TestLinearWriteSpeed {
}
public int write() {
this.messages = MemoryRecords.withRecords(compression, recordsList.toArray(new SimpleRecord[0]));
log.appendAsLeader(
messages,
0,
@ -338,4 +279,135 @@ public class TestLinearWriteSpeed {
Utils.delete(log.dir());
}
}
}
private static class Options {
private final OptionSpec<String> dirOpt;
private final OptionSpec<Long> bytesOpt;
private final OptionSpec<Integer> sizeOpt;
private final OptionSpec<Integer> messageSizeOpt;
private final OptionSpec<Integer> filesOpt;
private final OptionSpec<Long> reportingIntervalOpt;
private final OptionSpec<Integer> maxThroughputOpt;
private final OptionSpec<Long> flushIntervalOpt;
private final OptionSpec<String> compressionCodecOpt;
private final OptionSpec<Integer> compressionLevelOpt;
private final OptionSpec<Void> channelOpt;
private final OptionSpec<Void> logOpt;
private final OptionSpec<Void> mmapOpt;
private final OptionSet options;
private Options(
OptionSpec<String> dirOpt,
OptionSpec<Long> bytesOpt,
OptionSpec<Integer> sizeOpt,
OptionSpec<Integer> messageSizeOpt,
OptionSpec<Integer> filesOpt,
OptionSpec<Long> reportingIntervalOpt,
OptionSpec<Integer> maxThroughputOpt,
OptionSpec<Long> flushIntervalOpt,
OptionSpec<String> compressionCodecOpt,
OptionSpec<Integer> compressionLevelOpt,
OptionSpec<Void> channelOpt,
OptionSpec<Void> logOpt,
OptionSpec<Void> mmapOpt,
OptionSet options
) {
this.dirOpt = dirOpt;
this.bytesOpt = bytesOpt;
this.sizeOpt = sizeOpt;
this.messageSizeOpt = messageSizeOpt;
this.filesOpt = filesOpt;
this.reportingIntervalOpt = reportingIntervalOpt;
this.maxThroughputOpt = maxThroughputOpt;
this.flushIntervalOpt = flushIntervalOpt;
this.compressionCodecOpt = compressionCodecOpt;
this.compressionLevelOpt = compressionLevelOpt;
this.channelOpt = channelOpt;
this.logOpt = logOpt;
this.mmapOpt = mmapOpt;
this.options = options;
}
}
private static Options createOptions(OptionParser parser, String[] args) {
OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
.withRequiredArg()
.describedAs("path")
.ofType(String.class)
.defaultsTo(System.getProperty("java.io.tmpdir"));
OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Long.class);
OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Integer.class);
OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Integer.class)
.defaultsTo(1024);
OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
.withRequiredArg()
.describedAs("num_files")
.ofType(Integer.class)
.defaultsTo(1);
OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
.withRequiredArg()
.describedAs("ms")
.ofType(Long.class)
.defaultsTo(1000L);
OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
.withRequiredArg()
.describedAs("mb")
.ofType(Integer.class)
.defaultsTo(Integer.MAX_VALUE);
OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
.withRequiredArg()
.describedAs("message_count")
.ofType(Long.class)
.defaultsTo(Long.MAX_VALUE);
OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
.withRequiredArg()
.describedAs("codec")
.ofType(String.class)
.defaultsTo(CompressionType.NONE.name);
OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
.withRequiredArg()
.describedAs("level")
.ofType(Integer.class);
OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to mmap file.");
OptionSet options = parser.parse(args);
CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
return new Options(
dirOpt,
bytesOpt,
sizeOpt,
messageSizeOpt,
filesOpt,
reportingIntervalOpt,
maxThroughputOpt,
flushIntervalOpt,
compressionCodecOpt,
compressionLevelOpt,
channelOpt,
logOpt,
mmapOpt,
options
);
}
}