KAFKA-6911; Fix dynamic keystore/truststore update check (#5029)

Fix the check, add unit test to verify the change, update `DynamicBrokerReconfigurationTest` to avoid dynamic keystore update in tests which are not expected to update keystores.
This commit is contained in:
Rajini Sivaram 2018-05-25 00:24:37 +01:00 committed by Jason Gustafson
parent 0f86e68840
commit ff9f928c16
3 changed files with 44 additions and 35 deletions

View File

@ -176,10 +176,10 @@ public class SslFactory implements Reconfigurable {
}
private SecurityStore maybeCreateNewKeystore(Map<String, ?> configs) {
boolean keystoreChanged = Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), keystore.type) ||
Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), keystore.path) ||
Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) ||
Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword);
boolean keystoreChanged = !Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), keystore.type) ||
!Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), keystore.path) ||
!Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) ||
!Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword);
if (keystoreChanged) {
return createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
@ -191,9 +191,9 @@ public class SslFactory implements Reconfigurable {
}
private SecurityStore maybeCreateNewTruststore(Map<String, ?> configs) {
boolean truststoreChanged = Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), truststore.type) ||
Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), truststore.path) ||
Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), truststore.password);
boolean truststoreChanged = !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), truststore.type) ||
!Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), truststore.path) ||
!Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), truststore.password);
if (truststoreChanged) {
return createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),

View File

@ -847,18 +847,14 @@ public class SslTransportLayerTest {
CertStores invalidCertStores = new CertStores(true, "server", "127.0.0.1");
Map<String, Object> invalidConfigs = invalidCertStores.getTrustingConfig(clientCertStores);
try {
reconfigurableBuilder.validateReconfiguration(invalidConfigs);
fail("Should have failed validation with an exception with different SubjectAltName");
} catch (KafkaException e) {
// expected exception
}
try {
reconfigurableBuilder.reconfigure(invalidConfigs);
fail("Should have failed to reconfigure with different SubjectAltName");
} catch (KafkaException e) {
// expected exception
}
verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, "keystore with different SubjectAltName");
Map<String, Object> missingStoreConfigs = new HashMap<>();
missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "some.keystore.path");
missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, new Password("some.keystore.password"));
missingStoreConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("some.key.password"));
verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "keystore not found");
// Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration
newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
@ -911,24 +907,35 @@ public class SslTransportLayerTest {
Map<String, Object> invalidConfigs = new HashMap<>(newTruststoreConfigs);
invalidConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "INVALID_TYPE");
try {
reconfigurableBuilder.validateReconfiguration(invalidConfigs);
fail("Should have failed validation with an exception with invalid truststore type");
} catch (KafkaException e) {
// expected exception
}
try {
reconfigurableBuilder.reconfigure(invalidConfigs);
fail("Should have failed to reconfigure with with invalid truststore type");
} catch (KafkaException e) {
// expected exception
}
verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, "invalid truststore type");
Map<String, Object> missingStoreConfigs = new HashMap<>();
missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "some.truststore.path");
missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, new Password("some.truststore.password"));
verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "truststore not found");
// Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration
newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 10);
}
private void verifyInvalidReconfigure(ListenerReconfigurable reconfigurable,
Map<String, Object> invalidConfigs, String errorMessage) {
try {
reconfigurable.validateReconfiguration(invalidConfigs);
fail("Should have failed validation with an exception: " + errorMessage);
} catch (KafkaException e) {
// expected exception
}
try {
reconfigurable.reconfigure(invalidConfigs);
fail("Should have failed to reconfigure: " + errorMessage);
} catch (KafkaException e) {
// expected exception
}
}
private Selector createSelector(Map<String, Object> sslClientConfigs) {
return createSelector(sslClientConfigs, null, null, null);
}

View File

@ -119,12 +119,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props ++= sslProperties1
props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureInternal))
// Set invalid static properties to ensure that dynamic config is used
// Set invalid top-level properties to ensure that listener config is used
// Don't set any dynamic configs here since they get overridden in tests
props ++= invalidSslProperties
props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS, "")
props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
val kafkaConfig = KafkaConfig.fromProps(props)
configureDynamicKeystoreInZooKeeper(kafkaConfig, Seq(brokerId), sslProperties1)
servers += TestUtils.createServer(kafkaConfig)
}
@ -183,7 +184,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
if (overrideCount > 0) {
val listenerPrefix = "listener.name.external.ssl."
verifySynonym(configName, synonyms.get(0), isSensitive, listenerPrefix, ConfigSource.DYNAMIC_BROKER_CONFIG, sslProperties1)
verifySynonym(configName, synonyms.get(1), isSensitive, listenerPrefix, ConfigSource.STATIC_BROKER_CONFIG, invalidSslProperties)
verifySynonym(configName, synonyms.get(1), isSensitive, listenerPrefix, ConfigSource.STATIC_BROKER_CONFIG, sslProperties1)
}
verifySynonym(configName, synonyms.get(overrideCount), isSensitive, "ssl.", ConfigSource.STATIC_BROKER_CONFIG, invalidSslProperties)
defaultValue.foreach { value =>
@ -204,6 +205,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
val adminClient = adminClients.head
alterSslKeystoreUsingConfigCommand(sslProperties1, SecureExternal)
val configDesc = describeConfig(adminClient)
verifySslConfig("listener.name.external.", sslProperties1, configDesc)