mirror of https://github.com/apache/kafka.git
MINOR: call the serialize method including headers from the MockProducer (#11144)
Currently when using serializers like the Cloud Event Serializer, we need to do a work around so it doesn't throw an error. Using the method taking the headers would prevent this. Since the default implementation just calls the method without the headers, it's expected to be fully backwards compatible. Reviewers: Divij Vaidya <divijvaidya13@gmail.com>
This commit is contained in:
parent
ff785ac251
commit
b2a01b2754
|
@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition;
|
|||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.errors.ProducerFencedException;
|
||||
import org.apache.kafka.common.errors.TimeoutException;
|
||||
import org.apache.kafka.common.header.internals.RecordHeaders;
|
||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||
import org.apache.kafka.common.record.RecordBatch;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
|
@ -301,8 +302,8 @@ public class MockProducer<K, V> implements Producer<K, V> {
|
|||
partition = partition(record, this.cluster);
|
||||
else {
|
||||
//just to throw ClassCastException if serializers are not the proper ones to serialize key/value
|
||||
keySerializer.serialize(record.topic(), record.key());
|
||||
valueSerializer.serialize(record.topic(), record.value());
|
||||
keySerializer.serialize(record.topic(), new RecordHeaders(), record.key());
|
||||
valueSerializer.serialize(record.topic(), new RecordHeaders(), record.value());
|
||||
}
|
||||
|
||||
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
|
||||
|
|
Loading…
Reference in New Issue