From 497f50048305733606769a688b940cadce98521f Mon Sep 17 00:00:00 2001 From: Logan Zhu Date: Sat, 14 Dec 2024 01:42:32 +0800 Subject: [PATCH] KAFKA-18183 replace BytesSerializer with ByteArraySerializer for producer/consumer (#18113) Reviewers: Chia-Ping Tsai --- .../kafka/common/test/api/ClusterInstance.java | 12 ++++++------ .../test/api/ClusterTestExtensionsTest.java | 17 +++++++++-------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java index b64e6d1a000..b6954e44923 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java @@ -38,8 +38,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.network.ListenerName; -import org.apache.kafka.common.serialization.BytesDeserializer; -import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.test.TestUtils; import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.fault.FaultHandlerException; @@ -159,8 +159,8 @@ public interface ClusterInstance { default Producer producer(Map configs) { Map props = new HashMap<>(configs); - props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName()); - props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName()); + props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); return new KafkaProducer<>(props); } @@ -171,8 +171,8 @@ public interface ClusterInstance { default Consumer consumer(Map configs) { Map props = new HashMap<>(configs); - props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); - props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); + props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5)); props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java index af7a39a92be..2a08d4e58eb 100644 --- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java +++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java @@ -32,13 +32,13 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.test.TestUtils; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.ExtendWith; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -61,6 +61,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CL import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -301,11 +302,11 @@ public class ClusterTestExtensionsTest { }) public void testCreateDefaultProducerAndConsumer(ClusterInstance cluster) throws InterruptedException { String topic = "topic"; - Bytes key = Bytes.wrap("key".getBytes()); - Bytes value = Bytes.wrap("value".getBytes()); + byte[] key = "key".getBytes(StandardCharsets.UTF_8); + byte[] value = "value".getBytes(StandardCharsets.UTF_8); try (Admin adminClient = cluster.admin(); - Producer producer = cluster.producer(); - Consumer consumer = cluster.consumer() + Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer() ) { adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1))); assertNotNull(producer); @@ -313,13 +314,13 @@ public class ClusterTestExtensionsTest { producer.send(new ProducerRecord<>(topic, key, value)); producer.flush(); consumer.subscribe(singletonList(topic)); - List> records = new ArrayList<>(); + List> records = new ArrayList<>(); TestUtils.waitForCondition(() -> { consumer.poll(Duration.ofMillis(100)).forEach(records::add); return records.size() == 1; }, "Failed to receive message"); - assertEquals(key, records.get(0).key()); - assertEquals(value, records.get(0).value()); + assertArrayEquals(key, records.get(0).key()); + assertArrayEquals(value, records.get(0).value()); } }