KAFKA-5610; WriteTxnMarker handler should return UNKNOWN_TOPIC_OR_PARTITION if replica is not available

Before this patch, we would instead return the non-retriable `UNSUPPORTED_FOR_MESSAGE_FORMAT` causing markers to be lost.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3550 from apurvam/KAFKA-5610-handleWriteTxnMarker-should-handle-emigration
This commit is contained in:
Apurva Mehta 2017-07-20 13:05:40 -07:00 committed by Jason Gustafson
parent 9b7a6ee2a9
commit cdff011fe0
2 changed files with 71 additions and 9 deletions

View File

@ -1543,24 +1543,29 @@ class KafkaApis(val requestChannel: RequestChannel,
var skippedMarkers = 0 var skippedMarkers = 0
for (marker <- markers.asScala) { for (marker <- markers.asScala) {
val producerId = marker.producerId val producerId = marker.producerId
val (goodPartitions, partitionsWithIncorrectMessageFormat) = marker.partitions.asScala.partition { partition => val partitionsWithCompatibleMessageFormat = new mutable.ArrayBuffer[TopicPartition]
val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
marker.partitions.asScala.foreach { partition =>
replicaManager.getMagic(partition) match { replicaManager.getMagic(partition) match {
case Some(magic) if magic >= RecordBatch.MAGIC_VALUE_V2 => true case Some(magic) =>
case _ => false if (magic < RecordBatch.MAGIC_VALUE_V2)
currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)
else
partitionsWithCompatibleMessageFormat += partition
case None =>
currentErrors.put(partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
} }
} }
if (partitionsWithIncorrectMessageFormat.nonEmpty) { if (!currentErrors.isEmpty)
val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
partitionsWithIncorrectMessageFormat.foreach { partition => currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) }
updateErrors(producerId, currentErrors) updateErrors(producerId, currentErrors)
}
if (goodPartitions.isEmpty) { if (partitionsWithCompatibleMessageFormat.isEmpty) {
numAppends.decrementAndGet() numAppends.decrementAndGet()
skippedMarkers += 1 skippedMarkers += 1
} else { } else {
val controlRecords = goodPartitions.map { partition => val controlRecords = partitionsWithCompatibleMessageFormat.map { partition =>
val controlRecordType = marker.transactionResult match { val controlRecordType = marker.transactionResult match {
case TransactionResult.COMMIT => ControlRecordType.COMMIT case TransactionResult.COMMIT => ControlRecordType.COMMIT
case TransactionResult.ABORT => ControlRecordType.ABORT case TransactionResult.ABORT => ControlRecordType.ABORT

View File

@ -139,6 +139,25 @@ class KafkaApisTest {
assertEquals(expectedErrors, markersResponse.errors(1)) assertEquals(expectedErrors, markersResponse.errors(1))
} }
@Test
def shouldRespondWithUnknownTopicWhenPartitionIsNotHosted(): Unit = {
val topicPartition = new TopicPartition("t", 0)
val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))
val expectedErrors = Map(topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION).asJava
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
EasyMock.expect(replicaManager.getMagic(topicPartition))
.andReturn(None)
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
createKafkaApis().handleWriteTxnMarkersRequest(request)
val markersResponse = readResponse(ApiKeys.WRITE_TXN_MARKERS, writeTxnMarkersRequest, capturedResponse)
.asInstanceOf[WriteTxnMarkersResponse]
assertEquals(expectedErrors, markersResponse.errors(1))
}
@Test @Test
def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = { def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
val tp1 = new TopicPartition("t", 0) val tp1 = new TopicPartition("t", 0)
@ -177,6 +196,44 @@ class KafkaApisTest {
EasyMock.verify(replicaManager) EasyMock.verify(replicaManager)
} }
@Test
def shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
val tp1 = new TopicPartition("t", 0)
val tp2 = new TopicPartition("t1", 0)
val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(Utils.mkList(tp1, tp2))
val expectedErrors = Map(tp1 -> Errors.UNKNOWN_TOPIC_OR_PARTITION, tp2 -> Errors.NONE).asJava
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
EasyMock.expect(replicaManager.getMagic(tp1))
.andReturn(None)
EasyMock.expect(replicaManager.getMagic(tp2))
.andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
EasyMock.anyShort(),
EasyMock.eq(true),
EasyMock.eq(false),
EasyMock.anyObject(),
EasyMock.capture(responseCallback),
EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
override def answer(): Unit = {
responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
}
})
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
createKafkaApis().handleWriteTxnMarkersRequest(request)
val markersResponse = readResponse(ApiKeys.WRITE_TXN_MARKERS, writeTxnMarkersRequest, capturedResponse)
.asInstanceOf[WriteTxnMarkersResponse]
assertEquals(expectedErrors, markersResponse.errors(1))
EasyMock.verify(replicaManager)
}
@Test @Test
def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = { def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
val topicPartition = new TopicPartition("t", 0) val topicPartition = new TopicPartition("t", 0)