MINOR: Remove unneeded error handlers in deprecated request objects

These handlers were previously used on the broker to handle uncaught exceptions, but now the broker uses the new Java request objects exclusively.

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3646 from hachikuji/remove-old-request-error-handlers
This commit is contained in:
Jason Gustafson 2017-08-10 11:07:53 -07:00
parent 75b0f30c44
commit 017a21c604
8 changed files with 13 additions and 98 deletions

View File

@@ -21,15 +21,10 @@ import kafka.utils.nonthreadsafe
import kafka.api.ApiUtils._
import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerConfig
import kafka.network.RequestChannel
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
import java.util
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{FetchResponse => JFetchResponse}
import org.apache.kafka.common.protocol.ApiKeys
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
@@ -200,18 +195,6 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
describe(true)
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val responseData = new util.LinkedHashMap[TopicPartition, JFetchResponse.PartitionData]
requestInfo.foreach { case (TopicAndPartition(topic, partition), _) =>
responseData.put(new TopicPartition(topic, partition),
new JFetchResponse.PartitionData(Errors.forException(e), JFetchResponse.INVALID_HIGHWATERMARK,
JFetchResponse.INVALID_LAST_STABLE_OFFSET, JFetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
}
val errorResponse = new JFetchResponse(responseData, 0)
// Magic value does not matter here because the message set is empty
requestChannel.sendResponse(RequestChannel.Response(request, errorResponse))
}
override def describe(details: Boolean): String = {
val fetchRequest = new StringBuilder
fetchRequest.append("Name: " + this.getClass.getSimpleName)

View File

@@ -19,9 +19,7 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.protocol.ApiKeys
@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
object GroupCoordinatorRequest {
@@ -64,12 +62,6 @@ case class GroupCoordinatorRequest(group: String,
ApiUtils.writeShortString(buffer, group)
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
// return ConsumerCoordinatorNotAvailable for all uncaught errors
val errorResponse = GroupCoordinatorResponse(None, Errors.COORDINATOR_NOT_AVAILABLE, correlationId)
requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
def describe(details: Boolean) = {
val consumerMetadataRequest = new StringBuilder
consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)

View File

@@ -21,10 +21,8 @@ import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.protocol.ApiKeys
import scala.collection._
@@ -162,14 +160,6 @@ case class OffsetCommitRequest(groupId: String,
})
})
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val error = Errors.forException(e)
val commitStatus = requestInfo.mapValues(_ => error)
val commitResponse = OffsetCommitResponse(commitStatus, correlationId)
requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, commitResponse)))
}
override def describe(details: Boolean): String = {
val offsetCommitRequest = new StringBuilder
offsetCommitRequest.append("Name: " + this.getClass.getSimpleName)

View File

@@ -20,11 +20,9 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.common.{TopicAndPartition, _}
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.common.TopicAndPartition
import kafka.utils.Logging
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.protocol.ApiKeys
@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
object OffsetFetchRequest extends Logging {
@@ -92,23 +90,6 @@ case class OffsetFetchRequest(groupId: String,
t._2.size * 4 /* partition */
})
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val requestVersion = request.header.apiVersion
val thrownError = Errors.forException(e)
val responseMap =
if (requestVersion < 2) {
requestInfo.map {
topicAndPartition => (topicAndPartition, OffsetMetadataAndError(thrownError))
}.toMap
} else {
Map[kafka.common.TopicAndPartition, kafka.common.OffsetMetadataAndError]()
}
val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId, error=thrownError)
requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
val offsetFetchRequest = new StringBuilder
offsetFetchRequest.append("Name: " + this.getClass.getSimpleName)

View File

@@ -21,10 +21,7 @@ import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.common.TopicAndPartition
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.protocol.ApiKeys
@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
object OffsetRequest {
@@ -115,14 +112,6 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
describe(true)
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val partitionOffsetResponseMap = requestInfo.map { case (topicAndPartition, _) =>
(topicAndPartition, PartitionOffsetsResponse(Errors.forException(e), Nil))
}
val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
val offsetRequest = new StringBuilder
offsetRequest.append("Name: " + this.getClass.getSimpleName)

View File

@@ -22,9 +22,7 @@ import java.nio._
import kafka.api.ApiUtils._
import kafka.common._
import kafka.message._
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.protocol.ApiKeys
@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
object ProducerRequest {
@@ -130,19 +128,6 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
describe(true)
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
if (request.body[org.apache.kafka.common.requests.ProduceRequest].acks == 0) {
requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction))
}
else {
val producerResponseStatus = data.map { case (topicAndPartition, _) =>
(topicAndPartition, ProducerResponseStatus(Errors.forException(e), -1l, Message.NoTimestamp))
}
val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
}
override def describe(details: Boolean): String = {
val producerRequest = new StringBuilder
producerRequest.append("Name: " + this.getClass.getSimpleName)

View File

@@ -20,10 +20,8 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.protocol.ApiKeys
@deprecated("This object has been deprecated and will be removed in a future release.", "1.0.0")
object TopicMetadataRequest extends Logging {
@@ -61,14 +59,6 @@ case class TopicMetadataRequest(versionId: Short,
describe(true)
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val topicMetadata = topics.map {
topic => TopicMetadata(topic, Nil, Errors.forException(e))
}
val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
val topicMetadataRequest = new StringBuilder
topicMetadataRequest.append("Name: " + this.getClass.getSimpleName)

View File

@@ -60,6 +60,11 @@
if the version of client's FetchRequest or ProducerRequest does not support KafkaStorageException. </li>
<li>-XX:+DisableExplicitGC was replaced by -XX:+ExplicitGCInvokesConcurrent in the default JVM settings. This helps
avoid out of memory exceptions during allocation of native memory by direct buffers in some cases.</li>
<li>The overridden <code>handleError</code> method implementations have been removed from the following deprecated classes in
the <code>kafka.api</code> package: <code>FetchRequest</code>, <code>GroupCoordinatorRequest</code>, <code>OffsetCommitRequest</code>,
<code>OffsetFetchRequest</code>, <code>OffsetRequest</code>, <code>ProducerRequest</code>, and <code>TopicMetadataRequest</code>.
This was only intended for use on the broker, but it is no longer in use and the implementations have not been maintained.
A stub implementation has been retained for binary compatibility.</li>
</ul>
<h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5>