From 844120c601e87693e540fa98a9cdb4d0272f5601 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Fri, 5 Apr 2019 19:42:28 +0100 Subject: [PATCH] KAFKA-8190; Don't update keystore modification time during validation (#6539) Ensure that modification time is checked against the file used to create the SSLContext that is in-use so that SSLContext is updated whenever file is modified and a config update request is received. Reviewers: Manikumar Reddy --- .../kafka/common/security/ssl/SslFactory.java | 16 +++++++++---- .../common/security/ssl/SslFactoryTest.java | 7 ++++++ .../DynamicBrokerReconfigurationTest.scala | 23 +++++++++++++++++++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index b9b52037c52..0c5094df4a5 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -139,6 +139,7 @@ public class SslFactory implements Reconfigurable { (Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); try { this.sslContext = createSSLContext(keystore, truststore); + log.debug("Created SSL context with keystore {} truststore {}", keystore, truststore); } catch (Exception e) { throw new KafkaException(e); } @@ -173,6 +174,7 @@ public class SslFactory implements Reconfigurable { SecurityStore keystore = newKeystore != null ? newKeystore : this.keystore; SecurityStore truststore = newTruststore != null ? newTruststore : this.truststore; this.sslContext = createSSLContext(keystore, truststore); + log.info("Created new SSL context with keystore {} truststore {}", keystore, truststore); this.keystore = keystore; this.truststore = truststore; } catch (Exception e) { @@ -316,7 +318,7 @@ public class SslFactory implements Reconfigurable { private final String path; private final Password password; private final Password keyPassword; - private Long fileLastModifiedMs; + private final Long fileLastModifiedMs; SecurityStore(String type, String path, Password password, Password keyPassword) { Objects.requireNonNull(type, "type must not be null"); @@ -324,6 +326,7 @@ public class SslFactory implements Reconfigurable { this.path = path; this.password = password; this.keyPassword = keyPassword; + fileLastModifiedMs = lastModifiedMs(path); } /** @@ -338,10 +341,6 @@ public class SslFactory implements Reconfigurable { // If a password is not set access to the truststore is still available, but integrity checking is disabled. char[] passwordChars = password != null ? password.value().toCharArray() : null; ks.load(in, passwordChars); - fileLastModifiedMs = lastModifiedMs(path); - - log.debug("Loaded key store with path {} modification time {}", path, - fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)); return ks; } catch (GeneralSecurityException | IOException e) { throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e); @@ -361,6 +360,13 @@ public class SslFactory implements Reconfigurable { Long modifiedMs = lastModifiedMs(path); return modifiedMs != null && !Objects.equals(modifiedMs, this.fileLastModifiedMs); } + + @Override + public String toString() { + return "SecurityStore(" + + "path=" + path + + ", modificationTime=" + (fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)) + ")"; + } } /** diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java index bfe34c98382..6c4a2391c8b 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java @@ -120,6 +120,13 @@ public class SslFactoryTest { assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext()); sslContext = sslFactory.sslContext(); + // Verify that context is recreated after validation on reconfigure() if config is not changed, but keystore file was modified + keyStoreFile.setLastModified(System.currentTimeMillis() + 15000); + sslFactory.validateReconfiguration(sslConfig); + sslFactory.reconfigure(sslConfig); + assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext()); + sslContext = sslFactory.sslContext(); + // Verify that the context is not recreated if modification time cannot be determined keyStoreFile.setLastModified(System.currentTimeMillis() + 20000); Files.delete(keyStoreFile.toPath()); diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index e8aa081925c..fb7b5399545 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -253,6 +253,29 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet alterSslKeystore(adminClient, sslPropertiesCopy, SecureInternal) verifyProduceConsume(producer, consumer, 10, topic2) + // Verify that keystores can be updated using same file name. + val reusableProps = sslProperties2.clone().asInstanceOf[Properties] + val reusableFile = File.createTempFile("keystore", ".jks") + reusableProps.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, reusableFile.getPath) + Files.copy(new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath, + reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING) + alterSslKeystore(adminClient, reusableProps, SecureExternal) + val producer3 = ProducerBuilder().trustStoreProps(sslProperties2).maxRetries(0).build() + verifyAuthenticationFailure(producer3) + // Now alter using same file name. We can't check if the update has completed by comparing config on + // the broker, so we wait for producer operation to succeed to verify that the update has been performed. + Files.copy(new File(sslProperties2.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath, + reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING) + reusableFile.setLastModified(System.currentTimeMillis() + 1000) + alterSslKeystore(adminClient, reusableProps, SecureExternal) + TestUtils.waitUntilTrue(() => { + try { + producer3.partitionsFor(topic).size() == numPartitions + } catch { + case _: Exception => false + } + }, "Keystore not updated") + // Verify that all messages sent with retries=0 while keystores were being altered were consumed stopAndVerifyProduceConsume(producerThread, consumerThread) }