From 8fb5e63aa88019216e95fdbe0b6874c723b64bb4 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Tue, 2 Oct 2018 20:57:31 +0100 Subject: [PATCH] KAFKA-7429: Enable key/truststore update with same filename/password (#5699) --- .../kafka/common/security/ssl/SslFactory.java | 31 ++++++++- .../common/security/ssl/SslFactoryTest.java | 46 +++++++++++++ .../scala/kafka/server/AdminManager.scala | 2 + .../kafka/server/DynamicBrokerConfig.scala | 64 ++++++++++++++++--- .../DynamicBrokerReconfigurationTest.scala | 9 +++ 5 files changed, 141 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index b1f7df87690..b9b52037c52 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -47,6 +49,7 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.Enumeration; import java.util.List; import java.util.Map; @@ -54,8 +57,9 @@ import java.util.Objects; import java.util.Set; import java.util.HashSet; - public class SslFactory implements Reconfigurable { + private static final Logger log = LoggerFactory.getLogger(SslFactory.class); + private final Mode mode; private final String clientAuthConfigOverride; private final boolean keystoreVerifiableUsingTruststore; @@ -183,6 +187,9 @@ public class SslFactory implements Reconfigurable { !Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) || !Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword); + if (!keystoreChanged) { + keystoreChanged = keystore.modified(); + } if (keystoreChanged) { return createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), @@ -197,6 +204,9 @@ public class SslFactory implements Reconfigurable { !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), truststore.path) || !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), truststore.password); + if (!truststoreChanged) { + truststoreChanged = truststore.modified(); + } if (truststoreChanged) { return createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), (String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), @@ -306,6 +316,7 @@ public class SslFactory implements Reconfigurable { private final String path; private final Password password; private final Password keyPassword; + private Long fileLastModifiedMs; SecurityStore(String type, String path, Password password, Password keyPassword) { Objects.requireNonNull(type, "type must not be null"); @@ -327,11 +338,29 @@ public class SslFactory implements Reconfigurable { // If a password is not set access to the truststore is still available, but integrity checking is disabled. char[] passwordChars = password != null ? 
password.value().toCharArray() : null; ks.load(in, passwordChars); + fileLastModifiedMs = lastModifiedMs(path); + + log.debug("Loaded key store with path {} modification time {}", path, + fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)); return ks; } catch (GeneralSecurityException | IOException e) { throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e); } } + + private Long lastModifiedMs(String path) { + try { + return Files.getLastModifiedTime(Paths.get(path)).toMillis(); + } catch (IOException e) { + log.error("Modification time of key store could not be obtained: " + path, e); + return null; + } + } + + boolean modified() { + Long modifiedMs = lastModifiedMs(path); + return modifiedMs != null && !Objects.equals(modifiedMs, this.fileLastModifiedMs); + } } /** diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java index a134104c266..97021e3908a 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.ssl; import java.io.File; +import java.nio.file.Files; import java.security.KeyStore; import java.util.Map; @@ -32,8 +33,11 @@ import org.junit.Test; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -81,6 +85,48 @@ public class SslFactoryTest { assertTrue(engine.getUseClientMode()); } + @Test + public void testReconfiguration() throws Exception { + File trustStoreFile = File.createTempFile("truststore", ".jks"); + Map sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server"); + SslFactory sslFactory = new SslFactory(Mode.SERVER); + sslFactory.configure(sslConfig); + SSLContext sslContext = sslFactory.sslContext(); + assertNotNull("SSL context not created", sslContext); + assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext()); + assertFalse(sslContext.createSSLEngine("localhost", 0).getUseClientMode()); + + // Verify that context is not recreated on reconfigure() if config and file are not changed + sslFactory.reconfigure(sslConfig); + assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext()); + + // Verify that context is recreated on reconfigure() if config is changed + trustStoreFile = File.createTempFile("truststore", ".jks"); + sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server"); + sslFactory.reconfigure(sslConfig); + assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext()); + sslContext = sslFactory.sslContext(); + + // Verify that context is recreated on reconfigure() if config is not changed, but truststore file was modified + trustStoreFile.setLastModified(System.currentTimeMillis() + 10000); + sslFactory.reconfigure(sslConfig); + assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext()); + sslContext = sslFactory.sslContext(); + + // Verify that context is recreated on reconfigure() if config is not changed, 
but keystore file was modified + File keyStoreFile = new File((String) sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); + keyStoreFile.setLastModified(System.currentTimeMillis() + 10000); + sslFactory.reconfigure(sslConfig); + assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext()); + sslContext = sslFactory.sslContext(); + + // Verify that the context is not recreated if modification time cannot be determined + keyStoreFile.setLastModified(System.currentTimeMillis() + 20000); + Files.delete(keyStoreFile.toPath()); + sslFactory.reconfigure(sslConfig); + assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext()); + } + @Test public void testKeyStoreTrustStoreValidation() throws Exception { File trustStoreFile = File.createTempFile("truststore", ".jks"); diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index f765f51a89a..2b481702c45 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -389,6 +389,8 @@ class AdminManager(val config: KafkaConfig, this.config.dynamicConfig.validate(configProps, perBrokerConfig) validateConfigPolicy(ConfigResource.Type.BROKER) if (!validateOnly) { + if (perBrokerConfig) + this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps) adminZkClient.changeBrokerConfig(brokerId, this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig)) } diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index bcaaa02ee69..2c0f6c1b52c 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -87,6 +87,8 @@ object DynamicBrokerConfig { DynamicListenerConfig.ReconfigurableConfigs private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp) + private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r private val DynamicPasswordConfigs = { @@ -267,6 +269,27 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } + /** + * All config updates through ZooKeeper are triggered through actual changes in values stored in ZooKeeper. + * For some configs like SSL keystores and truststores, we also want to reload the store if it was modified + * in-place, even though the actual value of the file path and password haven't changed. This scenario alone + * is handled here when a config update request using admin client is processed by AdminManager. If any of + * the SSL configs have changed, then the update will not be done here, but will be handled later when ZK + * changes are processed. At the moment, only listener configs are considered for reloading. 
+ */ + private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: Properties): Unit = CoreUtils.inWriteLock(lock) { + reconfigurables + .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains)) + .foreach { + case reconfigurable: ListenerReconfigurable => + val kafkaProps = validatedKafkaProps(newProps, perBrokerConfig = true) + val newConfig = new KafkaConfig(kafkaProps.asJava, false, None) + processListenerReconfigurable(reconfigurable, newConfig, Collections.emptyMap(), validateOnly = false, reloadOnly = true) + case reconfigurable => + trace(s"Files will not be reloaded without config change for $reconfigurable") + } + } + private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = { secret.map { secret => new PasswordEncoder(secret, @@ -355,17 +378,28 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging props } - private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) { - validateConfigs(props, perBrokerConfig) + /** + * Validate the provided configs `propsOverride` and return the full Kafka configs with + * the configured defaults and these overrides. + * + * Note: The caller must acquire the read or write lock before invoking this method. + */ + private def validatedKafkaProps(propsOverride: Properties, perBrokerConfig: Boolean): Map[String, String] = { + validateConfigs(propsOverride, perBrokerConfig) val newProps = mutable.Map[String, String]() newProps ++= staticBrokerConfigs if (perBrokerConfig) { overrideProps(newProps, dynamicDefaultConfigs) - overrideProps(newProps, props.asScala) + overrideProps(newProps, propsOverride.asScala) } else { - overrideProps(newProps, props.asScala) + overrideProps(newProps, propsOverride.asScala) overrideProps(newProps, dynamicBrokerConfigs) } + newProps + } + + private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) { + val newProps = validatedKafkaProps(props, perBrokerConfig) processReconfiguration(newProps, validateOnly = true) } @@ -445,12 +479,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging newConfig.valuesFromThisConfig.keySet.asScala.foreach(customConfigs.remove) reconfigurables.foreach { case listenerReconfigurable: ListenerReconfigurable => - val listenerName = listenerReconfigurable.listenerName - val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix) - val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix) - val updatedKeys = updatedConfigs(newValues, oldValues).keySet - if (needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys)) - processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly) + processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false) case reconfigurable => if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet)) processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly) @@ -481,6 +510,21 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty } + private def processListenerReconfigurable(listenerReconfigurable: ListenerReconfigurable, + newConfig: KafkaConfig, + customConfigs: util.Map[String, Object], + 
validateOnly: Boolean, + reloadOnly: Boolean): Unit = { + val listenerName = listenerReconfigurable.listenerName + val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix) + val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix) + val updatedKeys = updatedConfigs(newValues, oldValues).keySet + val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys) + // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if configs have changed + if (reloadOnly != configsChanged) + processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly) + } + private def processReconfigurable(reconfigurable: Reconfigurable, updatedConfigNames: Set[String], allNewConfigs: util.Map[String, _], diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index f772e586d6e..5d15cc46ed9 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -300,6 +300,15 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet (s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG))) verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build()) verifySslProduceConsume(sslProperties1, "alter-truststore-3") + + // Update same truststore file to contain both certificates without changing any configs. + // Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes. + Files.copy(Paths.get(combinedStoreProps.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)), + Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)), + StandardCopyOption.REPLACE_EXISTING) + TestUtils.alterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true) + verifySslProduceConsume(sslProperties1, "alter-truststore-4") + verifySslProduceConsume(sslProperties2, "alter-truststore-5") } @Test
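
Usage note (commentary, not part of the patch): with this change, a keystore or truststore that has been replaced in place can be reloaded by re-submitting the broker's existing per-broker SSL configs through an AlterConfigs request; AdminManager then calls reloadUpdatedFilesWithoutConfigChange and SslFactory picks up the new file via its last-modified-time check, so the file on disk needs a different modification time than the one recorded at the previous load. The sketch below shows one way to trigger this from the Java AdminClient; the broker id, listener name ("external"), bootstrap address and keystore path are placeholders, not values from this patch.

    // Sketch only: re-submit an unchanged per-broker SSL config so the broker
    // reloads the keystore file that was replaced in place.
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigsResult;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class ReloadKeystoreExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient adminClient = AdminClient.create(props)) {
                // Per-broker resource: the in-place reload is only triggered for per-broker updates.
                ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
                // Same value that is already configured; only the file contents changed on disk.
                ConfigEntry keystoreLocation = new ConfigEntry(
                    "listener.name.external.ssl.keystore.location",
                    "/var/private/ssl/server.keystore.jks");
                // The non-incremental AlterConfigs request replaces the broker's dynamic config set,
                // so any other dynamic configs that should remain set would need to be included too.
                AlterConfigsResult result = adminClient.alterConfigs(
                    Collections.singletonMap(broker0,
                        new Config(Collections.singletonList(keystoreLocation))));
                result.all().get();
            }
        }
    }

The same request can presumably also be issued from kafka-configs.sh with --bootstrap-server and --entity-type brokers, as long as the submitted config values match the ones already in effect.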