KAFKA-6319: Quote strings stored in JSON configs

This is required for ACLs where SSL principals contain
special characters (e.g. comma) that are escaped using
backslash. The strings need to be quoted for JSON to
ensure that the JSON stored in ZK is valid.

Also converted `SslEndToEndAuthorizationTest` to use a
principal with special characters to ensure that this
path is tested.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #4303 from rajinisivaram/KAFKA-6319
This commit is contained in:
Rajini Sivaram 2017-12-12 11:05:29 +02:00 committed by Ismael Juma
parent a5cd34d796
commit 651c6e480a
7 changed files with 78 additions and 21 deletions

View File

@ -179,9 +179,9 @@ public class TestSslUtils {
}
public static Map<String, Object> createSslConfig(boolean useClientCert, boolean trustStore,
Mode mode, File trustStoreFile, String certAlias, String hostName)
Mode mode, File trustStoreFile, String certAlias, String cn)
throws IOException, GeneralSecurityException {
return createSslConfig(useClientCert, trustStore, mode, trustStoreFile, certAlias, hostName, new CertificateBuilder());
return createSslConfig(useClientCert, trustStore, mode, trustStoreFile, certAlias, cn, new CertificateBuilder());
}
public static Map<String, Object> createSslConfig(boolean useClientCert, boolean trustStore,

View File

@ -36,7 +36,16 @@ object Json {
*/
def parseFull(input: String): Option[JsonValue] =
try Option(mapper.readTree(input)).map(JsonValue(_))
catch { case _: JsonProcessingException => None }
catch {
case _: JsonProcessingException =>
// Before 1.0.1, Json#encode did not escape backslash or any other special characters. SSL principals
// stored in ACLs may contain backslash as an escape char, making the JSON generated in earlier versions invalid.
// Escape backslash and retry to handle these strings which may have been persisted in ZK.
// Note that this does not handle all special characters (e.g. non-escaped double quotes are not supported)
val escapedInput = input.replaceAll("\\\\", "\\\\\\\\")
try Option(mapper.readTree(escapedInput)).map(JsonValue(_))
catch { case _: JsonProcessingException => None }
}
/**
* Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
@ -56,7 +65,7 @@ object Json {
obj match {
case null => "null"
case b: Boolean => b.toString
case s: String => "\"" + s + "\""
case s: String => mapper.writeValueAsString(s)
case n: Number => n.toString
case m: Map[_, _] => "{" +
m.map {

View File

@ -26,7 +26,7 @@ import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.junit.{After, Before}
import scala.collection.mutable.Buffer
@ -69,8 +69,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
@Before
override def setUp() {
val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
val producerSecurityProps = clientSecurityProps("producer")
val consumerSecurityProps = clientSecurityProps("consumer")
super.setUp()
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
@ -87,6 +87,11 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
TestUtils.createOffsetsTopic(zkUtils, servers)
}
def clientSecurityProps(certAlias: String): Properties = {
TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, certAlias, TestUtils.SslCertificateCn,
clientSaslProperties)
}
def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
TestUtils.createNewProducer(brokerList,
securityProtocol = this.securityProtocol,

View File

@ -17,20 +17,29 @@
package kafka.api
import java.util.Properties
import kafka.utils.TestUtils
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.Mode
import org.apache.kafka.common.security.auth._
import org.junit.Before
object SslEndToEndAuthorizationTest {
class TestPrincipalBuilder extends KafkaPrincipalBuilder {
private val Pattern = "O=A (.*?),CN=localhost".r
private val Pattern = "O=A (.*?),CN=(.*?)".r
// Use full DN as client principal to test special characters in principal
// Use field from DN as server principal to test custom PrincipalBuilder
override def build(context: AuthenticationContext): KafkaPrincipal = {
context match {
case ctx: SslAuthenticationContext =>
ctx.session.getPeerPrincipal.getName match {
case Pattern(name) =>
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, name)
val peerPrincipal = ctx.session.getPeerPrincipal.getName
peerPrincipal match {
case Pattern(name, _) =>
val principal = if (name == "server") name else peerPrincipal
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
case _ =>
KafkaPrincipal.ANONYMOUS
}
@ -40,18 +49,32 @@ object SslEndToEndAuthorizationTest {
}
class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
import kafka.api.SslEndToEndAuthorizationTest.TestPrincipalBuilder
override protected def securityProtocol = SecurityProtocol.SSL
this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[TestPrincipalBuilder].getName)
override val clientPrincipal = "client"
// Escaped characters in DN attribute values: from http://www.ietf.org/rfc/rfc2253.txt
// - a space or "#" character occurring at the beginning of the string
// - a space character occurring at the end of the string
// - one of the characters ",", "+", """, "\", "<", ">" or ";"
//
// Leading and trailing spaces in Kafka principal dont work with ACLs, but we can workaround by using
// a PrincipalBuilder that removes/replaces them.
private val clientCn = """\#A client with special chars in CN : (\, \+ \" \\ \< \> \; ')"""
override val clientPrincipal = s"O=A client,CN=$clientCn"
override val kafkaPrincipal = "server"
@Before
override def setUp() {
startSasl(jaasSections(List.empty, None, ZkSasl))
super.setUp()
}
override def clientSecurityProps(certAlias: String): Properties = {
val props = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, certAlias, clientCn, clientSaslProperties)
props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
props
}
}

View File

@ -28,7 +28,9 @@ import org.junit.Test
class AclCommandTest extends ZooKeeperTestHarness with Logging {
private val Users = Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"), KafkaPrincipal.fromString("User:test2"))
private val Users = Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
KafkaPrincipal.fromString("User:test2"),
KafkaPrincipal.fromString("""User:CN=\#User with special chars in CN : (\, \+ \" \\ \< \> \; ')"""))
private val Hosts = Set("host1", "host2")
private val AllowHostCommand = Array("--allow-host", "host1", "--allow-host", "host2")
private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", "host2")

View File

@ -21,7 +21,9 @@ import org.junit.Test
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node._
import kafka.utils.json.JsonValue
import scala.collection.JavaConverters._
import scala.collection.Map
class JsonTest {
@ -42,6 +44,16 @@ class JsonTest {
val arrayNode = new ArrayNode(jnf)
Vector(1, 2, 3).map(new IntNode(_)).foreach(arrayNode.add)
assertEquals(Json.parseFull("[1, 2, 3]"), Some(JsonValue(arrayNode)))
// Test with encoder that properly escapes backslash and quotes
val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""")
val encoded = Json.encode(map)
val decoded = Json.parseFull(encoded)
assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded)
// Test strings with non-escaped backslash and quotes. This is to verify that ACLs
// containing non-escaped chars persisted using 1.0 can be parsed.
assertEquals(decoded, Json.parseFull("""{"foo1":"bar1\,bar2", "foo2":"\bar"}"""))
}
@Test
@ -61,6 +73,8 @@ class JsonTest {
assertEquals("{}", Json.encode(Map()))
assertEquals("{\"a\":1,\"b\":2}", Json.encode(Map("a" -> 1, "b" -> 2)))
assertEquals("{\"a\":[1,2],\"c\":[3,4]}", Json.encode(Map("a" -> Seq(1,2), "c" -> Seq(3,4))))
assertEquals(""""str1\\,str2"""", Json.encode("""str1\,str2"""))
assertEquals(""""\"quoted\""""", Json.encode(""""quoted""""))
}
}

View File

@ -76,6 +76,8 @@ object TestUtils extends Logging {
val MockZkPort = 1
/** ZooKeeper connection string to use for unit tests that mock/don't require a real ZK server. */
val MockZkConnect = "127.0.0.1:" + MockZkPort
// CN in SSL certificates - this is used for endpoint validation when enabled
val SslCertificateCn = "localhost"
private val transactionStatusKey = "transactionStatus"
private val committedValue : Array[Byte] = "committed".getBytes(StandardCharsets.UTF_8)
@ -519,14 +521,15 @@ object TestUtils extends Logging {
new Producer[K, V](new kafka.producer.ProducerConfig(props))
}
private def securityConfigs(mode: Mode,
def securityConfigs(mode: Mode,
securityProtocol: SecurityProtocol,
trustStoreFile: Option[File],
certAlias: String,
certCn: String,
saslProperties: Option[Properties]): Properties = {
val props = new Properties
if (usesSslTransportLayer(securityProtocol))
props ++= sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias)
props ++= sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias, certCn)
if (usesSaslAuthentication(securityProtocol))
props ++= JaasTestUtils.saslConfigs(saslProperties)
@ -535,7 +538,7 @@ object TestUtils extends Logging {
}
def producerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "producer", saslProperties)
securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "producer", SslCertificateCn, saslProperties)
/**
* Create a (new) producer with a few pre-configured properties.
@ -596,10 +599,10 @@ object TestUtils extends Logging {
}
def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", saslProperties)
securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", SslCertificateCn, saslProperties)
def adminClientSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "admin-client", saslProperties)
securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "admin-client", SslCertificateCn, saslProperties)
/**
* Create a new consumer with a few pre-configured properties.
@ -1220,12 +1223,13 @@ object TestUtils extends Logging {
copy
}
def sslConfigs(mode: Mode, clientCert: Boolean, trustStoreFile: Option[File], certAlias: String): Properties = {
def sslConfigs(mode: Mode, clientCert: Boolean, trustStoreFile: Option[File], certAlias: String,
certCn: String = SslCertificateCn): Properties = {
val trustStore = trustStoreFile.getOrElse {
throw new Exception("SSL enabled but no trustStoreFile provided")
}
val sslConfigs = TestSslUtils.createSslConfig(clientCert, true, mode, trustStore, certAlias)
val sslConfigs = TestSslUtils.createSslConfig(clientCert, true, mode, trustStore, certAlias, certCn)
val sslProps = new Properties()
sslConfigs.asScala.foreach { case (k, v) => sslProps.put(k, v) }