mirror of https://github.com/apache/kafka.git
KAFKA-18183 replace BytesSerializer with ByteArraySerializer for producer/consumer (#18113)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
6af233e4fc
commit
497f500483
|
@ -38,8 +38,8 @@ import org.apache.kafka.common.TopicPartition;
|
|||
import org.apache.kafka.common.acl.AccessControlEntry;
|
||||
import org.apache.kafka.common.acl.AclBindingFilter;
|
||||
import org.apache.kafka.common.network.ListenerName;
|
||||
import org.apache.kafka.common.serialization.BytesDeserializer;
|
||||
import org.apache.kafka.common.serialization.BytesSerializer;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.test.TestUtils;
|
||||
import org.apache.kafka.server.authorizer.Authorizer;
|
||||
import org.apache.kafka.server.fault.FaultHandlerException;
|
||||
|
@ -159,8 +159,8 @@ public interface ClusterInstance {
|
|||
|
||||
default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
|
||||
Map<String, Object> props = new HashMap<>(configs);
|
||||
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName());
|
||||
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName());
|
||||
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
|
||||
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
|
||||
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
|
||||
return new KafkaProducer<>(props);
|
||||
}
|
||||
|
@ -171,8 +171,8 @@ public interface ClusterInstance {
|
|||
|
||||
default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
|
||||
Map<String, Object> props = new HashMap<>(configs);
|
||||
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
|
||||
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
|
||||
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
|
||||
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
|
||||
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
|
||||
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
|
||||
|
|
|
@ -32,13 +32,13 @@ import org.apache.kafka.common.config.ConfigResource;
|
|||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.common.test.TestUtils;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
|
||||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -61,6 +61,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CL
|
|||
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
|
||||
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
|
||||
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
|
||||
|
@ -301,11 +302,11 @@ public class ClusterTestExtensionsTest {
|
|||
})
|
||||
public void testCreateDefaultProducerAndConsumer(ClusterInstance cluster) throws InterruptedException {
|
||||
String topic = "topic";
|
||||
Bytes key = Bytes.wrap("key".getBytes());
|
||||
Bytes value = Bytes.wrap("value".getBytes());
|
||||
byte[] key = "key".getBytes(StandardCharsets.UTF_8);
|
||||
byte[] value = "value".getBytes(StandardCharsets.UTF_8);
|
||||
try (Admin adminClient = cluster.admin();
|
||||
Producer<Bytes, Bytes> producer = cluster.producer();
|
||||
Consumer<Bytes, Bytes> consumer = cluster.consumer()
|
||||
Producer<byte[], byte[]> producer = cluster.producer();
|
||||
Consumer<byte[], byte[]> consumer = cluster.consumer()
|
||||
) {
|
||||
adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1)));
|
||||
assertNotNull(producer);
|
||||
|
@ -313,13 +314,13 @@ public class ClusterTestExtensionsTest {
|
|||
producer.send(new ProducerRecord<>(topic, key, value));
|
||||
producer.flush();
|
||||
consumer.subscribe(singletonList(topic));
|
||||
List<ConsumerRecord<Bytes, Bytes>> records = new ArrayList<>();
|
||||
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
|
||||
TestUtils.waitForCondition(() -> {
|
||||
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
|
||||
return records.size() == 1;
|
||||
}, "Failed to receive message");
|
||||
assertEquals(key, records.get(0).key());
|
||||
assertEquals(value, records.get(0).value());
|
||||
assertArrayEquals(key, records.get(0).key());
|
||||
assertArrayEquals(value, records.get(0).value());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue