mirror of https://github.com/apache/kafka.git
MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)
lz4/lz4-java#65 was included in lz4-java 1.4.0. Relying on existing tests for verification. Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
parent
0c62f5e664
commit
90043d5f7e
|
@ -77,11 +77,6 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
|
||||||
this.bufferSupplier = bufferSupplier;
|
this.bufferSupplier = bufferSupplier;
|
||||||
readHeader();
|
readHeader();
|
||||||
decompressionBuffer = bufferSupplier.get(maxBlockSize);
|
decompressionBuffer = bufferSupplier.get(maxBlockSize);
|
||||||
if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
|
|
||||||
// require array backed decompression buffer with zero offset
|
|
||||||
// to simplify workaround for https://github.com/lz4/lz4-java/pull/65
|
|
||||||
throw new RuntimeException("decompression buffer must have backing array with zero array offset");
|
|
||||||
}
|
|
||||||
finished = false;
|
finished = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,10 +126,7 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
|
||||||
|
|
||||||
int len = in.position() - in.reset().position();
|
int len = in.position() - in.reset().position();
|
||||||
|
|
||||||
int hash = in.hasArray() ?
|
int hash = CHECKSUM.hash(in, in.position(), len, 0);
|
||||||
// workaround for https://github.com/lz4/lz4-java/pull/65
|
|
||||||
CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
|
|
||||||
CHECKSUM.hash(in, in.position(), len, 0);
|
|
||||||
in.position(in.position() + len);
|
in.position(in.position() + len);
|
||||||
if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
|
if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
|
||||||
throw new IOException(DESCRIPTOR_HASH_MISMATCH);
|
throw new IOException(DESCRIPTOR_HASH_MISMATCH);
|
||||||
|
@ -172,22 +164,8 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
|
||||||
|
|
||||||
if (compressed) {
|
if (compressed) {
|
||||||
try {
|
try {
|
||||||
// workaround for https://github.com/lz4/lz4-java/pull/65
|
final int bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0,
|
||||||
final int bufferSize;
|
maxBlockSize);
|
||||||
if (in.hasArray()) {
|
|
||||||
bufferSize = DECOMPRESSOR.decompress(
|
|
||||||
in.array(),
|
|
||||||
in.position() + in.arrayOffset(),
|
|
||||||
blockSize,
|
|
||||||
decompressionBuffer.array(),
|
|
||||||
0,
|
|
||||||
maxBlockSize
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
// decompressionBuffer has zero arrayOffset, so we don't need to worry about
|
|
||||||
// https://github.com/lz4/lz4-java/pull/65
|
|
||||||
bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
|
|
||||||
}
|
|
||||||
decompressionBuffer.position(0);
|
decompressionBuffer.position(0);
|
||||||
decompressionBuffer.limit(bufferSize);
|
decompressionBuffer.limit(bufferSize);
|
||||||
decompressedBuffer = decompressionBuffer;
|
decompressedBuffer = decompressionBuffer;
|
||||||
|
@ -201,10 +179,7 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
|
||||||
|
|
||||||
// verify checksum
|
// verify checksum
|
||||||
if (flg.isBlockChecksumSet()) {
|
if (flg.isBlockChecksumSet()) {
|
||||||
// workaround for https://github.com/lz4/lz4-java/pull/65
|
int hash = CHECKSUM.hash(in, in.position(), blockSize, 0);
|
||||||
int hash = in.hasArray() ?
|
|
||||||
CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
|
|
||||||
CHECKSUM.hash(in, in.position(), blockSize, 0);
|
|
||||||
in.position(in.position() + blockSize);
|
in.position(in.position() + blockSize);
|
||||||
if (hash != in.getInt()) {
|
if (hash != in.getInt()) {
|
||||||
throw new IOException(BLOCK_HASH_MISMATCH);
|
throw new IOException(BLOCK_HASH_MISMATCH);
|
||||||
|
|
Loading…
Reference in New Issue