diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a0bf7400407..0d316c5774f 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -116,6 +116,7 @@ + @@ -140,6 +141,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 144987e8494..3973701063d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -676,7 +676,8 @@ public class NetworkClient implements KafkaClient { public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0); - return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct); + return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct, + requestHeader.apiVersion()); } private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, @@ -811,7 +812,8 @@ public class NetworkClient implements KafkaClient { req.header.apiKey(), req.header.correlationId(), responseStruct); } // If the received response includes a throttle delay, throttle the connection. - AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct); + AbstractResponse body = AbstractResponse. + parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion()); maybeThrottle(body, req.header.apiVersion(), req.destination, now); if (req.isInternalRequest && body instanceof MetadataResponse) metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index bdd7cc36169..b823cdc0509 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -792,6 +792,58 @@ public abstract class AdminClient implements AutoCloseable { return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()); } + /** + * Elect the preferred broker of the given {@code partitions} as leader, or + * elect the preferred broker for all partitions as leader if the argument to {@code partitions} is null. + * + * This is a convenience method for {@link #electPreferredLeaders(Collection, ElectPreferredLeadersOptions)} + * with default options. + * See the overload for more details. + * + * @param partitions The partitions for which the preferred leader should be elected. + * @return The ElectPreferredLeadersResult. + */ + public ElectPreferredLeadersResult electPreferredLeaders(Collection partitions) { + return electPreferredLeaders(partitions, new ElectPreferredLeadersOptions()); + } + + /** + * Elect the preferred broker of the given {@code partitions} as leader, or + * elect the preferred broker for all partitions as leader if the argument to {@code partitions} is null. + * + * This operation is not transactional so it may succeed for some partitions while fail for others. + * + * It may take several seconds after this method returns + * success for all the brokers in the cluster to become aware that the partitions have new leaders. 
+ * During this time, {@link AdminClient#describeTopics(Collection)} + * may not return information about the partitions' new leaders. + * + * This operation is supported by brokers with version 2.2.0 or higher. + * + *

<p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+ * the returned {@code ElectPreferredLeadersResult}:</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * if the authenticated user didn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+ * if the topic or partition did not exist within the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidTopicException}
+ * if the topic was already queued for deletion.</li>
+ * <li>{@link org.apache.kafka.common.errors.NotControllerException}
+ * if the request was sent to a broker that was not the controller for the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request timed out before the election was complete.</li>
+ * <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException}
+ * if the preferred leader was not alive or not in the ISR.</li>
+ * </ul>
+ * + * @param partitions The partitions for which the preferred leader should be elected. + * @param options The options to use when electing the preferred leaders. + * @return The ElectPreferredLeadersResult. + */ + public abstract ElectPreferredLeadersResult electPreferredLeaders(Collection partitions, + ElectPreferredLeadersOptions options); + /** * Get the metrics kept by the adminClient */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java new file mode 100644 index 00000000000..80b00975f7d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; + +/** + * Options for {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ElectPreferredLeadersOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java new file mode 100644 index 00000000000..c76336a7cbc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +/** + * The result of {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)} + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ElectPreferredLeadersResult { + + private final KafkaFutureImpl> electionFuture; + private final Set partitions; + + ElectPreferredLeadersResult(KafkaFutureImpl> electionFuture, Set partitions) { + this.electionFuture = electionFuture; + this.partitions = partitions; + } + + /** + * Get the result of the election for the given {@code partition}. + * If there was not an election triggered for the given {@code partition}, the + * returned future will complete with an error. + */ + public KafkaFuture partitionResult(final TopicPartition partition) { + final KafkaFutureImpl result = new KafkaFutureImpl<>(); + electionFuture.whenComplete(new KafkaFuture.BiConsumer, Throwable>() { + @Override + public void accept(Map topicPartitions, Throwable throwable) { + if (throwable != null) { + result.completeExceptionally(throwable); + } else if (!topicPartitions.containsKey(partition)) { + result.completeExceptionally(new UnknownTopicOrPartitionException( + "Preferred leader election for partition \"" + partition + + "\" was not attempted")); + } else { + if (partitions == null && topicPartitions.isEmpty()) { + result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); + } + ApiException exception = topicPartitions.get(partition).exception(); + if (exception == null) { + result.complete(null); + } else { + result.completeExceptionally(exception); + } + } + } + }); + return result; + } + + /** + *

<p>Get a future for the topic partitions for which a leader election
+ * was attempted. A partition will be present in this result if
+ * an election was attempted even if the election was not successful.</p>
+ *
+ * <p>This method is provided to discover the partitions attempted when
+ * {@link AdminClient#electPreferredLeaders(Collection)} is called
+ * with a null {@code partitions} argument.</p>
+ */ + public KafkaFuture> partitions() { + if (partitions != null) { + return KafkaFutureImpl.completedFuture(this.partitions); + } else { + final KafkaFutureImpl> result = new KafkaFutureImpl<>(); + electionFuture.whenComplete(new KafkaFuture.BiConsumer, Throwable>() { + @Override + public void accept(Map topicPartitions, Throwable throwable) { + if (throwable != null) { + result.completeExceptionally(throwable); + } else if (topicPartitions.isEmpty()) { + result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); + } else { + for (ApiError apiError : topicPartitions.values()) { + if (apiError.isFailure()) { + result.completeExceptionally(apiError.exception()); + } + } + result.complete(topicPartitions.keySet()); + } + } + }); + return result; + } + } + + /** + * Return a future which succeeds if all the topic elections succeed. + */ + public KafkaFuture all() { + final KafkaFutureImpl result = new KafkaFutureImpl<>(); + electionFuture.thenApply(new KafkaFuture.Function, Void>() { + @Override + public Void apply(Map topicPartitions) { + for (ApiError apiError : topicPartitions.values()) { + if (apiError.isFailure()) { + result.completeExceptionally(apiError.exception()); + return null; + } + } + result.complete(null); + return null; + } + }); + return result; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 2ba3cf225b4..58baab79c83 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -105,6 +105,8 @@ import org.apache.kafka.common.requests.DescribeGroupsRequest; import org.apache.kafka.common.requests.DescribeGroupsResponse; import org.apache.kafka.common.requests.DescribeLogDirsRequest; import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.common.requests.ElectPreferredLeadersRequest; +import org.apache.kafka.common.requests.ElectPreferredLeadersResponse; import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; @@ -2777,4 +2779,35 @@ public class KafkaAdminClient extends AdminClient { public Map metrics() { return Collections.unmodifiableMap(this.metrics.metrics()); } + + @Override + public ElectPreferredLeadersResult electPreferredLeaders(final Collection partitions, + ElectPreferredLeadersOptions options) { + final Set partitionSet = partitions != null ? 
new HashSet<>(partitions) : null; + final KafkaFutureImpl> electionFuture = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + runnable.call(new Call("electPreferredLeaders", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + + @Override + public AbstractRequest.Builder createRequest(int timeoutMs) { + return new ElectPreferredLeadersRequest.Builder( + ElectPreferredLeadersRequest.toRequestData(partitions, timeoutMs)); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + ElectPreferredLeadersResponse response = (ElectPreferredLeadersResponse) abstractResponse; + electionFuture.complete( + ElectPreferredLeadersRequest.fromResponseData(response.data())); + } + + @Override + void handleFailure(Throwable throwable) { + electionFuture.completeExceptionally(throwable); + } + }, now); + return new ElectPreferredLeadersResult(electionFuture, partitionSet); + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/PreferredLeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/PreferredLeaderNotAvailableException.java new file mode 100644 index 00000000000..73dfd64ac6d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/PreferredLeaderNotAvailableException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.errors; + +public class PreferredLeaderNotAvailableException extends InvalidMetadataException { + + public PreferredLeaderNotAvailableException(String message) { + super(message); + } + + public PreferredLeaderNotAvailableException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 3d7710025ba..80b118b1356 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.protocol; +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; @@ -35,10 +37,10 @@ import org.apache.kafka.common.requests.ControlledShutdownRequest; import org.apache.kafka.common.requests.ControlledShutdownResponse; import org.apache.kafka.common.requests.CreateAclsRequest; import org.apache.kafka.common.requests.CreateAclsResponse; -import org.apache.kafka.common.requests.CreatePartitionsRequest; -import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateDelegationTokenRequest; import org.apache.kafka.common.requests.CreateDelegationTokenResponse; +import org.apache.kafka.common.requests.CreatePartitionsRequest; +import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteAclsRequest; @@ -53,12 +55,12 @@ import org.apache.kafka.common.requests.DescribeAclsRequest; import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; +import org.apache.kafka.common.requests.DescribeDelegationTokenRequest; +import org.apache.kafka.common.requests.DescribeDelegationTokenResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; import org.apache.kafka.common.requests.DescribeGroupsResponse; import org.apache.kafka.common.requests.DescribeLogDirsRequest; import org.apache.kafka.common.requests.DescribeLogDirsResponse; -import org.apache.kafka.common.requests.DescribeDelegationTokenRequest; -import org.apache.kafka.common.requests.DescribeDelegationTokenResponse; import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; @@ -186,7 +188,9 @@ public enum ApiKeys { RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()), EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()), DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()), - DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()); + DELETE_GROUPS(42, 
"DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()), + ELECT_PREFERRED_LEADERS(43, "ElectPreferredLeaders", ElectPreferredLeadersRequestData.SCHEMAS, + ElectPreferredLeadersResponseData.SCHEMAS); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 14ca06da8e4..5bcff4363dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -72,6 +72,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.OperationNotAttemptedException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.ReassignmentInProgressException; import org.apache.kafka.common.errors.RebalanceInProgressException; @@ -297,7 +298,9 @@ public enum Errors { "election so the offsets cannot be guaranteed to be monotonically increasing", OffsetNotAvailableException::new), MEMBER_ID_REQUIRED(79, "The group member needs to have a valid member id before actually entering a consumer group", - MemberIdRequiredException::new); + MemberIdRequiredException::new), + PREFERRED_LEADER_NOT_AVAILABLE(80, "The preferred leader was not available", + PreferredLeaderNotAvailableException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index d16e60f84db..239024f8632 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -227,6 +227,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return new DescribeDelegationTokenRequest(struct, apiVersion); case DELETE_GROUPS: return new DeleteGroupsRequest(struct, apiVersion); + case ELECT_PREFERRED_LEADERS: + return new ElectPreferredLeadersRequest(struct, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index c0ebef1d967..036814c5ada 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -68,7 +68,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { protected abstract Struct toStruct(short version); - public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct) { + public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, short version) { switch (apiKey) { case PRODUCE: return new ProduceResponse(struct); @@ -156,6 +156,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse { return new DescribeDelegationTokenResponse(struct); case DELETE_GROUPS: return new 
DeleteGroupsResponse(struct); + case ELECT_PREFERRED_LEADERS: + return new ElectPreferredLeadersResponse(struct, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java new file mode 100644 index 00000000000..ab96e3ba9e9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ElectPreferredLeadersRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder { + private final ElectPreferredLeadersRequestData data; + + public Builder(ElectPreferredLeadersRequestData data) { + super(ApiKeys.ELECT_PREFERRED_LEADERS); + this.data = data; + } + + @Override + public ElectPreferredLeadersRequest build(short version) { + return new ElectPreferredLeadersRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + public static ElectPreferredLeadersRequestData toRequestData(Collection partitions, int timeoutMs) { + ElectPreferredLeadersRequestData d = new ElectPreferredLeadersRequestData() + .setTimeoutMs(timeoutMs); + if (partitions != null) { + for (Map.Entry> tp : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) { + d.topicPartitions().add(new ElectPreferredLeadersRequestData.TopicPartitions().setTopic(tp.getKey()).setPartitionId(tp.getValue())); + } + } else { + d.setTopicPartitions(null); + } + return d; + } + + public static Map fromResponseData(ElectPreferredLeadersResponseData data) { + Map map = new HashMap<>(); + for (ElectPreferredLeadersResponseData.ReplicaElectionResult topicResults : data.replicaElectionResults()) { + for 
(ElectPreferredLeadersResponseData.PartitionResult partitionResult : topicResults.partitionResult()) { + map.put(new TopicPartition(topicResults.topic(), partitionResult.partitionId()), + new ApiError(Errors.forCode(partitionResult.errorCode()), + partitionResult.errorMessage())); + } + } + return map; + } + + private final ElectPreferredLeadersRequestData data; + private final short version; + + private ElectPreferredLeadersRequest(ElectPreferredLeadersRequestData data, short version) { + super(ApiKeys.ELECT_PREFERRED_LEADERS, version); + this.data = data; + this.version = version; + } + + public ElectPreferredLeadersRequest(Struct struct, short version) { + super(ApiKeys.ELECT_PREFERRED_LEADERS, version); + this.data = new ElectPreferredLeadersRequestData(struct, version); + this.version = version; + } + + public ElectPreferredLeadersRequestData data() { + return data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ElectPreferredLeadersResponseData response = new ElectPreferredLeadersResponseData(); + response.setThrottleTimeMs(throttleTimeMs); + ApiError apiError = ApiError.fromThrowable(e); + for (TopicPartitions topic : data.topicPartitions()) { + ReplicaElectionResult electionResult = new ReplicaElectionResult().setTopic(topic.topic()); + for (Integer partitionId : topic.partitionId()) { + electionResult.partitionResult().add(new ElectPreferredLeadersResponseData.PartitionResult() + .setPartitionId(partitionId) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message())); + } + response.replicaElectionResults().add(electionResult); + } + return new ElectPreferredLeadersResponse(response); + } + + public static ElectPreferredLeadersRequest parse(ByteBuffer buffer, short version) { + return new ElectPreferredLeadersRequest(ApiKeys.ELECT_PREFERRED_LEADERS.parseRequest(version, buffer), version); + } + + /** + * Visible for testing. + */ + @Override + public Struct toStruct() { + return data.toStruct(version); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java new file mode 100644 index 00000000000..d19a51d4f68 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class ElectPreferredLeadersResponse extends AbstractResponse { + + private final ElectPreferredLeadersResponseData data; + + public ElectPreferredLeadersResponse(ElectPreferredLeadersResponseData data) { + this.data = data; + } + + public ElectPreferredLeadersResponse(Struct struct, short version) { + this.data = new ElectPreferredLeadersResponseData(struct, version); + } + + public ElectPreferredLeadersResponseData data() { + return data; + } + + @Override + protected Struct toStruct(short version) { + return data.toStruct(version); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public Map errorCounts() { + HashMap counts = new HashMap<>(); + for (ReplicaElectionResult result : data.replicaElectionResults()) { + for (PartitionResult partitionResult : result.partitionResult()) { + Errors error = Errors.forCode(partitionResult.errorCode()); + counts.put(error, counts.getOrDefault(error, 0) + 1); + } + } + return counts; + } + + public static ElectPreferredLeadersResponse parse(ByteBuffer buffer, short version) { + return new ElectPreferredLeadersResponse( + ApiKeys.ELECT_PREFERRED_LEADERS.responseSchema(version).read(buffer), version); + } + + @Override + public boolean shouldClientThrottle(short version) { + return version >= 3; + } +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json b/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json new file mode 100644 index 00000000000..f566cdf3f64 --- /dev/null +++ b/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 43, + "type": "request", + "name": "ElectPreferredLeadersRequest", + "validVersions": "0", + "fields": [ + { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", + "about": "The topic partitions to elect the preferred leader of.", + "fields": [ + { "name": "Topic", "type": "string", "versions": "0+", + "about": "The name of a topic." 
}, + { "name": "PartitionId", "type": "[]int32", "versions": "0+", + "about": "The partitions of this topic whose preferred leader should be elected" } + ]}, + { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000", + "about": "The time in ms to wait for the election to complete." } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json b/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json new file mode 100644 index 00000000000..f34599cf03f --- /dev/null +++ b/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 43, + "type": "response", + "name": "ElectPreferredLeadersResponse", + "validVersions": "0", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ReplicaElectionResults", "type": "[]ReplicaElectionResult", "versions": "0+", + "about": "The error code, or 0 if there was no error.", "fields": [ + { "name": "Topic", "type": "string", "versions": "0+", + "about": "The topic name" }, + { "name": "PartitionResult", "type": "[]PartitionResult", "versions": "0+", + "about": "The results for each partition", "fields": [ + { "name": "PartitionId", "type": "int32", "versions": "0+", + "about": "The partition id" }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The result error, or zero if there was no error."}, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The result message, or null if there was no error."} + ]} + ]} + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/README.md b/clients/src/main/resources/common/message/README.md index 5648f37812d..482b1dd29f8 100644 --- a/clients/src/main/resources/common/message/README.md +++ b/clients/src/main/resources/common/message/README.md @@ -187,7 +187,7 @@ One very common pattern in Kafka is to load array elements from a message into a Map or Set for easier access. The message protocol makes this easier with the "mapKey" concept. -If some of the elemements of an array are annotated with "mapKey": true, the +If some of the elements of an array are annotated with "mapKey": true, the entire array will be treated as a linked hash set rather than a list. Elements in this set will be accessible in O(1) time with an automatically generated "find" function. 
The order of elements in the set will still be preserved, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 82c5b1d81da..12b076d08bc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidTopicException; @@ -50,6 +51,9 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicDeletionDisabledException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateAclsResponse; @@ -67,6 +71,7 @@ import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.ElectPreferredLeadersResponse; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.MetadataRequest; @@ -634,6 +639,55 @@ public class KafkaAdminClientTest { } } + @Test + public void testElectPreferredLeaders() throws Exception { + TopicPartition topic1 = new TopicPartition("topic", 0); + TopicPartition topic2 = new TopicPartition("topic", 2); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test a call where one partition has an error. 
+ ApiError value = ApiError.fromThrowable(new ClusterAuthorizationException(null)); + ElectPreferredLeadersResponseData responseData = new ElectPreferredLeadersResponseData(); + ReplicaElectionResult r = new ReplicaElectionResult().setTopic(topic1.topic()); + r.partitionResult().add(new PartitionResult() + .setPartitionId(topic1.partition()) + .setErrorCode(ApiError.NONE.error().code()) + .setErrorMessage(ApiError.NONE.message())); + r.partitionResult().add(new PartitionResult() + .setPartitionId(topic2.partition()) + .setErrorCode(value.error().code()) + .setErrorMessage(value.message())); + responseData.replicaElectionResults().add(r); + env.kafkaClient().prepareResponse(new ElectPreferredLeadersResponse(responseData)); + ElectPreferredLeadersResult results = env.adminClient().electPreferredLeaders(asList(topic1, topic2)); + results.partitionResult(topic1).get(); + TestUtils.assertFutureError(results.partitionResult(topic2), ClusterAuthorizationException.class); + TestUtils.assertFutureError(results.all(), ClusterAuthorizationException.class); + + // Test a call where there are no errors. + r.partitionResult().clear(); + r.partitionResult().add(new PartitionResult() + .setPartitionId(topic1.partition()) + .setErrorCode(ApiError.NONE.error().code()) + .setErrorMessage(ApiError.NONE.message())); + r.partitionResult().add(new PartitionResult() + .setPartitionId(topic2.partition()) + .setErrorCode(ApiError.NONE.error().code()) + .setErrorMessage(ApiError.NONE.message())); + env.kafkaClient().prepareResponse(new ElectPreferredLeadersResponse(responseData)); + + results = env.adminClient().electPreferredLeaders(asList(topic1, topic2)); + results.partitionResult(topic1).get(); + results.partitionResult(topic2).get(); + + // Now try a timeout + results = env.adminClient().electPreferredLeaders(asList(topic1, topic2), new ElectPreferredLeadersOptions().timeoutMs(100)); + TestUtils.assertFutureError(results.partitionResult(topic1), TimeoutException.class); + TestUtils.assertFutureError(results.partitionResult(topic2), TimeoutException.class); + } + } + /** * Test handling timeouts. 
*/ diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index aa2e6835137..d721245be85 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -327,6 +327,10 @@ public class MockAdminClient extends AdminClient { throw new UnsupportedOperationException("Not implemented yet"); } + public ElectPreferredLeadersResult electPreferredLeaders(Collection partitions, ElectPreferredLeadersOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public CreateAclsResult createAcls(Collection acls, CreateAclsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java index ed50b93bc4a..857869f316e 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java @@ -68,7 +68,8 @@ public class RequestContextTest { assertEquals(correlationId, responseHeader.correlationId()); Struct struct = ApiKeys.API_VERSIONS.parseResponse((short) 0, responseBuffer); - ApiVersionsResponse response = (ApiVersionsResponse) AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct); + ApiVersionsResponse response = (ApiVersionsResponse) + AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct, (short) 0); assertEquals(Errors.UNSUPPORTED_VERSION, response.error()); assertTrue(response.apiVersions().isEmpty()); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index aa980311fc9..2892bb67c62 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -32,6 +32,11 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -304,6 +309,10 @@ public class RequestResponseTest { checkRequest(createRenewTokenRequest()); checkErrorResponse(createRenewTokenRequest(), new UnknownServerException()); checkResponse(createRenewTokenResponse(), 0); + checkRequest(createElectPreferredLeadersRequest()); + checkRequest(createElectPreferredLeadersRequestNullPartitions()); + checkErrorResponse(createElectPreferredLeadersRequest(), new UnknownServerException()); + checkResponse(createElectPreferredLeadersResponse(), 0); } @Test @@ -460,7 +469,7 @@ public class 
RequestResponseTest { Struct deserializedStruct = ApiKeys.PRODUCE.parseResponse(version, buffer); ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, - deserializedStruct); + deserializedStruct, version); assertEquals(1, v5FromBytes.responses().size()); assertTrue(v5FromBytes.responses().containsKey(tp0)); @@ -1322,4 +1331,33 @@ public class RequestResponseTest { return new DescribeDelegationTokenResponse(20, Errors.NONE, tokenList); } + + private ElectPreferredLeadersRequest createElectPreferredLeadersRequestNullPartitions() { + return new ElectPreferredLeadersRequest.Builder( + new ElectPreferredLeadersRequestData() + .setTimeoutMs(100) + .setTopicPartitions(null)) + .build((short) 0); + } + + private ElectPreferredLeadersRequest createElectPreferredLeadersRequest() { + ElectPreferredLeadersRequestData data = new ElectPreferredLeadersRequestData() + .setTimeoutMs(100); + data.topicPartitions().add(new TopicPartitions().setTopic("data").setPartitionId(asList(1, 2))); + return new ElectPreferredLeadersRequest.Builder(data).build((short) 0); + } + + private ElectPreferredLeadersResponse createElectPreferredLeadersResponse() { + ElectPreferredLeadersResponseData data = new ElectPreferredLeadersResponseData().setThrottleTimeMs(200); + ReplicaElectionResult resultsByTopic = new ReplicaElectionResult().setTopic("myTopic"); + resultsByTopic.partitionResult().add(new PartitionResult().setPartitionId(0) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message())); + resultsByTopic.partitionResult().add(new PartitionResult().setPartitionId(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())); + data.replicaElectionResults().add(resultsByTopic); + return new ElectPreferredLeadersResponse(data); + } + } diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 7bfecde335d..8740ed45b89 100755 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -16,12 +16,20 @@ */ package kafka.admin +import java.util.Properties +import java.util.concurrent.ExecutionException + +import joptsimple.OptionSpecBuilder import kafka.common.AdminCommandFailedException import kafka.utils._ import kafka.zk.KafkaZkClient -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.security.JaasUtils +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.common.errors.TimeoutException + +import collection.JavaConverters._ import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.security.JaasUtils +import org.apache.kafka.common.{KafkaFuture, TopicPartition} import org.apache.zookeeper.KeeperException.NodeExistsException import collection._ @@ -29,33 +37,47 @@ import collection._ object PreferredReplicaLeaderElectionCommand extends Logging { def main(args: Array[String]): Unit = { + + val timeout = 30000 + run(args, timeout) + } + def run(args: Array[String], timeout: Int = 30000): Unit = { val commandOpts = new PreferredReplicaLeaderElectionCommandOptions(args) CommandLineUtils.printHelpAndExitIfNeeded(commandOpts, "This tool helps to causes leadership for each partition to be transferred back to the 'preferred replica'," + " it can be used to balance leadership among the servers.") - 
CommandLineUtils.checkRequiredArgs(commandOpts.parser, commandOpts.options, commandOpts.zkConnectOpt) + CommandLineUtils.checkRequiredArgs(commandOpts.parser, commandOpts.options) - val zkConnect = commandOpts.options.valueOf(commandOpts.zkConnectOpt) - var zkClient: KafkaZkClient = null - try { - val time = Time.SYSTEM - zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time) + if (commandOpts.options.has(commandOpts.bootstrapServerOpt) == commandOpts.options.has(commandOpts.zkConnectOpt)) { + CommandLineUtils.printUsageAndDie(commandOpts.parser, s"Exactly one of '${commandOpts.bootstrapServerOpt}' or '${commandOpts.zkConnectOpt}' must be provided") + } - val partitionsForPreferredReplicaElection = - if (!commandOpts.options.has(commandOpts.jsonFileOpt)) - zkClient.getAllPartitions() + val partitionsForPreferredReplicaElection = + if (commandOpts.options.has(commandOpts.jsonFileOpt)) + Some(parsePreferredReplicaElectionData(Utils.readFileAsString(commandOpts.options.valueOf(commandOpts.jsonFileOpt)))) + else + None + + val preferredReplicaElectionCommand = if (commandOpts.options.has(commandOpts.zkConnectOpt)) { + println(s"Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.") + println(s"Use --bootstrap-server instead to specify a broker to connect to.") + new ZkCommand(commandOpts.options.valueOf(commandOpts.zkConnectOpt), + JaasUtils.isZkSecurityEnabled, + timeout) + } else { + val adminProps = if (commandOpts.options.has(commandOpts.adminClientConfigOpt)) + Utils.loadProps(commandOpts.options.valueOf(commandOpts.adminClientConfigOpt)) else - parsePreferredReplicaElectionData(Utils.readFileAsString(commandOpts.options.valueOf(commandOpts.jsonFileOpt))) - val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection) + new Properties() + adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOpts.options.valueOf(commandOpts.bootstrapServerOpt)) + adminProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toString) + new AdminClientCommand(adminProps) + } - preferredReplicaElectionCommand.moveLeaderToPreferredReplica() - } catch { - case e: Throwable => - println("Failed to start preferred replica election") - println(Utils.stackTrace(e)) + try { + preferredReplicaElectionCommand.electPreferredLeaders(partitionsForPreferredReplicaElection) } finally { - if (zkClient != null) - zkClient.close() + preferredReplicaElectionCommand.close() } } @@ -101,14 +123,165 @@ object PreferredReplicaLeaderElectionCommand extends Logging { .withRequiredArg .describedAs("list of partitions for which preferred replica leader election needs to be triggered") .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + - "form host:port. Multiple URLS can be given to allow fail-over.") + + private val zookeeperOptBuilder: OptionSpecBuilder = parser.accepts("zookeeper", + "DEPRECATED. The connection string for the zookeeper connection in the " + + "form host:port. Multiple URLS can be given to allow fail-over. " + + "Replaced by --bootstrap-server, REQUIRED unless --bootstrap-server is given.") + private val bootstrapOptBuilder: OptionSpecBuilder = parser.accepts("bootstrap-server", + "A hostname and port for the broker to connect to, " + + "in the form host:port. Multiple comma-separated URLs can be given. 
REQUIRED unless --zookeeper is given.") + parser.mutuallyExclusive(zookeeperOptBuilder, bootstrapOptBuilder) + val bootstrapServerOpt = bootstrapOptBuilder + .withRequiredArg + .describedAs("host:port") + .ofType(classOf[String]) + val zkConnectOpt = zookeeperOptBuilder .withRequiredArg .describedAs("urls") .ofType(classOf[String]) + val adminClientConfigOpt = parser.accepts("admin.config", + "Admin client config properties file to pass to the admin client when --bootstrap-server is given.") + .availableIf(bootstrapServerOpt) + .withRequiredArg + .describedAs("config file") + .ofType(classOf[String]) + + parser.accepts("") options = parser.parse(args: _*) } + + /** Abstraction over different ways to perform a leader election */ + trait Command { + /** Elect the preferred leader for the given {@code partitionsForElection}. + * If the given {@code partitionsForElection} are None then elect the preferred leader for all partitions. + */ + def electPreferredLeaders(partitionsForElection: Option[Set[TopicPartition]]) : Unit + def close() : Unit + } + + class ZkCommand(zkConnect: String, isSecure: Boolean, timeout: Int) + extends Command { + var zkClient: KafkaZkClient = null + + val time = Time.SYSTEM + zkClient = KafkaZkClient(zkConnect, isSecure, timeout, timeout, Int.MaxValue, time) + + override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]) { + try { + val topics = + partitionsFromUser match { + case Some(partitions) => + partitions.map(_.topic).toSet + case None => + zkClient.getAllPartitions().map(_.topic) + } + + val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) => + partitions.map(new TopicPartition(topic, _)) + }.toSet + + val (validPartitions, invalidPartitions) = + partitionsFromUser match { + case Some(partitions) => + partitions.partition(partitionsFromZk.contains) + case None => + (zkClient.getAllPartitions(), Set.empty) + } + PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions) + + println("Successfully started preferred replica election for partitions %s".format(validPartitions)) + invalidPartitions.foreach(p => println("Skipping preferred replica leader election for partition %s since it doesn't exist.".format(p))) + } catch { + case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e) + } + } + + override def close(): Unit = { + if (zkClient != null) + zkClient.close() + } + } + + /** Election via AdminClient.electPreferredLeaders() */ + class AdminClientCommand(adminClientProps: Properties) + extends Command with Logging { + + val adminClient = org.apache.kafka.clients.admin.AdminClient.create(adminClientProps) + + /** + * Wait until the given future has completed, then return whether it completed exceptionally. 
+ * Because KafkaFuture.isCompletedExceptionally doesn't wait for a result + */ + private def completedExceptionally[T](future: KafkaFuture[T]): Boolean = { + try { + future.get() + false + } catch { + case (_: Throwable) => + true + } + } + + override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]): Unit = { + val partitions = partitionsFromUser match { + case Some(partitionsFromUser) => partitionsFromUser.asJava + case None => null + } + debug(s"Calling AdminClient.electPreferredLeaders($partitions)") + val result = adminClient.electPreferredLeaders(partitions) + // wait for all results + + val attemptedPartitions = partitionsFromUser match { + case Some(partitionsFromUser) => partitions.asScala + case None => try { + result.partitions().get.asScala + } catch { + case e: ExecutionException => + val cause = e.getCause + if (cause.isInstanceOf[TimeoutException]) { + // We timed out, or don't even know the attempted partitions + println("Timeout waiting for election results") + } + throw new AdminCommandFailedException(null, cause) + case e: Throwable => + // We don't even know the attempted partitions + println("Error while making request") + e.printStackTrace() + return + } + } + + val (exceptional, ok) = attemptedPartitions.map(tp => tp -> result.partitionResult(tp)). + partition { case (_, partitionResult) => completedExceptionally(partitionResult) } + + if (!ok.isEmpty) { + println(s"Successfully completed preferred replica election for partitions ${ok.map{ case (tp, future) => tp }.mkString(", ")}") + } + if (!exceptional.isEmpty) { + val adminException = new AdminCommandFailedException( + s"${exceptional.size} preferred replica(s) could not be elected") + for ((partition, void) <- exceptional) { + val exception = try { + void.get() + new AdminCommandFailedException("Exceptional future with no exception") + } catch { + case e: ExecutionException => e.getCause + } + println(s"Error completing preferred replica election for partition $partition: $exception") + adminException.addSuppressed(exception) + } + throw adminException + } + + } + + override def close(): Unit = { + debug("Closing AdminClient") + adminClient.close() + } + } } class PreferredReplicaLeaderElectionCommand(zkClient: KafkaZkClient, partitionsFromUser: scala.collection.Set[TopicPartition]) { diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala index c93e9e79ec2..54e3a9e126b 100644 --- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.utils.Time import scala.collection._ +import scala.collection.JavaConverters._ object ControllerEventManager { val ControllerEventThreadName = "controller-event-thread" @@ -69,6 +70,10 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll } def clearAndPut(event: ControllerEvent): Unit = inLock(putLock) { + queue.asScala.foreach(evt => + if (evt.isInstanceOf[PreemptableControllerEvent]) + evt.asInstanceOf[PreemptableControllerEvent].preempt() + ) queue.clear() put(event) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index c8cf446edcd..ea23beb4192 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ 
b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -16,6 +16,7 @@ */ package kafka.controller +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.{CountDownLatch, TimeUnit} import com.yammer.metrics.core.Gauge @@ -32,13 +33,13 @@ import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, LeaderAndIsrResponse, StopReplicaResponse} +import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, ApiError, LeaderAndIsrResponse, StopReplicaResponse} import org.apache.kafka.common.utils.Time import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code import scala.collection._ -import scala.util.Try +import scala.util.{Failure, Try} object KafkaController extends Logging { val InitialControllerEpoch = 0 @@ -268,7 +269,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet) topicDeletionManager.tryTopicDeletion() val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections() - onPreferredReplicaElection(pendingPreferredReplicaElections) + onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered) info("Starting the controller scheduler") kafkaScheduler.startup() if (config.autoLeaderRebalanceEnable) { @@ -586,7 +587,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti val partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition] topicPartitions.foreach { tp => if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) { - error(s"Skipping reassignment of $tp since the topic is currently being deleted") + info(s"Skipping reassignment of $tp since the topic is currently being deleted") partitionsToBeRemovedFromReassignment.add(tp) } else { val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse { @@ -628,17 +629,38 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment) } - private def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) { + sealed trait ElectionType + object AutoTriggered extends ElectionType + object ZkTriggered extends ElectionType + object AdminClientTriggered extends ElectionType + + /** + * Attempt to elect the preferred replica as leader for each of the given partitions. + * @param partitions The partitions to have their preferred leader elected + * @param electionType The election type + * @return A map of failed elections where keys are partitions which had an error and the corresponding value is + * the exception that was thrown. 
+ */ + private def onPreferredReplicaElection(partitions: Set[TopicPartition], + electionType: ElectionType): Map[TopicPartition, Throwable] = { info(s"Starting preferred replica leader election for partitions ${partitions.mkString(",")}") try { - partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy)) - } catch { - case e: ControllerMovedException => - error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")} because controller has moved to another broker.", e) - throw e - case e: Throwable => error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")}", e) + val results = partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, + Option(PreferredReplicaPartitionLeaderElectionStrategy)) + if (electionType != AdminClientTriggered) { + results.foreach { case (tp, throwable) => + if (throwable.isInstanceOf[ControllerMovedException]) { + error(s"Error completing preferred replica leader election for partition $tp because controller has moved to another broker.", throwable) + throw throwable + } else { + error(s"Error completing preferred replica leader election for partition $tp", throwable) + } + } + } + return results; } finally { - removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance) + if (electionType != AdminClientTriggered) + removePartitionsFromPreferredReplicaElection(partitions, electionType == AutoTriggered) } } @@ -884,7 +906,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti if (!isTriggeredByAutoRebalance) { zkClient.deletePreferredReplicaElection(controllerContext.epochZkVersion) // Ensure we detect future preferred replica leader elections - eventManager.put(PreferredReplicaLeaderElection) + eventManager.put(PreferredReplicaLeaderElection(None)) } } @@ -983,7 +1005,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti controllerContext.partitionsBeingReassigned.isEmpty && !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) && controllerContext.allTopics.contains(tp.topic)) - onPreferredReplicaElection(candidatePartitions.toSet, isTriggeredByAutoRebalance = true) + onPreferredReplicaElection(candidatePartitions.toSet, AutoTriggered) } } } @@ -1022,11 +1044,15 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } } - case class ControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent { + case class ControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends PreemptableControllerEvent { def state = ControllerState.ControlledShutdown - override def process(): Unit = { + override def handlePreempt(): Unit = { + controlledShutdownCallback(Failure(new ControllerMovedException("Controller moved to another broker"))) + } + + override def handleProcess(): Unit = { val controlledShutdownResult = Try { doControlledShutdown(id) } controlledShutdownCallback(controlledShutdownResult) } @@ -1517,22 +1543,75 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } } - case object PreferredReplicaLeaderElection extends ControllerEvent { + type ElectPreferredLeadersCallback = (Map[TopicPartition, Int], Map[TopicPartition, ApiError])=>Unit + + def electPreferredLeaders(partitions: Set[TopicPartition], callback: 
ElectPreferredLeadersCallback = { (_,_) => }): Unit = + eventManager.put(PreferredReplicaLeaderElection(Some(partitions), AdminClientTriggered, callback)) + + case class PreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]], + electionType: ElectionType = ZkTriggered, + callback: ElectPreferredLeadersCallback = (_,_) =>{}) extends PreemptableControllerEvent { override def state: ControllerState = ControllerState.ManualLeaderBalance - override def process(): Unit = { - if (!isActive) return + override def handlePreempt(): Unit = { + callback(Map.empty, partitionsFromAdminClientOpt match { + case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap + case None => Map.empty + }) + } - // We need to register the watcher if the path doesn't exist in order to detect future preferred replica - // leader elections and we get the `path exists` check for free - if (zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) { - val partitions = zkClient.getPreferredReplicaElection - val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) - if (partitionsForTopicsToBeDeleted.nonEmpty) { - error(s"Skipping preferred replica election for partitions $partitionsForTopicsToBeDeleted since the " + - "respective topics are being deleted") + override def handleProcess(): Unit = { + if (!isActive) { + callback(Map.empty, partitionsFromAdminClientOpt match { + case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap + case None => Map.empty + }) + } else { + // We need to register the watcher if the path doesn't exist in order to detect future preferred replica + // leader elections and we get the `path exists` check for free + if (electionType == AdminClientTriggered || zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) { + val partitions = partitionsFromAdminClientOpt match { + case Some(partitions) => partitions + case None => zkClient.getPreferredReplicaElection + } + + val (validPartitions, invalidPartitions) = partitions.partition(tp => controllerContext.allPartitions.contains(tp)) + invalidPartitions.foreach { p => + info(s"Skipping preferred replica leader election for partition ${p} since it doesn't exist.") + } + + val (partitionsBeingDeleted, livePartitions) = validPartitions.partition(partition => + topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) + if (partitionsBeingDeleted.nonEmpty) { + warn(s"Skipping preferred replica election for partitions $partitionsBeingDeleted " + + s"since the respective topics are being deleted") + } + // partition those where preferred is already leader + val (electablePartitions, alreadyPreferred) = livePartitions.partition { partition => + val assignedReplicas = controllerContext.partitionReplicaAssignment(partition) + val preferredReplica = assignedReplicas.head + val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader + currentLeader != preferredReplica + } + + val electionErrors = onPreferredReplicaElection(electablePartitions, electionType) + val successfulPartitions = electablePartitions -- electionErrors.keySet + val results = electionErrors.map { case (partition, ex) => + val apiError = if (ex.isInstanceOf[StateChangeFailedException]) + new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE, ex.getMessage) + else + ApiError.fromThrowable(ex) 
+ partition -> apiError + } ++ + alreadyPreferred.map(_ -> ApiError.NONE) ++ + partitionsBeingDeleted.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted")) ++ + invalidPartitions.map ( tp => tp -> new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, s"The partition does not exist.") + ) + debug(s"PreferredReplicaLeaderElection waiting: $successfulPartitions, results: $results") + callback(successfulPartitions.map( + tp => tp->controllerContext.partitionReplicaAssignment(tp).head).toMap, + results) } - onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted) } } } @@ -1653,7 +1732,7 @@ object IsrChangeNotificationHandler { class PreferredReplicaElectionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler { override val path: String = PreferredReplicaElectionZNode.path - override def handleCreation(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection) + override def handleCreation(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection(None)) } class ControllerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler { @@ -1712,3 +1791,25 @@ sealed trait ControllerEvent { def state: ControllerState def process(): Unit } + +/** + * A `ControllerEvent`, such as one with a client callback, which needs specific handling in the event of ZK session expiration. + */ +sealed trait PreemptableControllerEvent extends ControllerEvent { + + val spent = new AtomicBoolean(false) + + final def preempt(): Unit = { + if (!spent.getAndSet(true)) + handlePreempt() + } + + final def process(): Unit = { + if (!spent.getAndSet(true)) + handleProcess() + } + + def handlePreempt(): Unit + + def handleProcess(): Unit +} diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index e4f053264c4..ad739797179 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -125,22 +125,36 @@ class PartitionStateMachine(config: KafkaConfig, // It is important to trigger leader election for those partitions. } + /** + * Try to change the state of the given partitions to the given targetState, using the given + * partitionLeaderElectionStrategyOpt if a leader election is required. + * @param partitions The partitions + * @param targetState The state + * @param partitionLeaderElectionStrategyOpt The leader election strategy if a leader election is required. 
+ * @return partitions and corresponding throwable for those partitions which could not transition to the given state + */ def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState, - partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = { + partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Map[TopicPartition, Throwable] = { if (partitions.nonEmpty) { try { controllerBrokerRequestBatch.newBatch() - doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt) + val errors = doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt) controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) + errors } catch { case e: ControllerMovedException => error(s"Controller moved to another broker when moving some partitions to $targetState state", e) throw e - case e: Throwable => error(s"Error while moving some partitions to $targetState state", e) + case e: Throwable => + error(s"Error while moving some partitions to $targetState state", e) + partitions.map { _ -> e }.toMap } + } else { + Map.empty[TopicPartition, Throwable] } } + def partitionsInState(state: PartitionState): Set[TopicPartition] = { partitionState.filter { case (_, s) => s == state }.keySet.toSet } @@ -183,7 +197,7 @@ class PartitionStateMachine(config: KafkaConfig, * @param targetState The end state that the partition should be moved to */ private def doHandleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState, - partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit = { + partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = { val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch) partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition)) val (validPartitions, invalidPartitions) = partitions.partition(partition => isValidTransition(partition, targetState)) @@ -195,6 +209,7 @@ class PartitionStateMachine(config: KafkaConfig, s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}") changeStateTo(partition, partitionState(partition), NewPartition) } + Map.empty case OnlinePartition => val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition) val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition) @@ -207,23 +222,28 @@ class PartitionStateMachine(config: KafkaConfig, } } if (partitionsToElectLeader.nonEmpty) { - val successfulElections = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get) + val (successfulElections, failedElections) = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get) successfulElections.foreach { partition => stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " + s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}") changeStateTo(partition, partitionState(partition), OnlinePartition) } + failedElections + } else { + Map.empty } case OfflinePartition => validPartitions.foreach { partition => stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState") changeStateTo(partition, 
partitionState(partition), OfflinePartition) } + Map.empty case NonExistentPartition => validPartitions.foreach { partition => stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState") changeStateTo(partition, partitionState(partition), NonExistentPartition) } + Map.empty } } @@ -283,11 +303,14 @@ class PartitionStateMachine(config: KafkaConfig, * Repeatedly attempt to elect leaders for multiple partitions until there are no more remaining partitions to retry. * @param partitions The partitions that we're trying to elect leaders for. * @param partitionLeaderElectionStrategy The election strategy to use. - * @return The partitions that successfully had a leader elected. + * @return A pair with first element of which is the partitions that successfully had a leader elected + * and the second element a map of failed partition to the corresponding thrown exception. */ - private def electLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicPartition] = { + private def electLeaderForPartitions(partitions: Seq[TopicPartition], + partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): (Seq[TopicPartition], Map[TopicPartition, Throwable]) = { val successfulElections = mutable.Buffer.empty[TopicPartition] var remaining = partitions + var failures = Map.empty[TopicPartition, Throwable] while (remaining.nonEmpty) { val (success, updatesToRetry, failedElections) = doElectLeaderForPartitions(partitions, partitionLeaderElectionStrategy) remaining = updatesToRetry @@ -295,8 +318,9 @@ class PartitionStateMachine(config: KafkaConfig, failedElections.foreach { case (partition, e) => logFailedStateChange(partition, partitionState(partition), OnlinePartition, e) } + failures ++= failedElections } - successfulElections + (successfulElections, failures) } /** diff --git a/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala b/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala new file mode 100644 index 00000000000..38b07ad5ce6 --- /dev/null +++ b/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.ApiError + +import scala.collection.{Map, mutable} + +/** A delayed elect preferred leader operation that can be created by the replica manager and watched + * in the elect preferred leader purgatory + */ +class DelayedElectPreferredLeader(delayMs: Long, + expectedLeaders: Map[TopicPartition, Int], + results: Map[TopicPartition, ApiError], + replicaManager: ReplicaManager, + responseCallback: Map[TopicPartition, ApiError] => Unit) + extends DelayedOperation(delayMs) { + + var waitingPartitions = expectedLeaders + val fullResults = results.to[mutable.Set] + + + /** + * Call-back to execute when a delayed operation gets expired and hence forced to complete. + */ + override def onExpiration(): Unit = {} + + /** + * Process for completing an operation; This function needs to be defined + * in subclasses and will be called exactly once in forceComplete() + */ + override def onComplete(): Unit = { + // onComplete() may also be invoked via forceComplete(), so any partitions still waiting are timed out here. + updateWaiting() + responseCallback(timeoutWaiting ++ fullResults) + } + + private def timeoutWaiting = { + waitingPartitions.map { case (tp, _) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null) }.toMap + } + + /** + * Try to complete the delayed operation by first checking if the operation + * can be completed by now. If yes execute the completion logic by calling + * forceComplete() and return true iff forceComplete returns true; otherwise return false + * + * This function needs to be defined in subclasses + */ + override def tryComplete(): Boolean = { + updateWaiting() + debug(s"tryComplete() waitingPartitions: $waitingPartitions") + waitingPartitions.isEmpty && forceComplete() + } + + private def updateWaiting() = { + waitingPartitions.foreach{case (tp, leader) => + val ps = replicaManager.metadataCache.getPartitionInfo(tp.topic, tp.partition) + ps match { + case Some(ps) => + if (leader == ps.basePartitionState.leader) { + waitingPartitions -= tp + fullResults += tp -> ApiError.NONE + } + case None => + } + } + } + +} diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 67020a8a5de..aab9a384bf3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -43,6 +43,7 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -146,6 +147,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request) case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request) case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request) + case ApiKeys.ELECT_PREFERRED_LEADERS => handleElectPreferredReplicaLeader(request) } } catch { case e: FatalExitError => throw e 
@@ -253,6 +255,11 @@ class KafkaApis(val requestChannel: RequestChannel, quotas.request.updateQuotaMetricConfigs() } } + if (replicaManager.hasDelayedElectionOperations) { + updateMetadataRequest.partitionStates.asScala.foreach { case (tp, ps) => + replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp.topic(), tp.partition())) + } + } sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE)) } } @@ -2227,6 +2234,48 @@ class KafkaApis(val requestChannel: RequestChannel, true } + def handleElectPreferredReplicaLeader(request: RequestChannel.Request): Unit = { + + val electionRequest = request.body[ElectPreferredLeadersRequest] + val partitions = + if (electionRequest.data().topicPartitions() == null) { + metadataCache.getAllPartitions() + } else { + electionRequest.data().topicPartitions().asScala.flatMap{tp => + tp.partitionId().asScala.map(partitionId => new TopicPartition(tp.topic, partitionId))}.toSet + } + def sendResponseCallback(result: Map[TopicPartition, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => { + val results = result. + groupBy{case (tp, error) => tp.topic}. + map{case (topic, ps) => new ElectPreferredLeadersResponseData.ReplicaElectionResult() + .setTopic(topic) + .setPartitionResult(ps.map{ + case (tp, error) => + new ElectPreferredLeadersResponseData.PartitionResult() + .setErrorCode(error.error.code) + .setErrorMessage(error.message()) + .setPartitionId(tp.partition)}.toList.asJava)} + val data = new ElectPreferredLeadersResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setReplicaElectionResults(results.toList.asJava) + new ElectPreferredLeadersResponse(data)}) + } + if (!authorize(request.session, Alter, Resource.ClusterResource)) { + val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null); + val partitionErrors = + if (electionRequest.data().topicPartitions() == null) { + // Don't leak the set of partitions if the client lack authz + Map.empty[TopicPartition, ApiError] + } else { + partitions.map(partition => partition -> error).toMap + } + sendResponseCallback(partitionErrors) + } else { + replicaManager.electPreferredLeaders(controller, partitions, sendResponseCallback, electionRequest.data().timeoutMs()) + } + } + def authorizeClusterAction(request: RequestChannel.Request): Unit = { if (!isAuthorizedClusterAction(request)) throw new ClusterAuthorizationException(s"Request $request is not authorized.") diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 3fefc7b84ef..ec5a2b9ed38 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -137,6 +137,12 @@ class MetadataCache(brokerId: Int) extends Logging { getAllTopics(metadataSnapshot) } + def getAllPartitions(): Set[TopicPartition] = { + metadataSnapshot.partitionStates.flatMap { case (topicName, partitionsAndStates) => + partitionsAndStates.keys.map(partitionId => new TopicPartition(topicName, partitionId.toInt)) + }.toSet + } + private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = { snapshot.partitionStates.keySet } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 5dfb2e66dff..5e41e35f879 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -146,6 +146,7 @@ class ReplicaManager(val config: KafkaConfig, val 
delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce], val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch], val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords], + val delayedElectPreferredLeaderPurgatory: DelayedOperationPurgatory[DelayedElectPreferredLeader], threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup { def this(config: KafkaConfig, @@ -171,6 +172,8 @@ class ReplicaManager(val config: KafkaConfig, DelayedOperationPurgatory[DelayedDeleteRecords]( purgatoryName = "DeleteRecords", brokerId = config.brokerId, purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests), + DelayedOperationPurgatory[DelayedElectPreferredLeader]( + purgatoryName = "ElectPreferredLeader", brokerId = config.brokerId), threadNamePrefix) } @@ -318,6 +321,13 @@ class ReplicaManager(val config: KafkaConfig, debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed)) } + def hasDelayedElectionOperations = delayedElectPreferredLeaderPurgatory.delayed != 0 + + def tryCompleteElection(key: DelayedOperationKey): Unit = { + val completed = delayedElectPreferredLeaderPurgatory.checkAndComplete(key) + debug("Request key %s unblocked %d ElectPreferredLeader.".format(key.keyLabel, completed)) + } + def startup() { // start ISR expiration thread // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR @@ -1476,6 +1486,7 @@ class ReplicaManager(val config: KafkaConfig, delayedFetchPurgatory.shutdown() delayedProducePurgatory.shutdown() delayedDeleteRecordsPurgatory.shutdown() + delayedElectPreferredLeaderPurgatory.shutdown() if (checkpointHW) checkpointHighWatermarks() info("Shut down completely") @@ -1508,4 +1519,30 @@ class ReplicaManager(val config: KafkaConfig, tp -> epochEndOffset } } + + def electPreferredLeaders(controller: KafkaController, + partitions: Set[TopicPartition], + responseCallback: Map[TopicPartition, ApiError] => Unit, + requestTimeout: Long): Unit = { + + val deadline = time.milliseconds() + requestTimeout + + def electionCallback(expectedLeaders: Map[TopicPartition, Int], + results: Map[TopicPartition, ApiError]): Unit = { + if (expectedLeaders.nonEmpty) { + val watchKeys = expectedLeaders.map{ + case (tp, leader) => new TopicPartitionOperationKey(tp.topic, tp.partition) + }.toSeq + delayedElectPreferredLeaderPurgatory.tryCompleteElseWatch( + new DelayedElectPreferredLeader(deadline - time.milliseconds(), expectedLeaders, results, + this, responseCallback), + watchKeys) + } else { + // There are no partitions actually being elected, so return immediately + responseCallback(results) + } + } + + controller.electPreferredLeaders(partitions, electionCallback) + } } diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 92d1758b194..5a3278cda62 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -47,13 +47,14 @@ import org.junit.Assert._ import scala.util.Random import scala.collection.JavaConverters._ -import java.lang.{Long => JLong} import kafka.zk.KafkaZkClient import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} +import java.lang.{Long => JLong} + /** * An integration test of the KafkaAdminClient. 
* @@ -99,6 +100,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}") config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true") config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") + config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false") + config.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // We set this in order to test that we don't expose sensitive data via describe configs. This will already be // set for subclasses with security enabled and we don't want to overwrite it. if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp)) @@ -1198,6 +1201,204 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { } } + @Test + def testElectPreferredLeaders(): Unit = { + client = AdminClient.create(createConfig) + + val prefer0 = Seq(0, 1, 2) + val prefer1 = Seq(1, 2, 0) + val prefer2 = Seq(2, 0, 1) + + val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0) + TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> prefer0), servers) + + val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0) + TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0), servers) + + def currentLeader(topicPartition: TopicPartition) = + client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic). + get.partitions.get(topicPartition.partition).leader.id + + def preferredLeader(topicPartition: TopicPartition) = + client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic). + get.partitions.get(topicPartition.partition).replicas.get(0).id + + def waitForLeaderToBecome(topicPartition: TopicPartition, leader: Int) = + TestUtils.waitUntilTrue(() => currentLeader(topicPartition) == leader, s"Expected leader to become $leader", 10000) + + /** Changes the preferred leader without changing the current leader. 
*/ + def changePreferredLeader(newAssignment: Seq[Int]) = { + val preferred = newAssignment.head + val prior1 = currentLeader(partition1) + val prior2 = currentLeader(partition2) + + var m = Map.empty[TopicPartition, Seq[Int]] + + if (prior1 != preferred) + m += partition1 -> newAssignment + if (prior2 != preferred) + m += partition2 -> newAssignment + + zkClient.createPartitionReassignment(m) + TestUtils.waitUntilTrue( + () => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred, + s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}", 10000) + // Check the leader hasn't moved + assertEquals(prior1, currentLeader(partition1)) + assertEquals(prior2, currentLeader(partition2)) + } + + // Check current leaders are 0 + assertEquals(0, currentLeader(partition1)) + assertEquals(0, currentLeader(partition2)) + + // Noop election + var electResult = client.electPreferredLeaders(asList(partition1)) + electResult.partitionResult(partition1).get() + assertEquals(0, currentLeader(partition1)) + + // Noop election with null partitions + electResult = client.electPreferredLeaders(null) + electResult.partitionResult(partition1).get() + assertEquals(0, currentLeader(partition1)) + electResult.partitionResult(partition2).get() + assertEquals(0, currentLeader(partition2)) + + // Now change the preferred leader to 1 + changePreferredLeader(prefer1) + + // meaningful election + electResult = client.electPreferredLeaders(asList(partition1)) + assertEquals(Set(partition1).asJava, electResult.partitions.get) + electResult.partitionResult(partition1).get() + waitForLeaderToBecome(partition1, 1) + + // topic 2 unchanged + try { + electResult.partitionResult(partition2).get() + fail("topic 2 wasn't requested") + } catch { + case e: ExecutionException => + val cause = e.getCause + assertTrue(cause.getClass.getName, cause.isInstanceOf[UnknownTopicOrPartitionException]) + assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\" was not attempted", + cause.getMessage) + assertEquals(0, currentLeader(partition2)) + } + + // meaningful election with null partitions + electResult = client.electPreferredLeaders(null) + assertEquals(Set(partition1, partition2), electResult.partitions.get.asScala.filterNot(_.topic == "__consumer_offsets")) + electResult.partitionResult(partition1).get() + waitForLeaderToBecome(partition1, 1) + electResult.partitionResult(partition2).get() + waitForLeaderToBecome(partition2, 1) + + // unknown topic + val unknownPartition = new TopicPartition("topic-does-not-exist", 0) + electResult = client.electPreferredLeaders(asList(unknownPartition)) + assertEquals(Set(unknownPartition).asJava, electResult.partitions.get) + try { + electResult.partitionResult(unknownPartition).get() + } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException]) + assertEquals("The partition does not exist.", + cause.getMessage) + assertEquals(1, currentLeader(partition1)) + assertEquals(1, currentLeader(partition2)) + } + + // Now change the preferred leader to 2 + changePreferredLeader(prefer2) + + // mixed results + electResult = client.electPreferredLeaders(asList(unknownPartition, partition1)) + assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get) + waitForLeaderToBecome(partition1, 2) + assertEquals(1, currentLeader(partition2)) + try { + electResult.partitionResult(unknownPartition).get() 
+ } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException]) + assertEquals("The partition does not exist.", + cause.getMessage) + } + + // dupe partitions + electResult = client.electPreferredLeaders(asList(partition2, partition2)) + assertEquals(Set(partition2).asJava, electResult.partitions.get) + electResult.partitionResult(partition2).get() + waitForLeaderToBecome(partition2, 2) + + // Now change the preferred leader to 1 + changePreferredLeader(prefer1) + // but shut it down... + servers(1).shutdown() + waitUntilTrue ( + () => { + val description = client.describeTopics(Set (partition1.topic(), partition2.topic()).asJava).all().get() + !description.asScala.flatMap{ + case (topic, description) => description.partitions().asScala.map( + partition => partition.isr().asScala).flatten + }.exists(node => node.id == 1) + }, + "Expect broker 1 to no longer be in any ISR" + ) + + // ... now what happens if we try to elect the preferred leader and it's down? + val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000) + electResult = client.electPreferredLeaders(asList(partition1), shortTimeout) + assertEquals(Set(partition1).asJava, electResult.partitions.get) + try { + electResult.partitionResult(partition1).get() + fail() + } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) + assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains( + "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) + } + assertEquals(2, currentLeader(partition1)) + + // preferred leader unavailable with null argument + electResult = client.electPreferredLeaders(null, shortTimeout) + try { + electResult.partitions.get() + fail() + } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) + } + try { + electResult.partitionResult(partition1).get() + fail() + } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) + assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains( + "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) + } + try { + electResult.partitionResult(partition2).get() + fail() + } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) + assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains( + "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) + } + + assertEquals(2, currentLeader(partition1)) + assertEquals(2, currentLeader(partition2)) + } } object AdminClientIntegrationTest { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4c0459e72e0..ad7fdbbf5a3 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -146,7 +146,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse], 
ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse], ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse], - ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse] + ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse], + ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse] ) val requestKeyToError = Map[ApiKeys, Nothing => Errors]( @@ -187,7 +188,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) => resp.responses.get(tp)), ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) => if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED), - ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error) + ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error), + ApiKeys.ELECT_PREFERRED_LEADERS -> ((resp: ElectPreferredLeadersResponse) => + ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error()) ) val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( @@ -225,7 +228,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_ACLS -> clusterAlterAcl, ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl, ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl, - ApiKeys.CREATE_PARTITIONS -> topicAlterAcl + ApiKeys.CREATE_PARTITIONS -> topicAlterAcl, + ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl ) @Before @@ -382,6 +386,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build() + private def electPreferredLeadersRequest = new ElectPreferredLeadersRequest.Builder( + ElectPreferredLeadersRequest.toRequestData(Collections.singleton(tp), 10000)).build() + @Test def testAuthorizationWithTopicExisting() { val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( @@ -414,9 +421,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest, ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest, ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest, - // Check StopReplica last since some APIs depend on replica availability - ApiKeys.STOP_REPLICA -> stopReplicaRequest + ApiKeys.STOP_REPLICA -> stopReplicaRequest, + ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest ) for ((key, request) <- requestKeyToRequest) { @@ -462,7 +469,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest, ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest, ApiKeys.DELETE_GROUPS -> deleteGroupsRequest, - ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest + ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest, + ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest ) for ((key, request) <- requestKeyToRequest) { diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala new file mode 100644 index 00000000000..824e8fb4253 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import java.io.File +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths} +import java.util.Properties + +import kafka.admin.PreferredReplicaLeaderElectionCommand +import kafka.common.{AdminCommandFailedException, TopicAndPartition} +import kafka.network.RequestChannel +import kafka.security.auth._ +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{Logging, TestUtils, ZkUtils} +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.{ClusterAuthorizationException, PreferredLeaderNotAvailableException, TimeoutException, UnknownTopicOrPartitionException} +import org.apache.kafka.common.network.ListenerName +import org.junit.Assert._ +import org.junit.{After, Test} + +class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness with Logging /*with RackAwareTest*/ { + + var servers: Seq[KafkaServer] = Seq() + + @After + override def tearDown() { + TestUtils.shutdownServers(servers) + super.tearDown() + } + + private def createTestTopicAndCluster(topicPartition: Map[TopicPartition, List[Int]], + authorizer: Option[String] = None) { + + val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false) + brokerConfigs.foreach(p => p.setProperty("auto.leader.rebalance.enable", "false")) + authorizer match { + case Some(className) => + brokerConfigs.foreach(p => p.setProperty("authorizer.class.name", className)) + case None => + } + createTestTopicAndCluster(topicPartition,brokerConfigs) + } + + private def createTestTopicAndCluster(partitionsAndAssignments: Map[TopicPartition, List[Int]], + brokerConfigs: Seq[Properties]) { + // create brokers + servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + // create the topic + partitionsAndAssignments.foreach { case (tp, assigment) => + zkClient.createTopicAssignment(tp.topic(), + Map(tp -> assigment)) + } + // wait until replica log is created on every broker + TestUtils.waitUntilTrue(() => servers.forall(server => partitionsAndAssignments.forall(partitionAndAssignment => server.getLogManager().getLog(partitionAndAssignment._1).isDefined)), + "Replicas for topic test not created") + } + + /** Bounce the given targetServer and wait for all servers to get metadata for the given partition */ + private def bounceServer(targetServer: Int, partition: TopicPartition) { + debug(s"Shutting down server $targetServer so a non-preferred replica becomes leader") + servers(targetServer).shutdown() + debug(s"Starting server $targetServer now that a non-preferred replica is leader") + servers(targetServer).startup() + TestUtils.waitUntilTrue(() => servers.forall { server => + server.metadataCache.getPartitionInfo(partition.topic(), partition.partition()).exists { partitionState => + 
partitionState.basePartitionState.isr.contains(targetServer) + } + }, + s"Replicas for partition $partition not created") + } + + private def getController() = { + servers.find(p => p.kafkaController.isActive) + } + + private def getLeader(topicPartition: TopicPartition) = { + servers(0).metadataCache.getPartitionInfo(topicPartition.topic(), topicPartition.partition()).get.basePartitionState.leader + } + + private def bootstrapServer(broker: Int = 0): String = { + val port = servers(broker).socketServer.boundPort(ListenerName.normalised("PLAINTEXT")) + debug("Server bound to port "+port) + s"localhost:$port" + } + + val testPartition = new TopicPartition("test", 0) + val testPartitionAssignment = List(1, 2, 0) + val testPartitionPreferredLeader = testPartitionAssignment.head + val testPartitionAndAssignment = Map(testPartition -> testPartitionAssignment) + + /** Test the case multiple values are given for --bootstrap-broker */ + @Test + def testMultipleBrokersGiven() { + createTestTopicAndCluster(testPartitionAndAssignment) + bounceServer(testPartitionPreferredLeader, testPartition) + // Check the leader for the partition is not the preferred one + assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition)) + PreferredReplicaLeaderElectionCommand.run(Array( + "--bootstrap-server", s"${bootstrapServer(1)},${bootstrapServer(0)}")) + // Check the leader for the partition IS the preferred one + assertEquals(testPartitionPreferredLeader, getLeader(testPartition)) + } + + /** Test the case when an invalid broker is given for --bootstrap-broker */ + @Test + def testInvalidBrokerGiven() { + try { + PreferredReplicaLeaderElectionCommand.run(Array( + "--bootstrap-server", "example.com:1234"), + timeout = 1000) + fail() + } catch { + case e: AdminCommandFailedException => + assertTrue(e.getCause.isInstanceOf[TimeoutException]) + } + } + + /** Test the case where no partitions are given (=> elect all partitions) */ + @Test + def testNoPartitionsGiven() { + createTestTopicAndCluster(testPartitionAndAssignment) + bounceServer(testPartitionPreferredLeader, testPartition) + // Check the leader for the partition is not the preferred one + assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition)) + PreferredReplicaLeaderElectionCommand.run(Array( + "--bootstrap-server", bootstrapServer())) + // Check the leader for the partition IS the preferred one + assertEquals(testPartitionPreferredLeader, getLeader(testPartition)) + } + + private def toJsonFile(partitions: scala.collection.Set[TopicPartition]): File = { + val jsonFile = File.createTempFile("preferredreplicaelection", ".js") + jsonFile.deleteOnExit() + val jsonString = ZkUtils.preferredReplicaLeaderElectionZkData(partitions.map(new TopicAndPartition(_))) + debug("Using json: "+jsonString) + Files.write(Paths.get(jsonFile.getAbsolutePath), jsonString.getBytes(StandardCharsets.UTF_8)) + jsonFile + } + + /** Test the case where a list of partitions is given */ + @Test + def testSingletonPartitionGiven() { + createTestTopicAndCluster(testPartitionAndAssignment) + bounceServer(testPartitionPreferredLeader, testPartition) + // Check the leader for the partition is not the preferred one + assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition)) + val jsonFile = toJsonFile(testPartitionAndAssignment.keySet) + try { + PreferredReplicaLeaderElectionCommand.run(Array( + "--bootstrap-server", bootstrapServer(), + "--path-to-json-file", jsonFile.getAbsolutePath)) + } finally { + jsonFile.delete() + } + // Check the leader for 
the partition IS the preferred one + assertEquals(testPartitionPreferredLeader, getLeader(testPartition)) + } + + /** Test the case where a topic does not exist */ + @Test + def testTopicDoesNotExist() { + val nonExistentPartition = new TopicPartition("does.not.exist", 0) + val nonExistentPartitionAssignment = List(1, 2, 0) + val nonExistentPartitionAndAssignment = Map(nonExistentPartition -> nonExistentPartitionAssignment) + + createTestTopicAndCluster(testPartitionAndAssignment) + val jsonFile = toJsonFile(nonExistentPartitionAndAssignment.keySet) + try { + PreferredReplicaLeaderElectionCommand.run(Array( + "--bootstrap-server", bootstrapServer(), + "--path-to-json-file", jsonFile.getAbsolutePath)) + } catch { + case e: AdminCommandFailedException => + val suppressed = e.getSuppressed()(0) + assertTrue(suppressed.isInstanceOf[UnknownTopicOrPartitionException]) + case e: Throwable => + e.printStackTrace() + throw e + } finally { + jsonFile.delete() + } + } + + /** Test the case where several partitions are given */ + @Test + def testMultiplePartitionsSameAssignment() { + val testPartitionA = new TopicPartition("testA", 0) + val testPartitionB = new TopicPartition("testB", 0) + val testPartitionAssignment = List(1, 2, 0) + val testPartitionPreferredLeader = testPartitionAssignment.head + val testPartitionAndAssignment = Map(testPartitionA -> testPartitionAssignment, testPartitionB -> testPartitionAssignment) + + createTestTopicAndCluster(testPartitionAndAssignment) + bounceServer(testPartitionPreferredLeader, testPartitionA) + // Check the leader for the partition is not the preferred one + assertNotEquals(testPartitionPreferredLeader, getLeader(testPartitionA)) + assertNotEquals(testPartitionPreferredLeader, getLeader(testPartitionB)) + val jsonFile = toJsonFile(testPartitionAndAssignment.keySet) + try { + PreferredReplicaLeaderElectionCommand.run(Array( + "--bootstrap-server", bootstrapServer(), + "--path-to-json-file", jsonFile.getAbsolutePath)) + } finally { + jsonFile.delete() + } + // Check the leader for the partition IS the preferred one + assertEquals(testPartitionPreferredLeader, getLeader(testPartitionA)) + assertEquals(testPartitionPreferredLeader, getLeader(testPartitionB)) + } + + /** What happens when the preferred replica is already the leader? */ + @Test + def testNoopElection() { + createTestTopicAndCluster(testPartitionAndAssignment) + // Don't bounce the server. Double-check the leader for the partition is the preferred one + assertEquals(testPartitionPreferredLeader, getLeader(testPartition)) + val jsonFile = toJsonFile(testPartitionAndAssignment.keySet) + try { + // Now do the election, even though the preferred replica is *already* the leader + PreferredReplicaLeaderElectionCommand.run(Array( + "--bootstrap-server", bootstrapServer(), + "--path-to-json-file", jsonFile.getAbsolutePath)) + // Check the leader for the partition still is the preferred one + assertEquals(testPartitionPreferredLeader, getLeader(testPartition)) + } finally { + jsonFile.delete() + } + } + + /** What happens if the preferred replica is offline? 
*/ + @Test + def testWithOfflinePreferredReplica() { + createTestTopicAndCluster(testPartitionAndAssignment) + bounceServer(testPartitionPreferredLeader, testPartition) + // Check the leader for the partition is not the preferred one + val leader = getLeader(testPartition) + assertNotEquals(testPartitionPreferredLeader, leader) + // Now kill the preferred one + servers(testPartitionPreferredLeader).shutdown() + // Now try to elect the preferred one + val jsonFile = toJsonFile(testPartitionAndAssignment.keySet) + try { + PreferredReplicaLeaderElectionCommand.run(Array( + "--bootstrap-server", bootstrapServer(), + "--path-to-json-file", jsonFile.getAbsolutePath)) + fail(); + } catch { + case e: AdminCommandFailedException => + assertEquals("1 preferred replica(s) could not be elected", e.getMessage) + val suppressed = e.getSuppressed()(0) + assertTrue(suppressed.isInstanceOf[PreferredLeaderNotAvailableException]) + assertTrue(suppressed.getMessage, suppressed.getMessage.contains("Failed to elect leader for partition test-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) + // Check we still have the same leader + assertEquals(leader, getLeader(testPartition)) + } finally { + jsonFile.delete() + } + } + + /** What happens if the controller gets killed just before an election? */ + @Test + def testTimeout() { + createTestTopicAndCluster(testPartitionAndAssignment) + bounceServer(testPartitionPreferredLeader, testPartition) + // Check the leader for the partition is not the preferred one + val leader = getLeader(testPartition) + assertNotEquals(testPartitionPreferredLeader, leader) + // Now kill the controller just before we trigger the election + val controller = getController().get.config.brokerId + servers(controller).shutdown() + val jsonFile = toJsonFile(testPartitionAndAssignment.keySet) + try { + PreferredReplicaLeaderElectionCommand.run(Array( + "--bootstrap-server", bootstrapServer(controller), + "--path-to-json-file", jsonFile.getAbsolutePath), + timeout = 2000) + fail(); + } catch { + case e: AdminCommandFailedException => + assertEquals("1 preferred replica(s) could not be elected", e.getMessage) + assertTrue(e.getSuppressed()(0).getMessage.contains("Timed out waiting for a node assignment")) + // Check we still have the same leader + assertEquals(leader, getLeader(testPartition)) + } finally { + jsonFile.delete() + } + } + + /** Test the case where client is not authorized */ + @Test + def testAuthzFailure() { + createTestTopicAndCluster(testPartitionAndAssignment, Some(classOf[PreferredReplicaLeaderElectionCommandTestAuthorizer].getName)) + bounceServer(testPartitionPreferredLeader, testPartition) + // Check the leader for the partition is not the preferred one + val leader = getLeader(testPartition) + assertNotEquals(testPartitionPreferredLeader, leader) + // Check the leader for the partition is not the preferred one + assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition)) + val jsonFile = toJsonFile(testPartitionAndAssignment.keySet) + try { + PreferredReplicaLeaderElectionCommand.run(Array( + "--bootstrap-server", bootstrapServer(), + "--path-to-json-file", jsonFile.getAbsolutePath)) + fail(); + } catch { + case e: AdminCommandFailedException => + assertEquals("1 preferred replica(s) could not be elected", e.getMessage) + assertTrue(e.getSuppressed()(0).isInstanceOf[ClusterAuthorizationException]) + // Check we still have the same leader + assertEquals(leader, getLeader(testPartition)) + } finally { + jsonFile.delete() + } + } + +} + +class 
PreferredReplicaLeaderElectionCommandTestAuthorizer extends SimpleAclAuthorizer { + override def authorize(session: RequestChannel.Session, operation: Operation, resource: Resource): Boolean = + operation != Alter || resource.resourceType != Cluster +} diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index d4dcd9f100f..d5beceae8e5 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -158,7 +158,7 @@ object AbstractCoordinatorConcurrencyTest { } class TestReplicaManager extends ReplicaManager( - null, null, null, null, null, null, null, null, null, null, null, null, null, null, None) { + null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, None) { var producePurgatory: DelayedOperationPurgatory[DelayedProduce] = _ var watchKeys: mutable.Set[TopicPartitionOperationKey] = _ diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 9b4210ed59e..a3ecb0773c1 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -524,7 +524,7 @@ class KafkaApisTest { channel.buffer.getInt() // read the size ResponseHeader.parse(channel.buffer) val struct = api.responseSchema(request.version).read(channel.buffer) - AbstractResponse.parseResponse(api, struct) + AbstractResponse.parseResponse(api, struct, request.version) } private def expectNoThrottling(): Capture[RequestChannel.Response] = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 1308820f48c..08aa624734b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -657,6 +657,8 @@ class ReplicaManagerTest { purgatoryName = "Fetch", timer, reaperEnabled = false) val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( purgatoryName = "DeleteRecords", timer, reaperEnabled = false) + val mockElectPreferredLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectPreferredLeader]( + purgatoryName = "ElectPreferredLeader", timer, reaperEnabled = false) // Mock network client to show leader offset of 5 val quota = QuotaFactory.instantiate(config, metrics, time, "") @@ -665,7 +667,7 @@ class ReplicaManagerTest { val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr, new AtomicBoolean(false), quota, mockBrokerTopicStats, metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory, - mockDeleteRecordsPurgatory, Option(this.getClass.getName)) { + mockDeleteRecordsPurgatory, mockElectPreferredLeaderPurgatory, Option(this.getClass.getName)) { override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, @@ -815,11 +817,13 @@ class ReplicaManagerTest { purgatoryName = "Fetch", timer, reaperEnabled = false) val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( purgatoryName = "DeleteRecords", timer, reaperEnabled = false) + val mockDelayedElectPreferredLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectPreferredLeader]( + purgatoryName = 
"DelayedElectPreferredLeader", timer, reaperEnabled = false) new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats, metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory, - mockDeleteRecordsPurgatory, Option(this.getClass.getName)) + mockDeleteRecordsPurgatory, mockDelayedElectPreferredLeaderPurgatory, Option(this.getClass.getName)) } } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index c9e4e7890eb..3176f72939a 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -24,6 +24,7 @@ import kafka.security.auth._ import kafka.utils.TestUtils import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} @@ -359,6 +360,15 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DELETE_GROUPS => new DeleteGroupsRequest.Builder(Collections.singleton("test-group")) + case ApiKeys.ELECT_PREFERRED_LEADERS => + val partition = new ElectPreferredLeadersRequestData.TopicPartitions() + .setPartitionId(Collections.singletonList(0)) + .setTopic("my_topic") + new ElectPreferredLeadersRequest.Builder( + new ElectPreferredLeadersRequestData() + .setTimeoutMs(0) + .setTopicPartitions(Collections.singletonList(partition))) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } @@ -450,6 +460,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs + case ApiKeys.ELECT_PREFERRED_LEADERS => new ElectPreferredLeadersResponse(response, 0).throttleTimeMs case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId") } }