KAFKA-870 hadoop-producer KafkaRecordWriter writes entire input buffer capacity, even when intended payload is smaller; reviewed by Neha Narkhede

David Stein 2013-04-22 19:21:28 -07:00 committed by Neha Narkhede
parent 4d7629dd7f
commit 68c8434f61
1 changed file with 5 additions and 1 deletion

@@ -18,6 +18,7 @@ package kafka.bridge.hadoop;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import kafka.javaapi.producer.Producer;
@@ -56,7 +57,10 @@ public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<Nul
   @Override
   public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
   {
-    Message msg = new Message(value.getBytes());
+    // BytesWritable.getBytes returns its internal buffer, so .length would refer to its capacity, not the
+    // intended size of the byte array contained. We need to use BytesWritable.getLength for the true size.
+    Message msg = new Message(Arrays.copyOf(value.getBytes(), value.getLength()));
     msgList.add(new ProducerData<Integer, Message>(this.topic, msg));
     totalSize += msg.size();
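
For reviewers unfamiliar with the pitfall, a minimal standalone sketch of the behavior this fix addresses (not part of the commit; BytesWritableLengthDemo is a hypothetical name, and it assumes Hadoop's org.apache.hadoop.io.BytesWritable on the classpath):

import java.util.Arrays;
import org.apache.hadoop.io.BytesWritable;

public class BytesWritableLengthDemo {
  public static void main(String[] args) {
    BytesWritable value = new BytesWritable();
    // Setting a larger payload and then a smaller one leaves the backing
    // buffer larger than the logical contents; the exact growth policy is
    // a Hadoop implementation detail.
    value.set(new byte[] {1, 2, 3}, 0, 3);
    value.set(new byte[] {4, 5}, 0, 2);

    byte[] raw = value.getBytes();  // internal buffer; raw.length is its capacity
    int size = value.getLength();   // 2, the true payload size

    // Before the fix: the whole buffer, including stale trailing bytes,
    // went into the Message.
    System.out.println("buffer  = " + Arrays.toString(raw));
    // After the fix: only the valid prefix is handed to Message.
    System.out.println("payload = " + Arrays.toString(Arrays.copyOf(raw, size)));
  }
}

On a typical Hadoop version this prints a buffer longer than two bytes (e.g. [4, 5, 3, 0]) against the correct two-byte payload [4, 5], which is exactly the extra garbage the old new Message(value.getBytes()) call shipped to the broker.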