diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index ad7b5454172..9cc1d27d97d 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -40,6 +40,7 @@ import scala.collection.{Map, Seq, mutable} import scala.compat.java8.OptionConverters._ import scala.math.Ordered.orderingToOrdered + object ReassignPartitionsCommand extends Logging { private[admin] val AnyLogDir = "any" @@ -1622,14 +1623,15 @@ object ReassignPartitionsCommand extends Logging { } def parsePartitionReassignmentData(jsonData: String): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = { - Json.parseFull(jsonData) match { - case Some(js) => + Json.tryParseFull(jsonData) match { + case Right(js) => val version = js.asJsonObject.get("version") match { case Some(jsonValue) => jsonValue.to[Int] case None => EarliestVersion } parsePartitionReassignmentData(version, js) - case None => throw new AdminOperationException("The input string is not a valid JSON") + case Left(f) => + throw new AdminOperationException(f) } } diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala index 48bdc85dbb9..049941cd01d 100644 --- a/core/src/main/scala/kafka/utils/Json.scala +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -16,8 +16,9 @@ */ package kafka.utils -import com.fasterxml.jackson.core.JsonProcessingException +import com.fasterxml.jackson.core.{JsonParseException, JsonProcessingException} import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node.MissingNode import kafka.utils.json.JsonValue import scala.reflect.ClassTag @@ -32,9 +33,7 @@ object Json { /** * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. */ - def parseFull(input: String): Option[JsonValue] = - try Option(mapper.readTree(input)).map(JsonValue(_)) - catch { case _: JsonProcessingException => None } + def parseFull(input: String): Option[JsonValue] = tryParseFull(input).toOption /** * Parse a JSON string into either a generic type T, or a JsonProcessingException in the case of @@ -64,6 +63,19 @@ object Json { catch { case e: JsonProcessingException => Left(e) } } + /** + * Parse a JSON string into a JsonValue if possible. It returns an `Either` where `Left` will be an exception and + * `Right` is the `JsonValue`. + * @param input a JSON string to parse + * @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value. + */ + def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] = + if (input == null || input.isEmpty) + Left(new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty")) + else + try Right(mapper.readTree(input)).map(JsonValue(_)) + catch { case e: JsonProcessingException => Left(e) } + /** * Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in * the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala index 1429a41ad2f..ba4b709eb3f 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala @@ -659,4 +659,18 @@ class ReassignPartitionsUnitTest { assertTrue("Expected the string to start with %s, but it was %s".format(prefix, str), str.startsWith(prefix)) } + + @Test + def testPropagateInvalidJsonError(): Unit = { + val adminClient = new MockAdminClient.Builder().numBrokers(4).build() + try { + addTopics(adminClient) + assertStartsWith("Unexpected character", + assertThrows( + classOf[AdminOperationException], () => executeAssignment(adminClient, additional = false, + "{invalid_json")).getMessage) + } finally { + adminClient.close() + } + } } diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala index 60d0234057a..57582a4eabf 100644 --- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala +++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala @@ -19,7 +19,7 @@ package kafka.utils import java.nio.charset.StandardCharsets import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.{JsonParseException, JsonProcessingException} import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node._ import kafka.utils.JsonTest.TestObject @@ -40,25 +40,34 @@ class JsonTest { def testJsonParse(): Unit = { val jnf = JsonNodeFactory.instance - assertEquals(Json.parseFull("{}"), Some(JsonValue(new ObjectNode(jnf)))) + assertEquals(Some(JsonValue(new ObjectNode(jnf))), Json.parseFull("{}")) + assertEquals(Right(JsonValue(new ObjectNode(jnf))), Json.tryParseFull("{}")) + assertEquals(classOf[Left[JsonProcessingException, JsonValue]], Json.tryParseFull(null).getClass) + assertThrows(classOf[IllegalArgumentException], () => Json.tryParseBytes(null)) - assertEquals(Json.parseFull("""{"foo":"bar"s}"""), None) + assertEquals(None, Json.parseFull("")) + assertEquals(classOf[Left[JsonProcessingException, JsonValue]], Json.tryParseFull("").getClass) + + assertEquals(None, Json.parseFull("""{"foo":"bar"s}""")) + val tryRes = Json.tryParseFull("""{"foo":"bar"s}""") + assertTrue(tryRes.isInstanceOf[Left[_, JsonValue]]) val objectNode = new ObjectNode( jnf, Map[String, JsonNode]("foo" -> new TextNode("bar"), "is_enabled" -> BooleanNode.TRUE).asJava ) - assertEquals(Json.parseFull("""{"foo":"bar", "is_enabled":true}"""), Some(JsonValue(objectNode))) + assertEquals(Some(JsonValue(objectNode)), Json.parseFull("""{"foo":"bar", "is_enabled":true}""")) + assertEquals(Right(JsonValue(objectNode)), Json.tryParseFull("""{"foo":"bar", "is_enabled":true}""")) val arrayNode = new ArrayNode(jnf) Vector(1, 2, 3).map(new IntNode(_)).foreach(arrayNode.add) - assertEquals(Json.parseFull("[1, 2, 3]"), Some(JsonValue(arrayNode))) + assertEquals(Some(JsonValue(arrayNode)), Json.parseFull("[1, 2, 3]")) // Test with encoder that properly escapes backslash and quotes val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""").asJava val encoded = Json.encodeAsString(map) val decoded = Json.parseFull(encoded) - assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded) + assertEquals(decoded, Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}""")) } @Test