KAFKA-1253 Compression in the new producer; reviewed by Jay Kreps and Jun Rao

Guozhang Wang 2014-03-26 21:48:55 -07:00 committed by Neha Narkhede
parent 455c490f63
commit 466a83b78c
19 changed files with 366 additions and 177 deletions

View File

@@ -151,7 +151,6 @@ project(':core') {
     compile 'com.101tec:zkclient:0.3'
     compile 'com.yammer.metrics:metrics-core:2.2.0'
     compile 'net.sf.jopt-simple:jopt-simple:3.2'
-    compile 'org.xerial.snappy:snappy-java:1.0.5'
     testCompile 'junit:junit:4.1'
     testCompile 'org.easymock:easymock:3.0'
@@ -317,6 +316,8 @@ project(':clients') {
   dependencies {
     compile "org.slf4j:slf4j-api:1.7.6"
+    compile 'org.xerial.snappy:snappy-java:1.0.5'
     testCompile 'com.novocode:junit-interface:0.9'
     testRuntime "$slf4jlog4j"
   }

View File

@@ -69,6 +69,7 @@ public class KafkaProducer implements Producer {
     private final Sender sender;
     private final Metrics metrics;
     private final Thread ioThread;
+    private final CompressionType compressionType;

     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -99,6 +100,7 @@ public class KafkaProducer implements Producer {
                                      config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG));
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
+        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
                                                  this.totalMemorySize,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
@@ -224,7 +226,7 @@ public class KafkaProducer implements Producer {
             ensureValidSize(record.key(), record.value());
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
+            FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
             this.sender.wakeup();
             return future;
             // For API exceptions return them in the future;
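
With this change the producer reads its codec from configuration instead of hard-coding CompressionType.NONE, so every batch handed to the accumulator is built with that codec. A minimal sketch of enabling it from client code (the broker address below is illustrative, not part of this patch):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class CompressedProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092");   // illustrative address
            props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");      // "none", "gzip" or "snappy"
            KafkaProducer producer = new KafkaProducer(props);
            // all batches produced by this instance are now compressed with snappy
            producer.close();
        }
    }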

View File

@@ -135,6 +135,11 @@ public class ProducerConfig extends AbstractConfig {
      */
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";

+    /**
+     * The compression type for all data generated. The default is none (i.e. no compression)
+     */
+    public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
+
     /**
      * Should we register the Kafka metrics as JMX mbeans?
      */
@@ -158,9 +163,10 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah")
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
                                 .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
-                                .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
                                 .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "")
-                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah");
+                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah")
+                                .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah")
+                                .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "");
     }

     ProducerConfig(Map<? extends Object, ? extends Object> props) {

View File

@@ -71,7 +71,7 @@ public final class BufferPool {
      * @param size The buffer size to allocate in bytes
      * @return The buffer
      * @throws InterruptedException If the thread is interrupted while blocked
-     * @throws IllegalArgument if size is larger than the total memory controlled by the pool (and hence we would block
+     * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
      *         forever)
      * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool
      */
@@ -167,28 +167,31 @@ public final class BufferPool {
      * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
      * memory as free.
      *
-     * @param buffers The buffers to return
+     * @param buffer The buffer to return
+     * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity
+     *             since the buffer may re-allocate itself during in-place compression
      */
-    public void deallocate(ByteBuffer... buffers) {
+    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
-            for (int i = 0; i < buffers.length; i++) {
-                int size = buffers[i].capacity();
-                if (size == this.poolableSize) {
-                    buffers[i].clear();
-                    this.free.add(buffers[i]);
-                } else {
-                    this.availableMemory += size;
-                }
-                Condition moreMem = this.waiters.peekFirst();
-                if (moreMem != null)
-                    moreMem.signal();
+            if (size == this.poolableSize && size == buffer.capacity()) {
+                buffer.clear();
+                this.free.add(buffer);
+            } else {
+                this.availableMemory += size;
            }
+            Condition moreMem = this.waiters.peekFirst();
+            if (moreMem != null)
+                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }

+    public void deallocate(ByteBuffer buffer) {
+        deallocate(buffer, buffer.capacity());
+    }
+
    /**
     * the total free memory both unallocated and in the free list
     */
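
The extra size argument matters because in-place compression may swap a larger backing buffer in behind the one the pool handed out, so only the originally accounted size should be credited back. A rough usage sketch under that assumption (the 16 KB figure and helper method are illustrative, not from this patch):

    import java.nio.ByteBuffer;

    public class BufferReturnSketch {
        // hypothetical helper: allocate a poolable-sized buffer, let the compressor grow it, return it
        static void roundTrip(BufferPool pool) throws InterruptedException {
            ByteBuffer buffer = pool.allocate(16 * 1024);
            // ... records are appended; the compressor may re-allocate a larger buffer internally ...
            // credit back only the size charged against the pool, not buffer.capacity()
            pool.deallocate(buffer, 16 * 1024);
        }
    }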

View File

@@ -28,8 +28,8 @@ import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -91,26 +91,26 @@ public final class RecordAccumulator {
     private void registerMetrics(Metrics metrics) {
         metrics.addMetric("blocked_threads",
                           "The number of user threads blocked waiting for buffer memory to enqueue their records",
                           new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.queued();
                               }
                           });
         metrics.addMetric("buffer_total_bytes",
                           "The total amount of buffer memory that is available (not currently used for buffering records).",
                           new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.totalMemory();
                               }
                           });
         metrics.addMetric("buffer_available_bytes",
                           "The total amount of buffer memory that is available (not currently used for buffering records).",
                           new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.availableMemory();
                               }
                           });
     }

     /**
@@ -132,7 +132,7 @@ public final class RecordAccumulator {
         synchronized (dq) {
             RecordBatch batch = dq.peekLast();
             if (batch != null) {
-                FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback);
+                FutureRecordMetadata future = batch.tryAppend(key, value, callback);
                 if (future != null)
                     return future;
             }
@@ -145,7 +145,7 @@ public final class RecordAccumulator {
         synchronized (dq) {
             RecordBatch last = dq.peekLast();
             if (last != null) {
-                FutureRecordMetadata future = last.tryAppend(key, value, compression, callback);
+                FutureRecordMetadata future = last.tryAppend(key, value, callback);
                 if (future != null) {
                     // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
                     // often...
@@ -153,8 +153,10 @@ public final class RecordAccumulator {
                     return future;
                 }
             }
-            RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
-            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
+            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression);
+            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
+            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
             dq.addLast(batch);
             return future;
         }
@@ -193,7 +195,7 @@ public final class RecordAccumulator {
             RecordBatch batch = deque.peekFirst();
             if (batch != null) {
                 boolean backingOff = batch.attempts > 0 && batch.lastAttempt + retryBackoffMs > now;
-                boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining();
+                boolean full = deque.size() > 1 || batch.records.isFull();
                 boolean expired = now - batch.created >= lingerMs;
                 boolean sendable = full || expired || exhausted || closed;
                 if (sendable && !backingOff)
@@ -239,10 +241,15 @@ public final class RecordAccumulator {
                 Deque<RecordBatch> deque = dequeFor(tp);
                 if (deque != null) {
                     synchronized (deque) {
-                        if (size + deque.peekFirst().records.sizeInBytes() > maxSize) {
+                        RecordBatch first = deque.peekFirst();
+                        if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                            // there is a rare case that a single batch size is larger than the request size due
+                            // to compression; in this case we will still eventually send this batch in a single
+                            // request
                             return ready;
                         } else {
                             RecordBatch batch = deque.pollFirst();
+                            batch.records.close();
                             size += batch.records.sizeInBytes();
                             ready.add(batch);
                         }
@@ -269,7 +276,7 @@ public final class RecordAccumulator {
     * Deallocate the record batch
     */
    public void deallocate(RecordBatch batch) {
-        free.deallocate(batch.records.buffer());
+        free.deallocate(batch.records.buffer(), batch.records.capacity());
    }

    /**

View File

@@ -17,7 +17,6 @@ import java.util.List;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,11 +53,11 @@ public final class RecordBatch {
     *
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     */
-    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
+    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
-            this.records.append(0L, key, value, compression);
+            this.records.append(0L, key, value);
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
@@ -71,7 +70,7 @@ public final class RecordBatch {
     * Complete the request
     *
     * @param baseOffset The base offset of the messages assigned by the server
-     * @param errorCode The error code or 0 if no error
+     * @param exception The exception returned or null if no exception
     */
    public void done(long baseOffset, RuntimeException exception) {
        this.produceFuture.done(topicPartition, baseOffset, exception);

View File

@@ -29,8 +29,8 @@ import org.apache.kafka.common.record.Records;
 public class ProducerPerformance {

     public static void main(String[] args) throws Exception {
-        if (args.length != 5) {
-            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks");
+        if (args.length < 5) {
+            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks [compression_type]");
             System.exit(1);
         }
         String url = args[0];
@@ -45,6 +45,8 @@ public class ProducerPerformance {
         props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
         props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024));
         props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024));
+        if (args.length == 6)
+            props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
         KafkaProducer producer = new KafkaProducer(props);

         Callback callback = new Callback() {

View File

@@ -20,14 +20,16 @@ package org.apache.kafka.common.record;
  * The compression type to use
  */
 public enum CompressionType {
-    NONE(0, "none"), GZIP(1, "gzip"), SNAPPY(2, "snappy");
+    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f);

     public final int id;
     public final String name;
+    public final float rate;

-    private CompressionType(int id, String name) {
+    private CompressionType(int id, String name, float rate) {
         this.id = id;
         this.name = name;
+        this.rate = rate;
     }

     public static CompressionType forId(int id) {
@@ -53,4 +55,5 @@ public enum CompressionType {
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
 }
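
forName() is what KafkaProducer now uses to turn the compression.type string into an enum constant, and the new rate field is a rough compression-ratio estimate, presumably consumed when estimating bytes written to a compressed stream. A small sketch (the 64 KB batch size is illustrative):

    import org.apache.kafka.common.record.CompressionType;

    public class CompressionTypeExample {
        public static void main(String[] args) {
            CompressionType type = CompressionType.forName("snappy");
            System.out.println(type.id + " " + type.name);       // 2 snappy
            // a heuristic estimate of the compressed size of a 64 KB batch
            int estimated = (int) (64 * 1024 * type.rate);
            System.out.println(estimated + " bytes estimated");
            // unknown names fail fast with IllegalArgumentException, e.g. CompressionType.forName("lz4")
        }
    }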

View File

@@ -16,53 +16,99 @@
  */
 package org.apache.kafka.common.record;

+import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.util.Iterator;

+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.AbstractIterator;

 /**
  * A {@link Records} implementation backed by a ByteBuffer.
  */
 public class MemoryRecords implements Records {

-    private final ByteBuffer buffer;
+    private final Compressor compressor;
+    private final int capacity;
+    private ByteBuffer buffer;
+    private boolean writable;

-    public MemoryRecords(int size) {
-        this(ByteBuffer.allocate(size));
+    // Construct a writable memory records
+    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable) {
+        this.writable = writable;
+        this.capacity = buffer.capacity();
+        if (this.writable) {
+            this.buffer = null;
+            this.compressor = new Compressor(buffer, type);
+        } else {
+            this.buffer = buffer;
+            this.compressor = null;
+        }
     }

-    public MemoryRecords(ByteBuffer buffer) {
-        this.buffer = buffer;
+    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
+        return new MemoryRecords(buffer, type, true);
+    }
+
+    public static MemoryRecords iterableRecords(ByteBuffer buffer) {
+        return new MemoryRecords(buffer, CompressionType.NONE, false);
     }

     /**
      * Append the given record and offset to the buffer
      */
     public void append(long offset, Record record) {
-        buffer.putLong(offset);
-        buffer.putInt(record.size());
-        buffer.put(record.buffer());
+        if (!writable)
+            throw new IllegalStateException("Memory records is not writable");
+
+        int size = record.size();
+        compressor.putLong(offset);
+        compressor.putInt(size);
+        compressor.put(record.buffer());
+        compressor.recordWritten(size + Records.LOG_OVERHEAD);
         record.buffer().rewind();
     }

     /**
      * Append a new record and offset to the buffer
      */
-    public void append(long offset, byte[] key, byte[] value, CompressionType type) {
-        buffer.putLong(offset);
-        buffer.putInt(Record.recordSize(key, value));
-        Record.write(this.buffer, key, value, type);
+    public void append(long offset, byte[] key, byte[] value) {
+        if (!writable)
+            throw new IllegalStateException("Memory records is not writable");
+
+        int size = Record.recordSize(key, value);
+        compressor.putLong(offset);
+        compressor.putInt(size);
+        compressor.putRecord(key, value);
+        compressor.recordWritten(size + Records.LOG_OVERHEAD);
     }

     /**
      * Check if we have room for a new record containing the given key/value pair
+     *
+     * Note that the return value is based on the estimate of the bytes written to the compressor,
+     * which may not be accurate if compression is really used. When this happens, the following
+     * append may cause dynamic buffer re-allocation in the underlying byte buffer stream.
      */
     public boolean hasRoomFor(byte[] key, byte[] value) {
-        return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value);
+        return this.writable &&
+            this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
+    }
+
+    public boolean isFull() {
+        return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten();
+    }
+
+    /**
+     * Close this batch for no more appends
+     */
+    public void close() {
+        compressor.close();
+        writable = false;
+        buffer = compressor.buffer();
     }

     /** Write the records in this set to the given channel */
@@ -74,7 +120,14 @@ public class MemoryRecords implements Records {
      * The size of this record set
      */
     public int sizeInBytes() {
-        return this.buffer.position();
+        return compressor.buffer().position();
+    }
+
+    /**
+     * Return the capacity of the buffer
+     */
+    public int capacity() {
+        return this.capacity;
     }

     /**
@@ -86,34 +139,79 @@ public class MemoryRecords implements Records {
     @Override
     public Iterator<LogEntry> iterator() {
-        return new RecordsIterator(this.buffer);
+        ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip();
+        return new RecordsIterator(copy, CompressionType.NONE, false);
     }

+    /* TODO: allow reuse of the buffer used for iteration */
     public static class RecordsIterator extends AbstractIterator<LogEntry> {
         private final ByteBuffer buffer;
+        private final DataInputStream stream;
+        private final CompressionType type;
+        private final boolean shallow;
+        private RecordsIterator innerIter;

-        public RecordsIterator(ByteBuffer buffer) {
-            ByteBuffer copy = buffer.duplicate();
-            copy.flip();
-            this.buffer = copy;
+        public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
+            this.type = type;
+            this.buffer = buffer;
+            this.shallow = shallow;
+            stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
         }

+        /*
+         * Read the next record from the buffer.
+         *
+         * Note that in the compressed message set, each message value size is set as the size
+         * of the un-compressed version of the message value, so when we do de-compression
+         * allocating an array of the specified size for reading compressed value data is sufficient.
+         */
         @Override
         protected LogEntry makeNext() {
-            if (buffer.remaining() < Records.LOG_OVERHEAD)
-                return allDone();
-            long offset = buffer.getLong();
-            int size = buffer.getInt();
-            if (size < 0)
-                throw new IllegalStateException("Record with size " + size);
-            if (buffer.remaining() < size)
-                return allDone();
-            ByteBuffer rec = buffer.slice();
-            rec.limit(size);
-            this.buffer.position(this.buffer.position() + size);
-            return new LogEntry(offset, new Record(rec));
+            if (innerDone()) {
+                try {
+                    // read the offset
+                    long offset = stream.readLong();
+                    // read record size
+                    int size = stream.readInt();
+                    if (size < 0)
+                        throw new IllegalStateException("Record with size " + size);
+                    // read the record, if compression is used we cannot depend on size
+                    // and hence has to do extra copy
+                    ByteBuffer rec;
+                    if (type == CompressionType.NONE) {
+                        rec = buffer.slice();
+                        buffer.position(buffer.position() + size);
+                        rec.limit(size);
+                    } else {
+                        byte[] recordBuffer = new byte[size];
+                        stream.read(recordBuffer, 0, size);
+                        rec = ByteBuffer.wrap(recordBuffer);
+                    }
+                    LogEntry entry = new LogEntry(offset, new Record(rec));
+                    entry.record().ensureValid();
+
+                    // decide whether to go shallow or deep iteration if it is compressed
+                    CompressionType compression = entry.record().compressionType();
+                    if (compression == CompressionType.NONE || shallow) {
+                        return entry;
+                    } else {
+                        // init the inner iterator with the value payload of the message,
+                        // which will de-compress the payload to a set of messages
+                        ByteBuffer value = entry.record().value();
+                        innerIter = new RecordsIterator(value, compression, true);
+                        return innerIter.next();
+                    }
+                } catch (EOFException e) {
+                    return allDone();
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            } else {
+                return innerIter.next();
+            }
+        }
+
+        private boolean innerDone() {
+            return (innerIter == null || !innerIter.hasNext());
         }
     }
 }
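
Taken together, a writable MemoryRecords is now built through the Compressor, sealed with close(), and read back through an iterator that transparently descends into compressed wrapper messages. A minimal round-trip sketch using only the API shown above (buffer size and keys/values are illustrative):

    import java.nio.ByteBuffer;
    import java.util.Iterator;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;

    public class MemoryRecordsRoundTrip {
        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.GZIP);
            long offset = 0;
            while (records.hasRoomFor("key".getBytes(), "value".getBytes()))
                records.append(offset++, "key".getBytes(), "value".getBytes());
            records.close();   // flush the compressor; further appends would throw IllegalStateException

            Iterator<LogEntry> iter = records.iterator();
            while (iter.hasNext()) {
                LogEntry entry = iter.next();
                entry.record().ensureValid();   // re-verify the CRC after de-compression
                System.out.println("offset " + entry.offset());
            }
        }
    }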

View File

@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;

 import java.nio.ByteBuffer;

+import org.apache.kafka.common.utils.Crc32;
 import org.apache.kafka.common.utils.Utils;
@@ -40,13 +41,15 @@ public final class Record {
     public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
     public static final int VALUE_SIZE_LENGTH = 4;

-    /** The amount of overhead bytes in a record */
-    public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH;
+    /**
+     * The size for the record header
+     */
+    public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;

     /**
-     * The minimum valid size for the record header
+     * The amount of overhead bytes in a record
      */
-    public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+    public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;

     /**
      * The current "magic" value
@@ -71,27 +74,29 @@ public final class Record {
     }

     /**
-     * A constructor to create a LogRecord
+     * A constructor to create a LogRecord. If the record's compression type is not none, then
+     * its value payload should be already compressed with the specified type; the constructor
+     * would always write the value payload as is and will not do the compression itself.
      *
      * @param key The key of the record (null, if none)
      * @param value The record value
-     * @param codec The compression codec used on the contents of the record (if any)
+     * @param type The compression type used on the contents of the record (if any)
      * @param valueOffset The offset into the payload array used to extract payload
      * @param valueSize The size of the payload to use
      */
-    public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
-        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize
-                : value.length - valueOffset)));
-        write(this.buffer, key, value, codec, valueOffset, valueSize);
+    public Record(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length,
+                                            value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset)));
+        write(this.buffer, key, value, type, valueOffset, valueSize);
         this.buffer.rewind();
     }

-    public Record(byte[] key, byte[] value, CompressionType codec) {
-        this(key, value, codec, 0, -1);
+    public Record(byte[] key, byte[] value, CompressionType type) {
+        this(key, value, type, 0, -1);
     }

-    public Record(byte[] value, CompressionType codec) {
-        this(null, value, codec);
+    public Record(byte[] value, CompressionType type) {
+        this(null, value, type);
     }

     public Record(byte[] key, byte[] value) {
@@ -102,40 +107,37 @@ public final class Record {
         this(null, value, CompressionType.NONE);
     }

-    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
-        // skip crc, we will fill that in at the end
-        int pos = buffer.position();
-        buffer.position(pos + MAGIC_OFFSET);
-        buffer.put(CURRENT_MAGIC_VALUE);
-        byte attributes = 0;
-        if (codec.id > 0)
-            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id));
-        buffer.put(attributes);
+    // Write a record to the buffer, if the record's compression type is none, then
+    // its value payload should be already compressed with the specified type
+    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        // construct the compressor with compression type none since this function will not do any
+        // compression according to the input type, it will just write the record's payload as is
+        Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
+        compressor.putRecord(key, value, type, valueOffset, valueSize);
+    }
+
+    public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) {
+        // write crc
+        compressor.putInt((int) (crc & 0xffffffffL));
+        // write magic value
+        compressor.putByte(CURRENT_MAGIC_VALUE);
+        // write attributes
+        compressor.putByte(attributes);
         // write the key
         if (key == null) {
-            buffer.putInt(-1);
+            compressor.putInt(-1);
         } else {
-            buffer.putInt(key.length);
-            buffer.put(key, 0, key.length);
+            compressor.putInt(key.length);
+            compressor.put(key, 0, key.length);
         }
         // write the value
         if (value == null) {
-            buffer.putInt(-1);
+            compressor.putInt(-1);
         } else {
             int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
-            buffer.putInt(size);
-            buffer.put(value, valueOffset, size);
+            compressor.putInt(size);
+            compressor.put(value, valueOffset, size);
         }
-        // now compute the checksum and fill it in
-        long crc = computeChecksum(buffer,
-                                   buffer.arrayOffset() + pos + MAGIC_OFFSET,
-                                   buffer.position() - pos - MAGIC_OFFSET - buffer.arrayOffset());
-        Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc);
-    }
-
-    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) {
-        write(buffer, key, value, codec, 0, -1);
     }

     public static int recordSize(byte[] key, byte[] value) {
@@ -150,13 +152,51 @@ public final class Record {
         return this.buffer;
     }

+    public static byte computeAttributes(CompressionType type) {
+        byte attributes = 0;
+        if (type.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+        return attributes;
+    }
+
     /**
      * Compute the checksum of the record from the record contents
      */
     public static long computeChecksum(ByteBuffer buffer, int position, int size) {
-        return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size - buffer.arrayOffset());
+        Crc32 crc = new Crc32();
+        crc.update(buffer.array(), buffer.arrayOffset() + position, size);
+        return crc.getValue();
     }

+    /**
+     * Compute the checksum of the record from the attributes, key and value payloads
+     */
+    public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        Crc32 crc = new Crc32();
+        crc.update(CURRENT_MAGIC_VALUE);
+        byte attributes = 0;
+        if (type.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+        crc.update(attributes);
+        // update for the key
+        if (key == null) {
+            crc.updateInt(-1);
+        } else {
+            crc.updateInt(key.length);
+            crc.update(key, 0, key.length);
+        }
+        // update for the value
+        if (value == null) {
+            crc.updateInt(-1);
+        } else {
+            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
+            crc.updateInt(size);
+            crc.update(value, valueOffset, size);
+        }
+        return crc.getValue();
+    }
+
     /**
      * Compute the checksum of the record from the record contents
      */
@@ -239,7 +279,7 @@ public final class Record {
     }

     /**
-     * The compression codec used with this record
+     * The compression type used with this record
      */
     public CompressionType compressionType() {
         return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
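
The record checksum can now be computed two ways: over the serialized buffer, or incrementally from the attributes, key and value (presumably the path the new Compressor-based write relies on), and the two must agree. A small sketch mirroring the new RecordTest assertion:

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Record;

    public class RecordChecksumExample {
        public static void main(String[] args) {
            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();
            Record record = new Record(key, value, CompressionType.NONE);
            long stored = record.checksum();   // CRC read back from the record header
            long recomputed = Record.computeChecksum(key, value, CompressionType.NONE, 0, -1);
            System.out.println(stored == recomputed);   // expected: true
        }
    }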

View File

@@ -28,6 +28,30 @@ import java.util.zip.Checksum;
  */
 public class Crc32 implements Checksum {

+    /**
+     * Compute the CRC32 of the byte array
+     *
+     * @param bytes The array to compute the checksum for
+     * @return The CRC32
+     */
+    public static long crc32(byte[] bytes) {
+        return crc32(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Compute the CRC32 of the segment of the byte array given by the specified size and offset
+     *
+     * @param bytes The bytes to checksum
+     * @param offset the offset at which to begin checksumming
+     * @param size the number of bytes to checksum
+     * @return The CRC32
+     */
+    public static long crc32(byte[] bytes, int offset, int size) {
+        Crc32 crc = new Crc32();
+        crc.update(bytes, offset, size);
+        return crc.getValue();
+    }
+
     /** the current CRC value, bit-flipped */
     private int crc;
@@ -97,6 +121,18 @@ public class Crc32 implements Checksum {
         crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
     }

+    /**
+     * Update the CRC32 given an integer
+     */
+    final public void updateInt(int input) {
+        update((byte) (input >> 24));
+        update((byte) (input >> 16));
+        update((byte) (input >> 8));
+        update((byte) input /* >> 0 */);
+    }
+
     /*
      * CRC-32 lookup tables generated by the polynomial 0xEDB88320. See also TestPureJavaCrc32.Table.
      */
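
The CRC helpers that used to live in Utils now sit on Crc32 itself, and updateInt() feeds the four big-endian bytes of an int into the running checksum (used by the new field-wise Record.computeChecksum for the key and value lengths). A quick sketch:

    import org.apache.kafka.common.utils.Crc32;

    public class Crc32Example {
        public static void main(String[] args) {
            byte[] data = "hello".getBytes();
            long oneShot = Crc32.crc32(data);              // static convenience helper
            Crc32 crc = new Crc32();
            crc.update(data, 0, data.length);
            System.out.println(oneShot == crc.getValue()); // expected: true

            Crc32 a = new Crc32();
            a.updateInt(1024);                             // same as the four big-endian bytes 0,0,4,0
            Crc32 b = new Crc32();
            b.update(new byte[] {0, 0, 4, 0}, 0, 4);
            System.out.println(a.getValue() == b.getValue()); // expected: true
        }
    }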

View File

@@ -91,30 +91,6 @@ public class Utils {
         buffer.putInt(index, (int) (value & 0xffffffffL));
     }

-    /**
-     * Compute the CRC32 of the byte array
-     *
-     * @param bytes The array to compute the checksum for
-     * @return The CRC32
-     */
-    public static long crc32(byte[] bytes) {
-        return crc32(bytes, 0, bytes.length);
-    }
-
-    /**
-     * Compute the CRC32 of the segment of the byte array given by the specificed size and offset
-     *
-     * @param bytes The bytes to checksum
-     * @param offset the offset at which to begin checksumming
-     * @param size the number of bytes to checksum
-     * @return The CRC32
-     */
-    public static long crc32(byte[] bytes, int offset, int size) {
-        Crc32 crc = new Crc32();
-        crc.update(bytes, offset, size);
-        return crc.getValue();
-    }
-
     /**
      * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
      * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).

View File

@@ -22,29 +22,35 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;

-import org.apache.kafka.common.record.LogEntry;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

+@RunWith(value = Parameterized.class)
 public class MemoryRecordsTest {

+    private CompressionType compression;
+
+    public MemoryRecordsTest(CompressionType compression) {
+        this.compression = compression;
+    }
+
     @Test
     public void testIterator() {
-        MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024));
-        MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024));
+        MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
+        MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
         List<Record> list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()),
                                           new Record("b".getBytes(), "2".getBytes()),
                                           new Record("c".getBytes(), "3".getBytes()));
         for (int i = 0; i < list.size(); i++) {
             Record r = list.get(i);
             recs1.append(i, r);
-            recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType());
+            recs2.append(i, toArray(r.key()), toArray(r.value()));
         }
+        recs1.close();
+        recs2.close();

         for (int iteration = 0; iteration < 2; iteration++) {
             for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {
@@ -54,10 +60,18 @@ public class MemoryRecordsTest {
                     LogEntry entry = iter.next();
                     assertEquals((long) i, entry.offset());
                     assertEquals(list.get(i), entry.record());
+                    entry.record().ensureValid();
                 }
                 assertFalse(iter.hasNext());
             }
         }
     }

+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<Object[]>();
+        for (CompressionType type: CompressionType.values())
+            values.add(new Object[] { type });
+        return values;
+    }
 }

View File

@@ -27,9 +27,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;

-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.InvalidRecordException;
-import org.apache.kafka.common.record.Record;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -66,6 +63,10 @@ public class RecordTest {
     @Test
     public void testChecksum() {
         assertEquals(record.checksum(), record.computeChecksum());
+        assertEquals(record.checksum(), record.computeChecksum(
+            this.key == null ? null : this.key.array(),
+            this.value == null ? null : this.value.array(),
+            this.compression, 0, -1));
         assertTrue(record.isValid());
         for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
             Record copy = copyOf(record);
@@ -95,9 +96,11 @@ public class RecordTest {
     @Parameters
     public static Collection<Object[]> data() {
+        byte[] payload = new byte[1000];
+        Arrays.fill(payload, (byte) 1);
         List<Object[]> values = new ArrayList<Object[]>();
-        for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes()))
-            for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes()))
+        for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
+            for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
                 for (CompressionType compression : CompressionType.values())
                     values.add(new Object[] { key, value, compression });
         return values;

View File

@@ -88,7 +88,7 @@ public class TestUtils {
     /**
      * Generate an array of random bytes
      *
-     * @param numBytes The size of the array
+     * @param size The size of the array
      */
     public static byte[] randomBytes(int size) {
         byte[] bytes = new byte[size];

View File

@@ -255,8 +255,8 @@ object ConsoleProducer {
   class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
     val props = new Properties()
     props.put("metadata.broker.list", producerConfig.brokerList)
-    val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
-    props.put("compression.codec", codec.toString)
+    val compression = if(producerConfig.compress) DefaultCompressionCodec.name else NoCompressionCodec.name
+    props.put("compression.type", compression)
     props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
     props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
     props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)

View File

@@ -319,7 +319,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
-
     val producer = new KafkaProducer(producerProps)

     override def doWork(): Unit = {
@@ -335,5 +334,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
         case e : Exception => failed = true
       }
     }
+
+    override def shutdown(){
+      super.shutdown()
+      producer.close
+    }
   }
 }

View File

@@ -17,6 +17,14 @@
 package kafka.api.test

+import java.util.Properties
+import java.lang.{Integer, IllegalArgumentException}
+
+import org.apache.kafka.clients.producer._
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
+import org.junit.Assert._
+
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
@@ -24,15 +32,6 @@ import kafka.consumer.SimpleConsumer
 import kafka.api.FetchRequestBuilder
 import kafka.message.Message

-import org.apache.kafka.clients.producer._
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Test
-import org.junit.Assert._
-
-import java.util.Properties
-import java.lang.{Integer, IllegalArgumentException}
-
 class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId1 = 0
@@ -76,15 +75,10 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     super.tearDown()
   }

-  class PrintOffsetCallback extends Callback {
+  class CheckErrorCallback extends Callback {
     def onCompletion(metadata: RecordMetadata, exception: Exception) {
       if (exception != null)
         fail("Send callback returns the following exception", exception)
-      try {
-        System.out.println("The message we just sent is marked as [" + metadata.partition + "] : " + metadata.offset);
-      } catch {
-        case e: Throwable => fail("Should succeed sending the message", e)
-      }
     }
   }
@@ -100,7 +94,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))

     var producer = new KafkaProducer(props)
-    val callback = new PrintOffsetCallback
+    val callback = new CheckErrorCallback

     try {
       // create topic

View File

@@ -212,6 +212,7 @@ object ProducerPerformance extends Logging {
       props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
       props.put("request.retries", config.producerNumRetries.toString)
       props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
+      props.put("compression.type", config.compressionCodec.name)
       val producer = new KafkaProducer(props)

       def send(topic: String, partition: Long, bytes: Array[Byte]) {