KAFKA-16844: Add ByteBuffer support for Connect ByteArrayConverter (#16101)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Fan Yang 2024-05-30 23:26:25 +08:00 committed by GitHub
parent 3327435c8d
commit 32b2b73f67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 38 additions and 2 deletions

View File

@ -27,6 +27,7 @@ import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.HeaderConverter;
import java.nio.ByteBuffer;
import java.util.Map;
/**
@ -59,10 +60,10 @@ public class ByteArrayConverter implements Converter, HeaderConverter, Versioned
if (schema != null && schema.type() != Schema.Type.BYTES)
throw new DataException("Invalid schema type for ByteArrayConverter: " + schema.type().toString());
if (value != null && !(value instanceof byte[]))
if (value != null && !(value instanceof byte[]) && !(value instanceof ByteBuffer))
throw new DataException("ByteArrayConverter is not compatible with objects of type " + value.getClass());
return (byte[]) value;
return value instanceof ByteBuffer ? getBytesFromByteBuffer((ByteBuffer) value) : (byte[]) value;
}
@Override
@ -84,4 +85,15 @@ public class ByteArrayConverter implements Converter, HeaderConverter, Versioned
public void close() {
// do nothing
}
private byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) {
if (byteBuffer == null) {
return null;
}
byteBuffer.rewind();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
}
}

View File

@ -24,6 +24,7 @@ import org.apache.kafka.connect.errors.DataException;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@ -76,6 +77,29 @@ public class ByteArrayConverterTest {
assertNull(converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, null));
}
@Test
public void testFromConnectByteBufferValue() {
ByteBuffer buffer = ByteBuffer.wrap(SAMPLE_BYTES);
assertArrayEquals(
SAMPLE_BYTES,
converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, buffer));
buffer.rewind();
buffer.get(); // Move the position
assertArrayEquals(
SAMPLE_BYTES,
converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, buffer));
buffer = null;
assertNull(converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, buffer));
byte[] emptyBytes = new byte[0];
buffer = ByteBuffer.wrap(emptyBytes);
assertArrayEquals(
emptyBytes,
converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, buffer));
}
@Test
public void testToConnect() {
SchemaAndValue data = converter.toConnectData(TOPIC, SAMPLE_BYTES);