KAFKA-7466: Add IncrementalAlterConfigs API (KIP-339) (#6247)

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Viktor Somogyi <viktorsomogyi@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
Manikumar Reddy 2019-04-17 04:56:33 +05:30 committed by Colin Patrick McCabe
parent 99ce7d76ce
commit 3b1524c5df
23 changed files with 1029 additions and 74 deletions

View File

@@ -43,7 +43,7 @@
files="Sender.java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|AdminClient|KafkaAdminClient).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>

View File

@@ -368,7 +368,9 @@ public abstract class AdminClient implements AutoCloseable {
* @param configs The resources with their configs (topic is the only resource type with configs that can
* be updated currently)
* @return The AlterConfigsResult
* @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}.
*/
@Deprecated
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
return alterConfigs(configs, new AlterConfigsOptions());
}
@@ -385,9 +387,53 @@ public abstract class AdminClient implements AutoCloseable {
* be updated currently)
* @param options The options to use when altering configs
* @return The AlterConfigsResult
* @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)}.
*/
@Deprecated
public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
/**
* Incrementally updates the configuration for the specified resources with default options.
*
* This is a convenience method for {@link AdminClient#incrementalAlterConfigs(Map, AlterConfigsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 2.3.0 or higher.
*
* @param configs The resources with their configs
* @return The AlterConfigsResult
*/
public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
return incrementalAlterConfigs(configs, new AlterConfigsOptions());
}
/**
* Incrementally update the configuration for the specified resources.
*
* Updates are not transactional, so they may succeed for some resources while failing for others. The configs for
* a particular resource are updated atomically.
*
* <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
* the returned {@code AlterConfigsResult}:</p>
* <ul>
* <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
* if the authenticated user didn't have alter access to the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.TopicAuthorizationException}
* if the authenticated user didn't have alter access to the topic.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
* if the request details are invalid, e.g., a configuration key was specified more than once for a resource.</li>
* </ul>
*
* This operation is supported by brokers with version 2.3.0 or higher.
*
* @param configs The resources with their configs
* @param options The options to use when altering configs
* @return The AlterConfigsResult
*/
public abstract AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,
Collection<AlterConfigOp>> configs, AlterConfigsOptions options);
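
For orientation, a minimal caller-side sketch of the new API (the topic name, config values, and the adminClient variable are illustrative, not part of the patch):

    // assumes: import java.util.*; import org.apache.kafka.clients.admin.*;
    //          import org.apache.kafka.common.config.ConfigResource;
    Map<ConfigResource, Collection<AlterConfigOp>> updates = new HashMap<>();
    updates.put(new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"), Arrays.asList(
        new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.APPEND),
        new AlterConfigOp(new ConfigEntry("retention.ms", ""), AlterConfigOp.OpType.DELETE)));
    // all() fails if any resource failed; values() exposes the per-resource futures
    adminClient.incrementalAlterConfigs(updates).all().get();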
/**
* Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
* shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the

View File

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* A class representing an alter configuration entry, containing a name, value and operation type.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class AlterConfigOp {
public enum OpType {
SET((byte) 0), DELETE((byte) 1), APPEND((byte) 2), SUBTRACT((byte) 3);
private static final Map<Byte, OpType> OP_TYPES = Collections.unmodifiableMap(
Arrays.stream(values()).collect(Collectors.toMap(OpType::id, Function.identity()))
);
private final byte id;
OpType(final byte id) {
this.id = id;
}
public byte id() {
return id;
}
public static OpType forId(final byte id) {
return OP_TYPES.get(id);
}
}
private final ConfigEntry configEntry;
private final OpType opType;
public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
this.configEntry = configEntry;
this.opType = operationType;
}
public ConfigEntry configEntry() {
return configEntry;
}
public OpType opType() {
return opType;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final AlterConfigOp that = (AlterConfigOp) o;
return opType == that.opType &&
Objects.equals(configEntry, that.configEntry);
}
@Override
public int hashCode() {
return Objects.hash(opType, configEntry);
}
@Override
public String toString() {
return "AlterConfigOp{" +
"opType=" + opType +
", configEntry=" + configEntry +
'}';
}
}
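
A short sketch of the byte-id round-trip defined by the enum above; the null result for an unknown id follows from OP_TYPES.get (illustrative, not part of the patch):

    byte id = AlterConfigOp.OpType.APPEND.id();                           // 2
    AlterConfigOp.OpType op = AlterConfigOp.OpType.forId(id);             // OpType.APPEND
    AlterConfigOp.OpType unknown = AlterConfigOp.OpType.forId((byte) 99); // null, no exception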

View File

@@ -69,6 +69,10 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -121,6 +125,8 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
@@ -1885,6 +1891,7 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
@Deprecated
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
// We must make a separate AlterConfigs request for every BROKER resource we want to alter
@@ -1948,6 +1955,89 @@
return futures;
}
@Override
public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
final AlterConfigsOptions options) {
final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
// We must make a separate IncrementalAlterConfigs request for every BROKER resource we want to alter
// and send the request to that specific broker. Other resources are grouped together into
// a single request that may be sent to any broker.
final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
for (ConfigResource resource : configs.keySet()) {
if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name()));
allFutures.putAll(incrementalAlterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
} else
unifiedRequestResources.add(resource);
}
if (!unifiedRequestResources.isEmpty())
allFutures.putAll(incrementalAlterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));
return new AlterConfigsResult(new HashMap<>(allFutures));
}
private Map<ConfigResource, KafkaFutureImpl<Void>> incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
final AlterConfigsOptions options,
Collection<ConfigResource> resources,
NodeProvider nodeProvider) {
final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>();
for (ConfigResource resource : resources)
futures.put(resource, new KafkaFutureImpl<>());
final long now = time.milliseconds();
runnable.call(new Call("incrementalAlterConfigs", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) {
@Override
public AbstractRequest.Builder createRequest(int timeoutMs) {
return new IncrementalAlterConfigsRequest.Builder(
toIncrementalAlterConfigsRequestData(resources, configs, options.shouldValidateOnly()));
}
@Override
public void handleResponse(AbstractResponse abstractResponse) {
IncrementalAlterConfigsResponse response = (IncrementalAlterConfigsResponse) abstractResponse;
Map<ConfigResource, ApiError> errors = IncrementalAlterConfigsResponse.fromResponseData(response.data());
for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
KafkaFutureImpl<Void> future = entry.getValue();
ApiException exception = errors.get(entry.getKey()).exception();
if (exception != null) {
future.completeExceptionally(exception);
} else {
future.complete(null);
}
}
}
@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(futures.values(), throwable);
}
}, now);
return futures;
}
private IncrementalAlterConfigsRequestData toIncrementalAlterConfigsRequestData(final Collection<ConfigResource> resources,
final Map<ConfigResource, Collection<AlterConfigOp>> configs,
final boolean validateOnly) {
IncrementalAlterConfigsRequestData requestData = new IncrementalAlterConfigsRequestData();
requestData.setValidateOnly(validateOnly);
for (ConfigResource resource : resources) {
AlterableConfigSet alterableConfigSet = new AlterableConfigSet();
for (AlterConfigOp configEntry : configs.get(resource))
alterableConfigSet.add(new AlterableConfig().
setName(configEntry.configEntry().name()).
setValue(configEntry.configEntry().value()).
setConfigOperation(configEntry.opType().id()));
AlterConfigsResource alterConfigsResource = new AlterConfigsResource();
alterConfigsResource.setResourceType(resource.type().id()).
setResourceName(resource.name()).setConfigs(alterableConfigSet);
requestData.resources().add(alterConfigsResource);
}
return requestData;
}
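
As the comment above explains, a BROKER resource with a non-empty name is routed to that exact broker (its name is parsed as the broker id), while all other resources share one request sent to the least-loaded node. A hedged sketch of a caller triggering the per-broker path (broker id and config key are illustrative):

    ConfigResource broker1 = new ConfigResource(ConfigResource.Type.BROKER, "1");
    Collection<AlterConfigOp> ops = Collections.singletonList(
        new AlterConfigOp(new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET));
    // sent to node 1 via ConstantNodeIdProvider because the resource names a specific broker
    AlterConfigsResult result = adminClient.incrementalAlterConfigs(Collections.singletonMap(broker1, ops));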
@Override
public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options) {
final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size());

View File

@@ -1078,6 +1078,10 @@ public class ConfigDef {
public boolean hasDefault() {
return !NO_DEFAULT_VALUE.equals(this.defaultValue);
}
public Type type() {
return type;
}
}
protected List<String> headers() {

View File

@@ -38,6 +38,8 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
@@ -189,7 +191,9 @@ public enum ApiKeys {
DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()),
ELECT_PREFERRED_LEADERS(43, "ElectPreferredLeaders", ElectPreferredLeadersRequestData.SCHEMAS,
ElectPreferredLeadersResponseData.SCHEMAS);
ElectPreferredLeadersResponseData.SCHEMAS),
INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS,
IncrementalAlterConfigsResponseData.SCHEMAS);
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;

View File

@@ -231,6 +231,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return new DeleteGroupsRequest(struct, apiVersion);
case ELECT_PREFERRED_LEADERS:
return new ElectPreferredLeadersRequest(struct, apiVersion);
case INCREMENTAL_ALTER_CONFIGS:
return new IncrementalAlterConfigsRequest(struct, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

View File

@@ -158,6 +158,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new DeleteGroupsResponse(struct);
case ELECT_PREFERRED_LEADERS:
return new ElectPreferredLeadersResponse(struct, version);
case INCREMENTAL_ALTER_CONFIGS:
return new IncrementalAlterConfigsResponse(struct, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

View File

@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class IncrementalAlterConfigsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<IncrementalAlterConfigsRequest> {
private final IncrementalAlterConfigsRequestData data;
public Builder(IncrementalAlterConfigsRequestData data) {
super(ApiKeys.INCREMENTAL_ALTER_CONFIGS);
this.data = data;
}
@Override
public IncrementalAlterConfigsRequest build(short version) {
return new IncrementalAlterConfigsRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final IncrementalAlterConfigsRequestData data;
private final short version;
private IncrementalAlterConfigsRequest(IncrementalAlterConfigsRequestData data, short version) {
super(ApiKeys.INCREMENTAL_ALTER_CONFIGS, version);
this.data = data;
this.version = version;
}
IncrementalAlterConfigsRequest(final Struct struct, final short version) {
super(ApiKeys.INCREMENTAL_ALTER_CONFIGS, version);
this.data = new IncrementalAlterConfigsRequestData(struct, version);
this.version = version;
}
public static IncrementalAlterConfigsRequest parse(ByteBuffer buffer, short version) {
return new IncrementalAlterConfigsRequest(ApiKeys.INCREMENTAL_ALTER_CONFIGS.parseRequest(version, buffer), version);
}
public IncrementalAlterConfigsRequestData data() {
return data;
}
@Override
protected Struct toStruct() {
return data.toStruct(version);
}
@Override
public AbstractResponse getErrorResponse(final int throttleTimeMs, final Throwable e) {
IncrementalAlterConfigsResponseData response = new IncrementalAlterConfigsResponseData();
ApiError apiError = ApiError.fromThrowable(e);
for (AlterConfigsResource resource : data.resources()) {
response.responses().add(new AlterConfigsResourceResult()
.setResourceName(resource.resourceName())
.setResourceType(resource.resourceType())
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message()));
}
return new IncrementalAlterConfigsResponse(response);
}
}

View File

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class IncrementalAlterConfigsResponse extends AbstractResponse {
public static IncrementalAlterConfigsResponseData toResponseData(final int requestThrottleMs,
final Map<ConfigResource, ApiError> results) {
IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData();
responseData.setThrottleTimeMs(requestThrottleMs);
for (Map.Entry<ConfigResource, ApiError> entry : results.entrySet()) {
responseData.responses().add(new AlterConfigsResourceResult().
setResourceName(entry.getKey().name()).
setResourceType(entry.getKey().type().id()).
setErrorCode(entry.getValue().error().code()).
setErrorMessage(entry.getValue().message()));
}
return responseData;
}
public static Map<ConfigResource, ApiError> fromResponseData(final IncrementalAlterConfigsResponseData data) {
Map<ConfigResource, ApiError> map = new HashMap<>();
for (AlterConfigsResourceResult result : data.responses()) {
map.put(new ConfigResource(ConfigResource.Type.forId(result.resourceType()), result.resourceName()),
new ApiError(Errors.forCode(result.errorCode()), result.errorMessage()));
}
return map;
}
private final IncrementalAlterConfigsResponseData data;
public IncrementalAlterConfigsResponse(IncrementalAlterConfigsResponseData data) {
this.data = data;
}
public IncrementalAlterConfigsResponse(final Struct struct, final short version) {
this.data = new IncrementalAlterConfigsResponseData(struct, version);
}
public IncrementalAlterConfigsResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
HashMap<Errors, Integer> counts = new HashMap<>();
for (AlterConfigsResourceResult result : data.responses()) {
Errors error = Errors.forCode(result.errorCode());
counts.put(error, counts.getOrDefault(error, 0) + 1);
}
return counts;
}
@Override
protected Struct toStruct(final short version) {
return data.toStruct(version);
}
@Override
public boolean shouldClientThrottle(short version) {
return version >= 0;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
public static IncrementalAlterConfigsResponse parse(ByteBuffer buffer, short version) {
return new IncrementalAlterConfigsResponse(
ApiKeys.INCREMENTAL_ALTER_CONFIGS.responseSchema(version).read(buffer), version);
}
}

View File

@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 44,
"type": "request",
"name": "IncrementalAlterConfigsRequest",
"validVersions": "0",
"fields": [
{ "name": "Resources", "type": "[]AlterConfigsResource", "versions": "0+",
"about": "The incremental updates for each resource.", "fields": [
{ "name": "ResourceType", "type": "int8", "versions": "0+", "mapKey": true,
"about": "The resource type." },
{ "name": "ResourceName", "type": "string", "versions": "0+", "mapKey": true,
"about": "The resource name." },
{ "name": "Configs", "type": "[]AlterableConfig", "versions": "0+",
"about": "The configurations.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
"about": "The configuration key name." },
{ "name": "ConfigOperation", "type": "int8", "versions": "0+", "mapKey": true,
"about": "The type (Set, Delete, Append, Subtract) of operation." },
{ "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The value to set for the configuration key."}
]}
]},
{ "name": "ValidateOnly", "type": "bool", "versions": "0+",
"about": "True if we should validate the request, but not change the configurations."}
]
}

View File

@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 44,
"type": "response",
"name": "IncrementalAlterConfigsResponse",
"validVersions": "0",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "responses", "type": "[]AlterConfigsResourceResult", "versions": "0+",
"about": "The responses for each resource.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The resource error code." },
{ "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+",
"about": "The resource error message, or null if there was no error." },
{ "name": "ResourceType", "type": "int8", "versions": "0+",
"about": "The resource type." },
{ "name": "ResourceName", "type": "string", "versions": "0+",
"about": "The resource name." }
]}
]
}

View File

@@ -59,6 +59,8 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -78,6 +80,7 @@ import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.ElectPreferredLeadersResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
@@ -1203,6 +1206,59 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testIncrementalAlterConfigs() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
//test error scenarios
IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData();
responseData.responses().add(new AlterConfigsResourceResult()
.setResourceName("")
.setResourceType(ConfigResource.Type.BROKER.id())
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
.setErrorMessage("authorization error"));
responseData.responses().add(new AlterConfigsResourceResult()
.setResourceName("topic1")
.setResourceType(ConfigResource.Type.TOPIC.id())
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage("Config value append is not allowed for config"));
env.kafkaClient().prepareResponse(new IncrementalAlterConfigsResponse(responseData));
ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic1");
AlterConfigOp alterConfigOp1 = new AlterConfigOp(
new ConfigEntry("log.segment.bytes", "1073741"),
AlterConfigOp.OpType.SET);
AlterConfigOp alterConfigOp2 = new AlterConfigOp(
new ConfigEntry("compression.type", "gzip"),
AlterConfigOp.OpType.APPEND);
final Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
configs.put(brokerResource, Collections.singletonList(alterConfigOp1));
configs.put(topicResource, Collections.singletonList(alterConfigOp2));
AlterConfigsResult result = env.adminClient().incrementalAlterConfigs(configs);
TestUtils.assertFutureError(result.values().get(brokerResource), ClusterAuthorizationException.class);
TestUtils.assertFutureError(result.values().get(topicResource), InvalidRequestException.class);
// Test a call where there are no errors.
responseData = new IncrementalAlterConfigsResponseData();
responseData.responses().add(new AlterConfigsResourceResult()
.setResourceName("")
.setResourceType(ConfigResource.Type.BROKER.id())
.setErrorCode(Errors.NONE.code())
.setErrorMessage(ApiError.NONE.message()));
env.kafkaClient().prepareResponse(new IncrementalAlterConfigsResponse(responseData));
env.adminClient().incrementalAlterConfigs(Collections.singletonMap(brokerResource, asList(alterConfigOp1))).all().get();
}
}
@SafeVarargs
private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
for (T element : elements) {

View File

@@ -375,10 +375,17 @@ public class MockAdminClient extends AdminClient {
}
@Override
@Deprecated
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
AlterConfigsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");

View File

@@ -62,6 +62,12 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -338,6 +344,9 @@ public class RequestResponseTest {
checkRequest(createElectPreferredLeadersRequestNullPartitions());
checkErrorResponse(createElectPreferredLeadersRequest(), new UnknownServerException());
checkResponse(createElectPreferredLeadersResponse(), 0);
checkRequest(createIncrementalAlterConfigsRequest());
checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException());
checkResponse(createIncrementalAlterConfigsResponse(), 0);
}
@Test
@@ -1477,4 +1486,30 @@
return new ElectPreferredLeadersResponse(data);
}
private IncrementalAlterConfigsRequest createIncrementalAlterConfigsRequest() {
IncrementalAlterConfigsRequestData data = new IncrementalAlterConfigsRequestData();
AlterableConfig alterableConfig = new AlterableConfig()
.setName("retention.ms")
.setConfigOperation((byte) 0)
.setValue("100");
AlterableConfigSet alterableConfigs = new AlterableConfigSet();
alterableConfigs.add(alterableConfig);
data.resources().add(new AlterConfigsResource()
.setResourceName("testtopic")
.setResourceType(ResourceType.TOPIC.code())
.setConfigs(alterableConfigs));
return new IncrementalAlterConfigsRequest.Builder(data).build((short) 0);
}
private IncrementalAlterConfigsResponse createIncrementalAlterConfigsResponse() {
IncrementalAlterConfigsResponseData data = new IncrementalAlterConfigsResponseData();
data.responses().add(new AlterConfigsResourceResult()
.setResourceName("testtopic")
.setResourceType(ResourceType.TOPIC.code())
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage("Duplicate Keys"));
return new IncrementalAlterConfigsResponse(data);
}
}

View File

@@ -297,6 +297,8 @@ object LogConfig {
throw new InvalidConfigurationException(s"Unknown topic config name: $name")
}
private[kafka] def configKeys: Map[String, ConfigKey] = configDef.configKeys.asScala
/**
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid
*/

View File

@@ -24,6 +24,9 @@ import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.internals.Topic
@@ -38,7 +41,7 @@ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, Describe
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import scala.collection.{mutable, _}
import scala.collection.{Map, mutable, _}
import scala.collection.JavaConverters._
class AdminManager(val config: KafkaConfig,
@@ -364,59 +367,17 @@ class AdminManager(val config: KafkaConfig,
def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
configs.map { case (resource, config) =>
def validateConfigPolicy(resourceType: ConfigResource.Type): Unit = {
alterConfigPolicy match {
case Some(policy) =>
val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
policy.validate(new AlterConfigPolicy.RequestMetadata(
new ConfigResource(resourceType, resource.name), configEntriesMap.asJava))
case None =>
}
}
try {
val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
val configProps = new Properties
config.entries.asScala.foreach { configEntry =>
configProps.setProperty(configEntry.name, configEntry.value)
}
resource.`type` match {
case ConfigResource.Type.TOPIC =>
val topic = resource.name
val properties = new Properties
config.entries.asScala.foreach { configEntry =>
properties.setProperty(configEntry.name, configEntry.value)
}
adminZkClient.validateTopicConfig(topic, properties)
validateConfigPolicy(ConfigResource.Type.TOPIC)
if (!validateOnly) {
info(s"Updating topic $topic with new configuration $config")
adminZkClient.changeTopicConfig(topic, properties)
}
resource -> ApiError.NONE
case ConfigResource.Type.BROKER =>
val brokerId = if (resource.name == null || resource.name.isEmpty)
None
else {
val id = resourceNameToBrokerId(resource.name)
if (id != this.config.brokerId)
throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $resource.name")
Some(id)
}
val configProps = new Properties
config.entries.asScala.foreach { configEntry =>
configProps.setProperty(configEntry.name, configEntry.value)
}
val perBrokerConfig = brokerId.nonEmpty
this.config.dynamicConfig.validate(configProps, perBrokerConfig)
validateConfigPolicy(ConfigResource.Type.BROKER)
if (!validateOnly) {
if (perBrokerConfig)
this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
adminZkClient.changeBrokerConfig(brokerId,
this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig))
}
resource -> ApiError.NONE
case ConfigResource.Type.TOPIC => alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap)
case ConfigResource.Type.BROKER => alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)
case resourceType =>
throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
}
@@ -437,6 +398,133 @@ class AdminManager(val config: KafkaConfig,
}.toMap
}
private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean,
configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
val topic = resource.name
adminZkClient.validateTopicConfig(topic, configProps)
validateConfigPolicy(resource, configEntriesMap)
if (!validateOnly) {
info(s"Updating topic $topic with new configuration $config")
adminZkClient.changeTopicConfig(topic, configProps)
}
resource -> ApiError.NONE
}
private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean,
configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
val brokerId = getBrokerId(resource)
val perBrokerConfig = brokerId.nonEmpty
this.config.dynamicConfig.validate(configProps, perBrokerConfig)
validateConfigPolicy(resource, configEntriesMap)
if (!validateOnly) {
if (perBrokerConfig)
this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
adminZkClient.changeBrokerConfig(brokerId,
this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig))
}
resource -> ApiError.NONE
}
private def getBrokerId(resource: ConfigResource) = {
if (resource.name == null || resource.name.isEmpty)
None
else {
val id = resourceNameToBrokerId(resource.name)
if (id != this.config.brokerId)
throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $resource.name")
Some(id)
}
}
private def validateConfigPolicy(resource: ConfigResource, configEntriesMap: Map[String, String]): Unit = {
alterConfigPolicy match {
case Some(policy) =>
policy.validate(new AlterConfigPolicy.RequestMetadata(
new ConfigResource(resource.`type`(), resource.name), configEntriesMap.asJava))
case None =>
}
}
def incrementalAlterConfigs(configs: Map[ConfigResource, List[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
configs.map { case (resource, alterConfigOps) =>
try {
// throw InvalidRequestException if there are any duplicate keys
val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry().name())
.mapValues(_.size).filter(_._2 > 1).keys.toSet
if (duplicateKeys.nonEmpty)
throw new InvalidRequestException(s"Error due to duplicate config keys: ${duplicateKeys.mkString(",")}")
val configEntriesMap = alterConfigOps.map(entry => (entry.configEntry().name(), entry.configEntry().value())).toMap
resource.`type` match {
case ConfigResource.Type.TOPIC =>
val configProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, resource.name)
prepareIncrementalConfigs(alterConfigOps, configProps, LogConfig.configKeys)
alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap)
case ConfigResource.Type.BROKER =>
val brokerId = getBrokerId(resource)
val perBrokerConfig = brokerId.nonEmpty
val persistentProps = if (perBrokerConfig) adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.get.toString)
else adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default)
val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys)
alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)
case resourceType =>
throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
}
} catch {
case e @ (_: ConfigException | _: IllegalArgumentException) =>
val message = s"Invalid config value for resource $resource: ${e.getMessage}"
info(message)
resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
case e: Throwable =>
// Log client errors at a lower level than unexpected exceptions
val message = s"Error processing alter configs request for resource $resource, config $alterConfigOps"
if (e.isInstanceOf[ApiException])
info(message, e)
else
error(message, e)
resource -> ApiError.fromThrowable(e)
}
}.toMap
}
private def prepareIncrementalConfigs(alterConfigOps: List[AlterConfigOp], configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = {
def listType(configName: String, configKeys: Map[String, ConfigKey]): Boolean = {
// look the key up explicitly: a plain configKeys(configName) would throw NoSuchElementException
// for an unknown key, so the null check below it could never fire
val configKey = configKeys.getOrElse(configName,
throw new InvalidConfigurationException(s"Unknown topic config name: $configName"))
configKey.`type` == ConfigDef.Type.LIST
}
alterConfigOps.foreach { alterConfigOp =>
alterConfigOp.opType() match {
case OpType.SET => configProps.setProperty(alterConfigOp.configEntry().name(), alterConfigOp.configEntry().value())
case OpType.DELETE => configProps.remove(alterConfigOp.configEntry().name())
case OpType.APPEND => {
if (!listType(alterConfigOp.configEntry().name(), configKeys))
throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry().name()}")
val oldValueList = configProps.getProperty(alterConfigOp.configEntry().name()).split(",").toList
val newValueList = oldValueList ::: alterConfigOp.configEntry().value().split(",").toList
configProps.setProperty(alterConfigOp.configEntry().name(), newValueList.mkString(","))
}
case OpType.SUBTRACT => {
if (!listType(alterConfigOp.configEntry().name(), configKeys))
throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry().name()}")
val oldValueList = configProps.getProperty(alterConfigOp.configEntry().name()).split(",").toList
val newValueList = oldValueList.diff(alterConfigOp.configEntry().value().split(",").toList)
configProps.setProperty(alterConfigOp.configEntry().name(), newValueList.mkString(","))
}
}
}
}
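
The APPEND and SUBTRACT branches above are plain comma-separated list arithmetic on the current property value; a stand-alone Java sketch of the same rule (illustrative only, not part of the patch):

    import java.util.*;

    public class ListOpSketch {
        public static void main(String[] args) {
            // current value: cleanup.policy=compact
            List<String> values = new ArrayList<>(Arrays.asList("compact".split(",")));
            // APPEND "delete" -> "compact,delete"
            values.addAll(Arrays.asList("delete".split(",")));
            System.out.println(String.join(",", values));
            // SUBTRACT "compact" -> "delete" (removes the given values from the list)
            values.removeAll(Arrays.asList("compact".split(",")));
            System.out.println(String.join(",", values));
        }
    }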
def shutdown() {
topicPurgatory.shutdown()
CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)

View File

@@ -39,6 +39,8 @@ import kafka.security.auth.{Resource, _}
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
@@ -151,6 +153,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
case ApiKeys.ELECT_PREFERRED_LEADERS => handleElectPreferredReplicaLeader(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
}
} catch {
case e: FatalExitError => throw e
@@ -2155,6 +2158,34 @@ class KafkaApis(val requestChannel: RequestChannel,
new ApiError(error, null)
}
def handleIncrementalAlterConfigsRequest(request: RequestChannel.Request): Unit = {
val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
val configs = alterConfigsRequest.data().resources().iterator().asScala.map { alterConfigResource =>
val configResource = new ConfigResource(ConfigResource.Type.forId(alterConfigResource.resourceType()), alterConfigResource.resourceName())
configResource -> alterConfigResource.configs().iterator().asScala.map {
alterConfig => new AlterConfigOp(new ConfigEntry(alterConfig.name(), alterConfig.value()), OpType.forId(alterConfig.configOperation())) }.toList
}.toMap
val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) =>
resource.`type` match {
case ConfigResource.Type.BROKER =>
authorize(request.session, AlterConfigs, Resource.ClusterResource)
case ConfigResource.Type.TOPIC =>
authorize(request.session, AlterConfigs, Resource(Topic, resource.name, LITERAL))
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
}
}
val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data().validateOnly())
val unauthorizedResult = unauthorizedResources.keys.map { resource =>
resource -> configsAuthorizationApiError(request.session, resource)
}
sendResponseMaybeThrottle(request, requestThrottleMs =>
new IncrementalAlterConfigsResponse(IncrementalAlterConfigsResponse.toResponseData(requestThrottleMs,
(authorizedResult ++ unauthorizedResult).asJava)))
}
def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
val describeConfigsRequest = request.body[DescribeConfigsRequest]
val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource =>

View File

@@ -1406,6 +1406,184 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(2, currentLeader(partition1))
assertEquals(2, currentLeader(partition2))
}
@Test
def testValidIncrementalAlterConfigs(): Unit = {
client = AdminClient.create(createConfig)
// Create topics
val topic1 = "incremental-alter-configs-topic-1"
val topic1Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
val topic1CreateConfigs = new Properties
topic1CreateConfigs.setProperty(LogConfig.RetentionMsProp, "60000000")
topic1CreateConfigs.setProperty(LogConfig.CleanupPolicyProp, LogConfig.Compact)
createTopic(topic1, numPartitions = 1, replicationFactor = 1, topic1CreateConfigs)
val topic2 = "incremental-alter-configs-topic-2"
val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
createTopic(topic2)
// Alter topic configs
var topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.FlushMsProp, "1000"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Delete), AlterConfigOp.OpType.APPEND),
new AlterConfigOp(new ConfigEntry(LogConfig.RetentionMsProp, ""), AlterConfigOp.OpType.DELETE)
).asJavaCollection
val topic2AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "lz4"), AlterConfigOp.OpType.SET)
).asJavaCollection
var alterResult = client.incrementalAlterConfigs(Map(
topic1Resource -> topic1AlterConfigs,
topic2Resource -> topic2AlterConfigs
).asJava)
assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
alterResult.all.get
// Verify that topics were updated correctly
var describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
var configs = describeResult.all.get
assertEquals(2, configs.size)
assertEquals("1000", configs.get(topic1Resource).get(LogConfig.FlushMsProp).value)
assertEquals("compact,delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value)
assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString, configs.get(topic1Resource).get(LogConfig.RetentionMsProp).value)
assertEquals("0.9", configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals("lz4", configs.get(topic2Resource).get(LogConfig.CompressionTypeProp).value)
//verify subtract operation
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.SUBTRACT)
).asJava
alterResult = client.incrementalAlterConfigs(Map(
topic1Resource -> topic1AlterConfigs
).asJava)
alterResult.all.get
// Verify that topics were updated correctly
describeResult = client.describeConfigs(Seq(topic1Resource).asJava)
configs = describeResult.all.get
assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value)
assertEquals("1000", configs.get(topic1Resource).get(LogConfig.FlushMsProp).value) // verify previous change is still intact
// Alter topics with validateOnly=true
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.APPEND)
).asJava
alterResult = client.incrementalAlterConfigs(Map(
topic1Resource -> topic1AlterConfigs
).asJava, new AlterConfigsOptions().validateOnly(true))
alterResult.all.get
// Verify that topics were not updated due to validateOnly = true
describeResult = client.describeConfigs(Seq(topic1Resource).asJava)
configs = describeResult.all.get
assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value)
// Alter topics with validateOnly=true and invalid configs
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "zip"), AlterConfigOp.OpType.SET)
).asJava
alterResult = client.incrementalAlterConfigs(Map(
topic1Resource -> topic1AlterConfigs
).asJava, new AlterConfigsOptions().validateOnly(true))
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
Some("Invalid config value for resource"))
}
@Test
def testInvalidIncrementalAlterConfigs(): Unit = {
client = AdminClient.create(createConfig)
// Create topics
val topic1 = "incremental-alter-configs-topic-1"
val topic1Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
createTopic(topic1)
val topic2 = "incremental-alter-configs-topic-2"
val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
createTopic(topic2)
//Add duplicate Keys for topic1
var topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.75"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.65"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.SET) // valid entry
).asJavaCollection
//Add valid config for topic2
var topic2AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), AlterConfigOp.OpType.SET)
).asJavaCollection
var alterResult = client.incrementalAlterConfigs(Map(
topic1Resource -> topic1AlterConfigs,
topic2Resource -> topic2AlterConfigs
).asJava)
assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
//InvalidRequestException error for topic1
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
Some("Error due to duplicate config keys"))
//operation should succeed for topic2
alterResult.values().get(topic2Resource).get()
// Verify that topic1's config was not updated, and topic2's config was updated
val describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
val configs = describeResult.all.get
assertEquals(2, configs.size)
assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topic1Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals(Defaults.CompressionType.toString, configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value)
assertEquals("0.9", configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
//check invalid use of append/subtract operation types
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.APPEND)
).asJavaCollection
topic2AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy"), AlterConfigOp.OpType.SUBTRACT)
).asJavaCollection
alterResult = client.incrementalAlterConfigs(Map(
topic1Resource -> topic1AlterConfigs,
topic2Resource -> topic2AlterConfigs
).asJava)
assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
Some("Config value append is not allowed for config"))
assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidRequestException],
Some("Config value subtract is not allowed for config"))
//try to add invalid config
topic1AlterConfigs = Seq(
new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), AlterConfigOp.OpType.SET)
).asJavaCollection
alterResult = client.incrementalAlterConfigs(Map(
topic1Resource -> topic1AlterConfigs
).asJava)
assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
Some("Invalid config value for resource"))
}
}
object AdminClientIntegrationTest {

View File

@@ -25,7 +25,7 @@ import kafka.network.SocketServer
import kafka.security.auth._
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.producer._
@@ -33,8 +33,9 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
import org.apache.kafka.common.message.{ControlledShutdownRequestData, CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData}
import org.apache.kafka.common.message._
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigSet}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
@@ -148,8 +149,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse],
ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse],
ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse],
ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse]
)
ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse],
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> classOf[IncrementalAlterConfigsResponse]
)
val requestKeyToError = Map[ApiKeys, Nothing => Errors](
ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
@@ -194,7 +196,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED),
ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error),
ApiKeys.ELECT_PREFERRED_LEADERS -> ((resp: ElectPreferredLeadersResponse) =>
ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error())
ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error()),
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) =>
IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error)
)
val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -233,7 +237,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl,
ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl,
ApiKeys.CREATE_PARTITIONS -> topicAlterAcl,
ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl
ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl,
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl
)
@Before
@@ -392,6 +397,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
))), true).build()
private def incrementalAlterConfigsRequest = {
val data = new IncrementalAlterConfigsRequestData
val alterableConfig = new AlterableConfig
alterableConfig.setName(LogConfig.MaxMessageBytesProp).
setValue("1000000").setConfigOperation(AlterConfigOp.OpType.SET.id())
val alterableConfigSet = new AlterableConfigSet
alterableConfigSet.add(alterableConfig)
data.resources().add(new AlterConfigsResource().
setResourceName(tp.topic).setResourceType(ConfigResource.Type.TOPIC.id()).
setConfigs(alterableConfigSet))
new IncrementalAlterConfigsRequest.Builder(data).build()
}
private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build()
private def createAclsRequest = new CreateAclsRequest.Builder(
@ -449,7 +467,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
// Check StopReplica last since some APIs depend on replica availability
ApiKeys.STOP_REPLICA -> stopReplicaRequest,
ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest
ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest,
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest
)
for ((key, request) <- requestKeyToRequest) {

View File

@ -39,6 +39,7 @@ import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
@ -329,7 +330,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
Files.copy(Paths.get(combinedStoreProps.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
StandardCopyOption.REPLACE_EXISTING)
TestUtils.alterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
verifySslProduceConsume(sslProperties1, "alter-truststore-4")
verifySslProduceConsume(sslProperties2, "alter-truststore-5")
}
@ -509,7 +510,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
// Enable unclean leader election
val newProps = new Properties
newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get
TestUtils.incrementalAlterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get
waitForConfigOnServer(controller, KafkaConfig.UncleanLeaderElectionEnableProp, "true")
// Verify that the old follower with missing records is elected as the new leader
@ -908,7 +909,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
val unknownConfig = "some.config"
props.put(unknownConfig, "some.config.value")
TestUtils.alterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1),
"Listener config not updated")
@ -971,11 +972,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
.mkString(",")
val props = fetchBrokerConfigsFromZooKeeper(servers.head)
val listenerProps = props.asScala.keySet.filter(_.startsWith(listenerPrefix(listenerName)))
listenerProps.foreach(props.remove)
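// With the incremental API, listener configs must be removed explicitly via DELETE
// ops; unlike the legacy alterConfigs, unlisted keys are left untouched rather than dropped.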
val deleteListenerProps = new Properties()
deleteListenerProps ++= props.asScala.filter(entry => entry._1.startsWith(listenerPrefix(listenerName)))
TestUtils.incrementalAlterConfigs(servers, adminClients.head, deleteListenerProps, perBrokerConfig = true, opType = OpType.DELETE).all.get
props.clear()
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
TestUtils.alterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1),
"Listeners not updated")

View File

@ -22,9 +22,9 @@ import kafka.log.LogConfig
import kafka.network.RequestChannel.Session
import kafka.security.auth._
import kafka.utils.TestUtils
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, InitProducerIdRequestData, JoinGroupRequestData, LeaveGroupRequestData, SaslAuthenticateRequestData, SaslHandshakeRequestData}
import org.apache.kafka.common.message._
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.ControlledShutdownRequestData
@ -402,6 +402,10 @@ class RequestQuotaTest extends BaseRequestTest {
.setTimeoutMs(0)
.setTopicPartitions(Collections.singletonList(partition)))
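// An empty IncrementalAlterConfigsRequestData suffices here: the quota test only
// exercises throttling, not the semantics of the alteration.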
case ApiKeys.INCREMENTAL_ALTER_CONFIGS =>
new IncrementalAlterConfigsRequest.Builder(
new IncrementalAlterConfigsRequestData())
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}
@ -501,6 +505,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs
case ApiKeys.ELECT_PREFERRED_LEADERS => new ElectPreferredLeadersResponse(response).throttleTimeMs
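// The generated response class needs the schema version to deserialize the struct.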
case ApiKeys.INCREMENTAL_ALTER_CONFIGS =>
new IncrementalAlterConfigsResponse(response, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion()).throttleTimeMs
case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
}
}

View File

@ -26,8 +26,8 @@ import java.security.cert.X509Certificate
import java.time.Duration
import java.util.{Collections, Properties}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint}
import kafka.log._
@ -38,7 +38,8 @@ import Implicits._
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.zk._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{KafkaFuture, TopicPartition}
@ -1410,6 +1411,20 @@ object TestUtils extends Logging {
adminClient.alterConfigs(configs)
}
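// Incrementally applies the given properties to every broker (perBrokerConfig = true)
// or to the cluster-wide broker default, using the given operation type (SET by default).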
def incrementalAlterConfigs(servers: Seq[KafkaServer], adminClient: AdminClient, props: Properties,
perBrokerConfig: Boolean, opType: OpType = OpType.SET): AlterConfigsResult = {
val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new ConfigEntry(k, v), opType) }.toList.asJavaCollection
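// Per-broker updates target one ConfigResource per broker id; an empty broker id
// addresses the cluster-wide default broker config.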
val configs = if (perBrokerConfig) {
servers.map { server =>
val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
(resource, configEntries)
}.toMap.asJava
} else {
Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> configEntries).asJava
}
adminClient.incrementalAlterConfigs(configs)
}
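// Example (hypothetical usage): remove a config key cluster-wide:
//   TestUtils.incrementalAlterConfigs(servers, adminClient, propsToDelete,
//     perBrokerConfig = false, opType = OpType.DELETE).all.get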
def alterTopicConfigs(adminClient: AdminClient, topic: String, topicConfigs: Properties): AlterConfigsResult = {
val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
@ -1451,15 +1466,18 @@ object TestUtils extends Logging {
(out.toString, err.toString)
}
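// Verifies that future.get() fails with the given exception type; if
// expectedErrorMessage is set, also checks that the cause's message contains it.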
def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable],
expectedErrorMessage: Option[String] = None): Unit = {
try {
future.get()
fail("Expected CompletableFuture.get to return an exception")
} catch {
case e: ExecutionException =>
val cause = e.getCause()
val cause = e.getCause
assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
cause.getClass().getName, clazz.isInstance(cause))
cause.getClass.getName, clazz.isInstance(cause))
expectedErrorMessage.foreach(message => assertTrue(s"Error message '${cause.getMessage}'" +
s" does not contain expected text '$message'", cause.getMessage.contains(message)))
}
}