KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108)

We can append/subtract multiple config values in kraft mode using the `IncrementalAlterConfig` RPC. For example: append/subtract topic config "cleanup.policy" with value="delete,compact" will end up treating "delete,compact" as a value not 2 values. This patch fixes the problem. Additionally, it update the zk logic to correctly handle duplicate additions.

Reviewers: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
dengziming 2022-05-11 03:41:17 +08:00 committed by GitHub
parent 020ff2fe0e
commit 0c1cde1080
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 170 additions and 24 deletions

View File

@ -499,7 +499,8 @@ object ConfigAdminManager {
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
.getOrElse("")
.split(",").toList
val newValueList = oldValueList ::: alterConfigOp.configEntry.value.split(",").toList
val appendingValueList = alterConfigOp.configEntry.value.split(",").toList.filter(value => !oldValueList.contains(value))
val newValueList = oldValueList ::: appendingValueList
configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(","))
}
case OpType.SUBTRACT => {

View File

@ -18,6 +18,7 @@
package kafka.server.metadata
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
@ -118,6 +119,11 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
*/
var _firstPublish = true
/**
* This is updated after all components (e.g. LogManager) has finished publishing the new metadata delta
*/
val publishedOffsetAtomic = new AtomicLong(-1)
override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
@ -249,6 +255,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
if (_firstPublish) {
finishInitializingReplicaManager(newImage)
}
publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
} catch {
case t: Throwable => error(s"Error publishing broker metadata at $highestOffsetAndEpoch", t)
throw t
@ -257,6 +264,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
}
}
override def publishedOffset: Long = publishedOffsetAtomic.get()
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
}

View File

@ -30,4 +30,9 @@ trait MetadataPublisher {
* delta to the previous image.
*/
def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
/**
* The highest offset of metadata topic which has been published
*/
def publishedOffset: Long
}

View File

@ -1755,8 +1755,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(0, allReassignmentsMap.size())
}
@Test
def testValidIncrementalAlterConfigs(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testValidIncrementalAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
@ -1793,6 +1794,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
alterResult.all.get
if (isKRaftTest()) {
TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer, "Timeout waiting for topic configs propagating to brokers")
}
// Verify that topics were updated correctly
var describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
var configs = describeResult.all.get
@ -1807,7 +1812,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals("lz4", configs.get(topic2Resource).get(LogConfig.CompressionTypeProp).value)
assertEquals("delete,compact", configs.get(topic2Resource).get(LogConfig.CleanupPolicyProp).value)
//verify subtract operation, including from an empty property
// verify subtract operation, including from an empty property
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.SUBTRACT),
new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, "0"), AlterConfigOp.OpType.SUBTRACT)
@ -1825,6 +1830,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
alterResult.all.get
if (isKRaftTest()) {
TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer, "Timeout waiting for topic configs propagating to brokers")
}
// Verify that topics were updated correctly
describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
configs = describeResult.all.get
@ -1852,7 +1861,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value)
//Alter topics with validateOnly=true with invalid configs
// Alter topics with validateOnly=true with invalid configs
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "zip"), AlterConfigOp.OpType.SET)
).asJava
@ -1861,8 +1870,56 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
topic1Resource -> topic1AlterConfigs
).asJava, new AlterConfigsOptions().validateOnly(true))
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
Some("Invalid config value for resource"))
if (isKRaftTest()) {
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException],
Some("Invalid value zip for configuration compression.type"))
} else {
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
Some("Invalid config value for resource"))
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAppendAlreadyExistsConfigsAndSubtractNotExistsConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
val topic = "incremental-alter-configs-topic"
val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
val appendValues = s"0:${brokers.head.config.brokerId}"
val subtractValues = brokers.tail.map(broker => s"0:${broker.config.brokerId}").mkString(",")
assertNotEquals("", subtractValues)
val topicCreateConfigs = new Properties
topicCreateConfigs.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, appendValues)
createTopic(topic, numPartitions = 1, replicationFactor = 1, topicCreateConfigs)
// Append value that is already present
val topicAppendConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, appendValues), AlterConfigOp.OpType.APPEND),
).asJavaCollection
val appendResult = client.incrementalAlterConfigs(Map(topicResource -> topicAppendConfigs).asJava)
appendResult.all.get
// Subtract values that are not present
val topicSubtractConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, subtractValues), AlterConfigOp.OpType.SUBTRACT)
).asJavaCollection
val subtractResult = client.incrementalAlterConfigs(Map(topicResource -> topicSubtractConfigs).asJava)
subtractResult.all.get
if (isKRaftTest()) {
TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
}
// Verify that topics were updated correctly
val describeResult = client.describeConfigs(Seq(topicResource).asJava)
val configs = describeResult.all.get
assertEquals(appendValues, configs.get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp).value)
}
@Test
@ -2352,7 +2409,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
* Note: this test requires some custom static broker and controller configurations, which are set up in
* BaseAdminIntegrationTest.modifyConfigs and BaseAdminIntegrationTest.kraftControllerConfigs.
*/
@ParameterizedTest
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateTopicsReturnsConfigs(quorum: String): Unit = {
client = Admin.create(super.createConfig)

View File

@ -84,6 +84,8 @@ class BrokerMetadataListenerTest {
Collections.emptyMap[String, VersionRange](), Optional.empty[String](), true),
delta.clusterDelta().broker(1))
}
override def publishedOffset: Long = -1
}).get()
} finally {
listener.close()
@ -125,6 +127,8 @@ class BrokerMetadataListenerTest {
override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
image = newImage
}
override def publishedOffset: Long = -1
}
private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")

View File

@ -1127,6 +1127,25 @@ object TestUtils extends Logging {
throw new IllegalStateException(s"Cannot get topic: $topic, partition: $partition in server metadata cache"))
}
/**
* Wait until the kraft broker metadata have caught up to the controller, before calling this, we should make sure
* the related metadata message has already been committed to the controller metadata log.
*/
def ensureConsistentKRaftMetadata(
brokers: Seq[KafkaBroker],
controllerServer: ControllerServer,
msg: String = "Timeout waiting for controller metadata propagating to brokers"
): Unit = {
val controllerOffset = controllerServer.raftManager.replicatedLog.endOffset().offset - 1
TestUtils.waitUntilTrue(
() => {
brokers.forall { broker =>
val metadataOffset = broker.asInstanceOf[BrokerServer].metadataPublisher.publishedOffset
metadataOffset >= controllerOffset
}
}, msg)
}
def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
val (controllerId, _) = computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined)
controllerId.getOrElse(throw new AssertionError(s"Controller not elected after $timeout ms"))

View File

@ -213,15 +213,19 @@ public class ConfigurationControlManager {
"key " + key + " because its type is not LIST."));
return;
}
List<String> newValueParts = getParts(newValue, key, configResource);
List<String> oldValueList = getParts(newValue, key, configResource);
if (opType == APPEND) {
if (!newValueParts.contains(opValue)) {
newValueParts.add(opValue);
for (String value : opValue.split(",")) {
if (!oldValueList.contains(value)) {
oldValueList.add(value);
}
}
} else {
for (String value : opValue.split(",")) {
oldValueList.remove(value);
}
newValue = String.join(",", newValueParts);
} else if (newValueParts.remove(opValue)) {
newValue = String.join(",", newValueParts);
}
newValue = String.join(",", oldValueList);
break;
}
if (!Objects.equals(currentValue, newValue)) {

View File

@ -53,6 +53,7 @@ import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.apache.kafka.metadata.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -137,10 +138,10 @@ public class ConfigurationControlManagerTest {
RecordTestUtils.assertBatchIteratorContains(asList(
asList(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue("x,y,z"), (short) 0),
setName("abc").setValue("x,y,z"), CONFIG_RECORD.highestSupportedVersion()),
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("def").setValue("blah"), (short) 0))),
setName("def").setValue("blah"), CONFIG_RECORD.highestSupportedVersion()))),
manager.iterator(Long.MAX_VALUE));
}
@ -159,7 +160,7 @@ public class ConfigurationControlManagerTest {
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue("123"), (short) 0)),
setName("abc").setValue("123"), CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(BROKER0, new ApiError(Errors.INVALID_CONFIG,
"Can't SUBTRACT to key baz because its type is not LIST.")),
entry(MYTOPIC, ApiError.NONE))), result);
@ -168,13 +169,59 @@ public class ConfigurationControlManagerTest {
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue(null), (short) 0)),
setName("abc").setValue(null), CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(MYTOPIC, ApiError.NONE))),
manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(
entry("abc", entry(DELETE, "xyz"))))),
true));
}
@Test
public void testIncrementalAlterMultipleConfigValues() {
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setKafkaConfigSchema(SCHEMA).
build();
ControllerResult<Map<ConfigResource, ApiError>> result = manager.
incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456,789"))))), true);
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue("123,456,789"), CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(MYTOPIC, ApiError.NONE))), result);
RecordTestUtils.replayAll(manager, result.records());
// It's ok for the appended value to be already present
result = manager
.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456"))))), true);
assertEquals(
ControllerResult.atomicOf(Collections.emptyList(), toMap(entry(MYTOPIC, ApiError.NONE))),
result
);
RecordTestUtils.replayAll(manager, result.records());
result = manager
.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123,456"))))), true);
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue("789"), CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(MYTOPIC, ApiError.NONE))),
result);
RecordTestUtils.replayAll(manager, result.records());
// It's ok for the deleted value not to be present
result = manager
.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123456"))))), true);
assertEquals(
ControllerResult.atomicOf(Collections.emptyList(), toMap(entry(MYTOPIC, ApiError.NONE))),
result
);
RecordTestUtils.replayAll(manager, result.records());
assertEquals("789", manager.getConfigs(MYTOPIC).get("abc"));
}
@Test
public void testIncrementalAlterConfigsWithoutExistence() {
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
@ -191,7 +238,7 @@ public class ConfigurationControlManagerTest {
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("ExistingTopic").
setName("def").setValue("newVal"), (short) 0)),
setName("def").setValue("newVal"), CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(BROKER0, new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
"Unknown resource.")),
entry(existingTopic, ApiError.NONE))), result);
@ -242,9 +289,9 @@ public class ConfigurationControlManagerTest {
build();
assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
setName("foo.bar").setValue("123"), (short) 0), new ApiMessageAndVersion(
setName("foo.bar").setValue("123"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
setName("quux").setValue("456"), (short) 0)),
setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION,
"Expected: AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" +
"type=TOPIC, name='mytopic'), configs={}). Got: " +
@ -267,10 +314,10 @@ public class ConfigurationControlManagerTest {
List<ApiMessageAndVersion> expectedRecords1 = asList(
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue("456"), (short) 0),
setName("abc").setValue("456"), CONFIG_RECORD.highestSupportedVersion()),
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("def").setValue("901"), (short) 0));
setName("def").setValue("901"), CONFIG_RECORD.highestSupportedVersion()));
assertEquals(ControllerResult.atomicOf(
expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))),
manager.legacyAlterConfigs(
@ -286,7 +333,7 @@ public class ConfigurationControlManagerTest {
.setResourceName("mytopic")
.setName("abc")
.setValue(null),
(short) 0)),
CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(MYTOPIC, ApiError.NONE))),
manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))),
true));