diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 56417160856..a4aac86df09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.serialization.Serializer; @@ -301,8 +302,8 @@ public class MockProducer implements Producer { partition = partition(record, this.cluster); else { //just to throw ClassCastException if serializers are not the proper ones to serialize key/value - keySerializer.serialize(record.topic(), record.key()); - valueSerializer.serialize(record.topic(), record.value()); + keySerializer.serialize(record.topic(), new RecordHeaders(), record.key()); + valueSerializer.serialize(record.topic(), new RecordHeaders(), record.value()); } TopicPartition topicPartition = new TopicPartition(record.topic(), partition);