KAFKA-8190; Don't update keystore modification time during validation (#6539)

Ensure that modification time is checked against the file used to create the SSLContext that is in-use so that SSLContext is updated whenever file is modified and a config update request is received.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Rajini Sivaram 2019-04-05 19:42:28 +01:00 committed by GitHub
parent 825fa3fa09
commit 844120c601
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 5 deletions

View File

@ -139,6 +139,7 @@ public class SslFactory implements Reconfigurable {
(Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
try {
this.sslContext = createSSLContext(keystore, truststore);
log.debug("Created SSL context with keystore {} truststore {}", keystore, truststore);
} catch (Exception e) {
throw new KafkaException(e);
}
@ -173,6 +174,7 @@ public class SslFactory implements Reconfigurable {
SecurityStore keystore = newKeystore != null ? newKeystore : this.keystore;
SecurityStore truststore = newTruststore != null ? newTruststore : this.truststore;
this.sslContext = createSSLContext(keystore, truststore);
log.info("Created new SSL context with keystore {} truststore {}", keystore, truststore);
this.keystore = keystore;
this.truststore = truststore;
} catch (Exception e) {
@ -316,7 +318,7 @@ public class SslFactory implements Reconfigurable {
private final String path;
private final Password password;
private final Password keyPassword;
private Long fileLastModifiedMs;
private final Long fileLastModifiedMs;
SecurityStore(String type, String path, Password password, Password keyPassword) {
Objects.requireNonNull(type, "type must not be null");
@ -324,6 +326,7 @@ public class SslFactory implements Reconfigurable {
this.path = path;
this.password = password;
this.keyPassword = keyPassword;
fileLastModifiedMs = lastModifiedMs(path);
}
/**
@ -338,10 +341,6 @@ public class SslFactory implements Reconfigurable {
// If a password is not set access to the truststore is still available, but integrity checking is disabled.
char[] passwordChars = password != null ? password.value().toCharArray() : null;
ks.load(in, passwordChars);
fileLastModifiedMs = lastModifiedMs(path);
log.debug("Loaded key store with path {} modification time {}", path,
fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs));
return ks;
} catch (GeneralSecurityException | IOException e) {
throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
@ -361,6 +360,13 @@ public class SslFactory implements Reconfigurable {
Long modifiedMs = lastModifiedMs(path);
return modifiedMs != null && !Objects.equals(modifiedMs, this.fileLastModifiedMs);
}
@Override
public String toString() {
return "SecurityStore(" +
"path=" + path +
", modificationTime=" + (fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)) + ")";
}
}
/**

View File

@ -120,6 +120,13 @@ public class SslFactoryTest {
assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
sslContext = sslFactory.sslContext();
// Verify that context is recreated after validation on reconfigure() if config is not changed, but keystore file was modified
keyStoreFile.setLastModified(System.currentTimeMillis() + 15000);
sslFactory.validateReconfiguration(sslConfig);
sslFactory.reconfigure(sslConfig);
assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
sslContext = sslFactory.sslContext();
// Verify that the context is not recreated if modification time cannot be determined
keyStoreFile.setLastModified(System.currentTimeMillis() + 20000);
Files.delete(keyStoreFile.toPath());

View File

@ -253,6 +253,29 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
alterSslKeystore(adminClient, sslPropertiesCopy, SecureInternal)
verifyProduceConsume(producer, consumer, 10, topic2)
// Verify that keystores can be updated using same file name.
val reusableProps = sslProperties2.clone().asInstanceOf[Properties]
val reusableFile = File.createTempFile("keystore", ".jks")
reusableProps.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, reusableFile.getPath)
Files.copy(new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath,
reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING)
alterSslKeystore(adminClient, reusableProps, SecureExternal)
val producer3 = ProducerBuilder().trustStoreProps(sslProperties2).maxRetries(0).build()
verifyAuthenticationFailure(producer3)
// Now alter using same file name. We can't check if the update has completed by comparing config on
// the broker, so we wait for producer operation to succeed to verify that the update has been performed.
Files.copy(new File(sslProperties2.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath,
reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING)
reusableFile.setLastModified(System.currentTimeMillis() + 1000)
alterSslKeystore(adminClient, reusableProps, SecureExternal)
TestUtils.waitUntilTrue(() => {
try {
producer3.partitionsFor(topic).size() == numPartitions
} catch {
case _: Exception => false
}
}, "Keystore not updated")
// Verify that all messages sent with retries=0 while keystores were being altered were consumed
stopAndVerifyProduceConsume(producerThread, consumerThread)
}