KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)

This commit is contained in:
David Arthur 2022-08-04 15:09:08 -04:00
parent 4e049c706f
commit a7369bd52f
2 changed files with 43 additions and 22 deletions

View File

@ -67,6 +67,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.collection._
import scala.collection.mutable.ArrayBuffer
@ -352,8 +353,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
}
@Test // TODO KAFKA-14126 add KRaft support
def testKeyStoreAlter(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testKeyStoreAlter(quorum: String): Unit = {
val topic2 = "testtopic2"
TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
@ -419,8 +421,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
@Test // TODO KAFKA-14126 add KRaft support
def testTrustStoreAlter(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testTrustStoreAlter(quorum: String): Unit = {
val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)
// Producer with new keystore should fail to connect before truststore update
@ -467,9 +470,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertFalse(response.wasDisconnected(), "Request failed because broker is not available")
}
val group_id = new AtomicInteger(1)
def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}"
// Produce/consume should work with old as well as new client keystore
verifySslProduceConsume(sslProperties1, "alter-truststore-1")
verifySslProduceConsume(sslProperties2, "alter-truststore-2")
verifySslProduceConsume(sslProperties1, next_group_name())
verifySslProduceConsume(sslProperties2, next_group_name())
// Revert to old truststore with only one certificate and update. Clients should connect only with old keystore.
val oldTruststoreProps = new Properties
@ -478,7 +484,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
reconfigureServers(oldTruststoreProps, perBrokerConfig = true,
(s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build())
verifySslProduceConsume(sslProperties1, "alter-truststore-3")
verifySslProduceConsume(sslProperties1, next_group_name())
// Update same truststore file to contain both certificates without changing any configs.
// Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes.
@ -486,8 +492,14 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
StandardCopyOption.REPLACE_EXISTING)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
verifySslProduceConsume(sslProperties1, "alter-truststore-4")
verifySslProduceConsume(sslProperties2, "alter-truststore-5")
TestUtils.retry(30000) {
try {
verifySslProduceConsume(sslProperties1, next_group_name())
verifySslProduceConsume(sslProperties2, next_group_name())
} catch {
case t: Throwable => throw new AssertionError(t)
}
}
// Update internal keystore/truststore and validate new client connections from broker (e.g. controller).
// Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection
@ -495,12 +507,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix)
props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
verifySslProduceConsume(sslProperties2, "alter-truststore-6")
verifySslProduceConsume(sslProperties2, next_group_name())
props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
verifySslProduceConsume(sslProperties2, "alter-truststore-7")
verifySslProduceConsume(sslProperties2, next_group_name())
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
if (!isKRaftTest()) {
val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
val controllerChannelManager = controller.kafkaController.controllerChannelManager
val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
@ -511,6 +524,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// validate that the brokerToController request works fine
verifyBrokerToControllerCall(controller)
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
@ -146,7 +147,8 @@ public class ConfigurationControlManager {
}
break;
}
if (!Objects.equals(currentValue, newValue)) {
if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
// KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(configResource.type().id()).
setResourceName(configResource.name()).
@ -233,7 +235,8 @@ public class ConfigurationControlManager {
String key = entry.getKey();
String newValue = entry.getValue();
String currentValue = currentConfigs.get(key);
if (!Objects.equals(newValue, currentValue)) {
if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
// KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(configResource.type().id()).
setResourceName(configResource.name()).
@ -297,8 +300,12 @@ public class ConfigurationControlManager {
if (configs.isEmpty()) {
configData.remove(configResource);
}
if (configSchema.isSensitive(record)) {
log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN);
} else {
log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
}
}
// VisibleForTesting
Map<String, String> getConfigs(ConfigResource configResource) {