mirror of https://github.com/apache/kafka.git
Merge 1940c321de into 4a5aa37169
commit 23afdb4807
@@ -63,83 +63,22 @@ import joptsimple.OptionSpec;
 public class TestLinearWriteSpeed {
 
     public static void main(String[] args) throws Exception {
-        OptionParser parser = new OptionParser();
-
-        OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
-            .withRequiredArg()
-            .describedAs("path")
-            .ofType(String.class)
-            .defaultsTo(System.getProperty("java.io.tmpdir"));
-
-        OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
-            .withRequiredArg()
-            .describedAs("num_bytes")
-            .ofType(Long.class);
-
-        OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
-            .withRequiredArg()
-            .describedAs("num_bytes")
-            .ofType(Integer.class);
-
-        OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
-            .withRequiredArg()
-            .describedAs("num_bytes")
-            .ofType(Integer.class)
-            .defaultsTo(1024);
-
-        OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
-            .withRequiredArg()
-            .describedAs("num_files")
-            .ofType(Integer.class)
-            .defaultsTo(1);
-
-        OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
-            .withRequiredArg()
-            .describedAs("ms")
-            .ofType(Long.class)
-            .defaultsTo(1000L);
-
-        OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
-            .withRequiredArg()
-            .describedAs("mb")
-            .ofType(Integer.class)
-            .defaultsTo(Integer.MAX_VALUE);
-
-        OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
-            .withRequiredArg()
-            .describedAs("message_count")
-            .ofType(Long.class)
-            .defaultsTo(Long.MAX_VALUE);
-
-        OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
-            .withRequiredArg()
-            .describedAs("codec")
-            .ofType(String.class)
-            .defaultsTo(CompressionType.NONE.name);
-
-        OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
-            .withRequiredArg()
-            .describedAs("level")
-            .ofType(Integer.class);
-
-        OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
-        OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
-        OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
+        var parser = new OptionParser();
+        var option = createOptions(parser);
         OptionSet options = parser.parse(args);
-        CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
-
-        long bytesToWrite = options.valueOf(bytesOpt);
-        int bufferSize = options.valueOf(sizeOpt);
-        int numFiles = options.valueOf(filesOpt);
-        long reportingInterval = options.valueOf(reportingIntervalOpt);
-        String dir = options.valueOf(dirOpt);
-        long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
+        CommandLineUtils.checkRequiredArgs(parser, options, option.bytesOpt(), option.sizeOpt());
+        long bytesToWrite = options.valueOf(option.bytesOpt());
+        int bufferSize = options.valueOf(option.sizeOpt());
+        int numFiles = options.valueOf(option.filesOpt());
+        long reportingInterval = options.valueOf(option.reportingIntervalOpt());
+        String dir = options.valueOf(option.dirOpt());
+        long maxThroughputBytes = options.valueOf(option.maxThroughputOpt()) * 1024L * 1024L;
         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        int messageSize = options.valueOf(messageSizeOpt);
-        long flushInterval = options.valueOf(flushIntervalOpt);
-        CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
+        int messageSize = options.valueOf(option.messageSizeOpt());
+        long flushInterval = options.valueOf(option.flushIntervalOpt());
+        CompressionType compressionType = CompressionType.forName(options.valueOf(option.compressionCodecOpt()));
         Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
-        Integer compressionLevel = options.valueOf(compressionLevelOpt);
+        Integer compressionLevel = options.valueOf(option.compressionLevelOpt());
 
         if (compressionLevel != null) setupCompression(compressionType, compressionBuilder, compressionLevel);
         Compression compression = compressionBuilder.build();
@@ -159,17 +98,17 @@ public class TestLinearWriteSpeed {
         scheduler.startup();
 
         for (int i = 0; i < numFiles; i++) {
-            if (options.has(mmapOpt)) {
+            if (options.has(option.mmapOpt())) {
                 writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
-            } else if (options.has(channelOpt)) {
+            } else if (options.has(option.channelOpt())) {
                 writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
-            } else if (options.has(logOpt)) {
+            } else if (options.has(option.logOpt())) {
                 int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
                 Properties logProperties = new Properties();
                 logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
                 logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
                 LogConfig logConfig = new LogConfig(logProperties);
-                writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
+                writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet, compression, recordsList);
             } else {
                 System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
                 Exit.exit(1);
@@ -298,9 +237,13 @@ public class TestLinearWriteSpeed {
     static class LogWritable implements Writable {
         MemoryRecords messages;
         UnifiedLog log;
+        Compression compression;
+        SimpleRecord[] records;
 
-        public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages) throws IOException {
+        public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages, Compression compression, List<SimpleRecord> recordsList) throws IOException {
             this.messages = messages;
+            this.compression = compression;
+            this.records = recordsList.toArray(new SimpleRecord[0]);
             Utils.delete(dir);
             this.log = UnifiedLog.create(
                 dir,
@@ -323,6 +266,7 @@ public class TestLinearWriteSpeed {
         }
 
         public int write() {
+            this.messages = MemoryRecords.withRecords(compression, records);
             log.appendAsLeader(
                 messages,
                 0,
@@ -338,4 +282,91 @@ public class TestLinearWriteSpeed {
             Utils.delete(log.dir());
         }
     }
+
+    private record Options(OptionSpec<String> dirOpt, OptionSpec<Long> bytesOpt, OptionSpec<Integer> sizeOpt,
+                           OptionSpec<Integer> messageSizeOpt, OptionSpec<Integer> filesOpt,
+                           OptionSpec<Long> reportingIntervalOpt, OptionSpec<Integer> maxThroughputOpt,
+                           OptionSpec<Long> flushIntervalOpt, OptionSpec<String> compressionCodecOpt,
+                           OptionSpec<Integer> compressionLevelOpt, OptionSpec<Void> channelOpt,
+                           OptionSpec<Void> logOpt, OptionSpec<Void> mmapOpt) {
+    }
+
+    private static Options createOptions(OptionParser parser) {
+        OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
+            .withRequiredArg()
+            .describedAs("path")
+            .ofType(String.class)
+            .defaultsTo(System.getProperty("java.io.tmpdir"));
+
+        OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Long.class);
+
+        OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Integer.class);
+
+        OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Integer.class)
+            .defaultsTo(1024);
+
+        OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
+            .withRequiredArg()
+            .describedAs("num_files")
+            .ofType(Integer.class)
+            .defaultsTo(1);
+
+        OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
+            .withRequiredArg()
+            .describedAs("ms")
+            .ofType(Long.class)
+            .defaultsTo(1000L);
+
+        OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
+            .withRequiredArg()
+            .describedAs("mb")
+            .ofType(Integer.class)
+            .defaultsTo(Integer.MAX_VALUE);
+
+        OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
+            .withRequiredArg()
+            .describedAs("message_count")
+            .ofType(Long.class)
+            .defaultsTo(Long.MAX_VALUE);
+
+        OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
+            .withRequiredArg()
+            .describedAs("codec")
+            .ofType(String.class)
+            .defaultsTo(CompressionType.NONE.name);
+
+        OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
+            .withRequiredArg()
+            .describedAs("level")
+            .ofType(Integer.class);
+
+        OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
+        OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
+        OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to mmap file.");
+
+        return new Options(
+            dirOpt,
+            bytesOpt,
+            sizeOpt,
+            messageSizeOpt,
+            filesOpt,
+            reportingIntervalOpt,
+            maxThroughputOpt,
+            flushIntervalOpt,
+            compressionCodecOpt,
+            compressionLevelOpt,
+            channelOpt,
+            logOpt,
+            mmapOpt
+        );
+    }
 }
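
For readers skimming the diff: the refactor moves every joptsimple OptionSpec into a private Options record built once by createOptions(parser), so main only reads values back through that record. Below is a minimal, self-contained sketch of the same pattern, with hypothetical class and option names (not the Kafka tool itself), assuming only that jopt-simple is on the classpath.

import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

public class OptionRecordSketch {

    // Hypothetical, trimmed-down analogue of the Options record introduced in this commit.
    private record Opts(OptionSpec<Long> bytesOpt, OptionSpec<Integer> sizeOpt) { }

    // Declare all option specs in one place and hand them back as a record.
    private static Opts createOpts(OptionParser parser) {
        OptionSpec<Long> bytesOpt = parser.accepts("bytes", "Total bytes to write.")
            .withRequiredArg()
            .ofType(Long.class)
            .defaultsTo(1_048_576L);
        OptionSpec<Integer> sizeOpt = parser.accepts("size", "Size of each write.")
            .withRequiredArg()
            .ofType(Integer.class)
            .defaultsTo(4096);
        return new Opts(bytesOpt, sizeOpt);
    }

    public static void main(String[] args) {
        OptionParser parser = new OptionParser();
        Opts opts = createOpts(parser);
        OptionSet options = parser.parse(args);
        // Record components are read back through their generated accessors.
        long bytes = options.valueOf(opts.bytesOpt());
        int size = options.valueOf(opts.sizeOpt());
        System.out.println(bytes + " bytes in chunks of " + size);
    }
}

Running it with, say, --bytes 1048576 --size 8192 prints the parsed values; the benefit mirrored from the commit is that main no longer carries a dozen OptionSpec locals.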