MINOR: Improve exception messages in FileChannelRecordBatch (#6068)

Replace `channel` by `fileRecords` in potentially thrown KafkaException
descriptions when loading/writing `FileChannelRecordBatch`. This makes exception
messages more readable (channel only shows an object hashcode, fileRecords shows
the path of the file being read and start/end positions in the file).

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Flavien Raynaud 2018-12-28 21:58:05 +00:00 committed by Jason Gustafson
parent 964e2c57c2
commit 9295444d48
3 changed files with 19 additions and 15 deletions

View File

@ -29,7 +29,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
@ -530,10 +529,10 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
LegacyFileChannelRecordBatch(long offset,
byte magic,
FileChannel channel,
FileRecords fileRecords,
int position,
int batchSize) {
super(offset, magic, channel, position, batchSize);
super(offset, magic, fileRecords, position, batchSize);
}
@Override

View File

@ -28,7 +28,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@ -586,10 +585,10 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
DefaultFileChannelRecordBatch(long offset,
byte magic,
FileChannel channel,
FileRecords fileRecords,
int position,
int batchSize) {
super(offset, magic, channel, position, batchSize);
super(offset, magic, fileRecords, position, batchSize);
}
@Override

View File

@ -82,9 +82,9 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
final FileChannelRecordBatch batch;
if (magic < RecordBatch.MAGIC_VALUE_V2)
batch = new LegacyFileChannelRecordBatch(offset, magic, channel, position, size);
batch = new LegacyFileChannelRecordBatch(offset, magic, fileRecords, position, size);
else
batch = new DefaultFileChannelRecordBatch(offset, magic, channel, position, size);
batch = new DefaultFileChannelRecordBatch(offset, magic, fileRecords, position, size);
position += batch.sizeInBytes();
return batch;
@ -98,7 +98,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
public abstract static class FileChannelRecordBatch extends AbstractRecordBatch {
protected final long offset;
protected final byte magic;
protected final FileChannel channel;
protected final FileRecords fileRecords;
protected final int position;
protected final int batchSize;
@ -107,12 +107,12 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
FileChannelRecordBatch(long offset,
byte magic,
FileChannel channel,
FileRecords fileRecords,
int position,
int batchSize) {
this.offset = offset;
this.magic = magic;
this.channel = channel;
this.fileRecords = fileRecords;
this.position = position;
this.batchSize = batchSize;
}
@ -173,14 +173,14 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
@Override
public void writeTo(ByteBuffer buffer) {
FileChannel channel = fileRecords.channel();
try {
int limit = buffer.limit();
buffer.limit(buffer.position() + sizeInBytes());
Utils.readFully(channel, buffer, position);
buffer.limit(limit);
} catch (IOException e) {
throw new KafkaException("Failed to read record batch at position " + position + " from file channel " +
channel, e);
throw new KafkaException("Failed to read record batch at position " + position + " from " + fileRecords, e);
}
}
@ -207,13 +207,14 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
}
private RecordBatch loadBatchWithSize(int size, String description) {
FileChannel channel = fileRecords.channel();
try {
ByteBuffer buffer = ByteBuffer.allocate(size);
Utils.readFullyOrFail(channel, buffer, position, description);
buffer.rewind();
return toMemoryRecordBatch(buffer);
} catch (IOException e) {
throw new KafkaException(e);
throw new KafkaException("Failed to load record batch at position " + position + " from " + fileRecords, e);
}
}
@ -226,14 +227,19 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
FileChannelRecordBatch that = (FileChannelRecordBatch) o;
FileChannel channel = fileRecords == null ? null : fileRecords.channel();
FileChannel thatChannel = that.fileRecords == null ? null : that.fileRecords.channel();
return offset == that.offset &&
position == that.position &&
batchSize == that.batchSize &&
(channel == null ? that.channel == null : channel.equals(that.channel));
(channel == null ? thatChannel == null : channel.equals(thatChannel));
}
@Override
public int hashCode() {
FileChannel channel = fileRecords == null ? null : fileRecords.channel();
int result = (int) (offset ^ (offset >>> 32));
result = 31 * result + (channel != null ? channel.hashCode() : 0);
result = 31 * result + position;