KAFKA-4073; MirrorMaker should handle messages without timestamp correctly

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1773 from ijuma/kafka-4073-mirror-maker-timestamps

(cherry picked from commit a1e0b2240d)
Signed-off-by: Jun Rao <junrao@gmail.com>
This commit is contained in:
Ismael Juma 2016-08-22 21:49:40 -07:00 committed by Jun Rao
parent 8bf9addd25
commit 4e4e2fb508
2 changed files with 20 additions and 2 deletions

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.record.Record
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
@ -675,7 +676,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, record.timestamp, record.key, record.value))
val timestamp: java.lang.Long = if (record.timestamp == Record.NO_TIMESTAMP) null else record.timestamp
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, timestamp, record.key, record.value))
}
}

View File

@ -18,7 +18,7 @@
package kafka.tools
import kafka.consumer.BaseConsumerRecord
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.record.{Record, TimestampType}
import org.junit.Assert._
import org.junit.Test
@ -39,4 +39,20 @@ class MirrorMakerTest {
assertEquals("key", new String(producerRecord.key))
assertEquals("value", new String(producerRecord.value))
}
@Test
def testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage() {
val consumerRecord = BaseConsumerRecord("topic", 0, 1L, Record.NO_TIMESTAMP, TimestampType.CREATE_TIME, "key".getBytes, "value".getBytes)
val result = MirrorMaker.defaultMirrorMakerMessageHandler.handle(consumerRecord)
assertEquals(1, result.size)
val producerRecord = result.get(0)
assertNull(producerRecord.timestamp)
assertEquals("topic", producerRecord.topic)
assertNull(producerRecord.partition)
assertEquals("key", new String(producerRecord.key))
assertEquals("value", new String(producerRecord.value))
}
}