KAFKA-1580; Reject producer requests to internal topics; reviewed by Joel Koshy and Neha Narkhede

This commit is contained in:
Jonathan Natkins 2014-08-15 15:49:54 -07:00 committed by Joel Koshy
parent d678449b96
commit c6f08b6094
5 changed files with 63 additions and 16 deletions

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* The client has attempted to perform an operation on an invalid topic.
*/
public class InvalidTopicException extends ApiException {
private static final long serialVersionUID = 1L;
public InvalidTopicException() {
super();
}
public InvalidTopicException(String message, Throwable cause) {
super(message, cause);
}
public InvalidTopicException(String message) {
super(message);
}
public InvalidTopicException(Throwable cause) {
super(cause);
}
}

View File

@ -19,17 +19,7 @@ package org.apache.kafka.common.protocol;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.*;
/**
@ -51,7 +41,9 @@ public enum Errors {
// TODO: errorCode 8, 9, 11
MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
// TODO: errorCode 14, 15, 16
INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic."));
private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();

View File

@ -46,6 +46,7 @@ object ErrorMapping {
val OffsetsLoadInProgressCode: Short = 14
val ConsumerCoordinatorNotAvailableCode: Short = 15
val NotCoordinatorForConsumerCode: Short = 16
val InvalidTopicCode : Short = 17
private val exceptionToCode =
Map[Class[Throwable], Short](
@ -63,7 +64,8 @@ object ErrorMapping {
classOf[OffsetMetadataTooLargeException].asInstanceOf[Class[Throwable]] -> OffsetMetadataTooLargeCode,
classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode
classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode
).withDefaultValue(UnknownCode)
/* invert the mapping */

View File

@ -160,7 +160,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val sTime = SystemTime.milliseconds
val localProduceResults = appendToLocalLog(produceRequest)
val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
@ -236,11 +236,15 @@ class KafkaApis(val requestChannel: RequestChannel,
/**
* Helper method for handling a parsed producer request
*/
private def appendToLocalLog(producerRequest: ProducerRequest): Iterable[ProduceResult] = {
private def appendToLocalLog(producerRequest: ProducerRequest, isOffsetCommit: Boolean): Iterable[ProduceResult] = {
val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data
trace("Append [%s] to local log ".format(partitionAndData.toString))
partitionAndData.map {case (topicAndPartition, messages) =>
try {
if (Topic.InternalTopics.contains(topicAndPartition.topic) &&
!(isOffsetCommit && topicAndPartition.topic == OffsetManager.OffsetsTopicName)) {
throw new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic))
}
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
val info = partitionOpt match {
case Some(partition) =>
@ -268,6 +272,10 @@ class KafkaApis(val requestChannel: RequestChannel,
fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
Runtime.getRuntime.halt(1)
null
case ite: InvalidTopicException =>
warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
producerRequest.correlationId, producerRequest.clientId, topicAndPartition, ite.getMessage))
new ProduceResult(topicAndPartition, ite)
case utpe: UnknownTopicOrPartitionException =>
warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage))

View File

@ -17,6 +17,8 @@
package kafka.api
import kafka.common.Topic
import org.apache.kafka.common.errors.InvalidTopicException
import org.scalatest.junit.JUnit3Suite
import org.junit.Test
import org.junit.Assert._
@ -65,7 +67,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize);
producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
}
@ -295,6 +297,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
}
@Test(expected = classOf[InvalidTopicException])
def testCannotSendToInternalTopic() {
producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
}
private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
{
val numRecords = 1000