KAFKA-6084: Propagate JSON parsing errors in ReassignPartitionsCommand (#4090)

Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Viktor Somogyi-Vass 2020-12-15 19:05:40 +01:00 committed by GitHub
parent 3717ab0ff2
commit 1aac64667f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 13 deletions

View File

@ -40,6 +40,7 @@ import scala.collection.{Map, Seq, mutable}
import scala.compat.java8.OptionConverters._
import scala.math.Ordered.orderingToOrdered
object ReassignPartitionsCommand extends Logging {
private[admin] val AnyLogDir = "any"
@ -1622,14 +1623,15 @@ object ReassignPartitionsCommand extends Logging {
}
def parsePartitionReassignmentData(jsonData: String): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
Json.parseFull(jsonData) match {
case Some(js) =>
Json.tryParseFull(jsonData) match {
case Right(js) =>
val version = js.asJsonObject.get("version") match {
case Some(jsonValue) => jsonValue.to[Int]
case None => EarliestVersion
}
parsePartitionReassignmentData(version, js)
case None => throw new AdminOperationException("The input string is not a valid JSON")
case Left(f) =>
throw new AdminOperationException(f)
}
}

View File

@ -16,8 +16,9 @@
*/
package kafka.utils
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.core.{JsonParseException, JsonProcessingException}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.MissingNode
import kafka.utils.json.JsonValue
import scala.reflect.ClassTag
@ -32,9 +33,7 @@ object Json {
/**
* Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
*/
def parseFull(input: String): Option[JsonValue] =
try Option(mapper.readTree(input)).map(JsonValue(_))
catch { case _: JsonProcessingException => None }
def parseFull(input: String): Option[JsonValue] = tryParseFull(input).toOption
/**
* Parse a JSON string into either a generic type T, or a JsonProcessingException in the case of
@ -64,6 +63,19 @@ object Json {
catch { case e: JsonProcessingException => Left(e) }
}
/**
* Parse a JSON string into a JsonValue if possible. It returns an `Either` where `Left` will be an exception and
* `Right` is the `JsonValue`.
* @param input a JSON string to parse
* @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value.
*/
def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
if (input == null || input.isEmpty)
Left(new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty"))
else
try Right(mapper.readTree(input)).map(JsonValue(_))
catch { case e: JsonProcessingException => Left(e) }
/**
* Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in
* the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid

View File

@ -659,4 +659,18 @@ class ReassignPartitionsUnitTest {
assertTrue("Expected the string to start with %s, but it was %s".format(prefix, str),
str.startsWith(prefix))
}
@Test
def testPropagateInvalidJsonError(): Unit = {
val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
try {
addTopics(adminClient)
assertStartsWith("Unexpected character",
assertThrows(
classOf[AdminOperationException], () => executeAssignment(adminClient, additional = false,
"{invalid_json")).getMessage)
} finally {
adminClient.close()
}
}
}

View File

@ -19,7 +19,7 @@ package kafka.utils
import java.nio.charset.StandardCharsets
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.{JsonParseException, JsonProcessingException}
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node._
import kafka.utils.JsonTest.TestObject
@ -40,25 +40,34 @@ class JsonTest {
def testJsonParse(): Unit = {
val jnf = JsonNodeFactory.instance
assertEquals(Json.parseFull("{}"), Some(JsonValue(new ObjectNode(jnf))))
assertEquals(Some(JsonValue(new ObjectNode(jnf))), Json.parseFull("{}"))
assertEquals(Right(JsonValue(new ObjectNode(jnf))), Json.tryParseFull("{}"))
assertEquals(classOf[Left[JsonProcessingException, JsonValue]], Json.tryParseFull(null).getClass)
assertThrows(classOf[IllegalArgumentException], () => Json.tryParseBytes(null))
assertEquals(Json.parseFull("""{"foo":"bar"s}"""), None)
assertEquals(None, Json.parseFull(""))
assertEquals(classOf[Left[JsonProcessingException, JsonValue]], Json.tryParseFull("").getClass)
assertEquals(None, Json.parseFull("""{"foo":"bar"s}"""))
val tryRes = Json.tryParseFull("""{"foo":"bar"s}""")
assertTrue(tryRes.isInstanceOf[Left[_, JsonValue]])
val objectNode = new ObjectNode(
jnf,
Map[String, JsonNode]("foo" -> new TextNode("bar"), "is_enabled" -> BooleanNode.TRUE).asJava
)
assertEquals(Json.parseFull("""{"foo":"bar", "is_enabled":true}"""), Some(JsonValue(objectNode)))
assertEquals(Some(JsonValue(objectNode)), Json.parseFull("""{"foo":"bar", "is_enabled":true}"""))
assertEquals(Right(JsonValue(objectNode)), Json.tryParseFull("""{"foo":"bar", "is_enabled":true}"""))
val arrayNode = new ArrayNode(jnf)
Vector(1, 2, 3).map(new IntNode(_)).foreach(arrayNode.add)
assertEquals(Json.parseFull("[1, 2, 3]"), Some(JsonValue(arrayNode)))
assertEquals(Some(JsonValue(arrayNode)), Json.parseFull("[1, 2, 3]"))
// Test with encoder that properly escapes backslash and quotes
val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""").asJava
val encoded = Json.encodeAsString(map)
val decoded = Json.parseFull(encoded)
assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded)
assertEquals(decoded, Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""))
}
@Test