KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin… (#3848)

See also KIP-183.

This implements the following algorithm:

AdminClient sends ElectPreferredLeadersRequest.
KafakApis receives ElectPreferredLeadersRequest and delegates to
ReplicaManager.electPreferredLeaders()
ReplicaManager delegates to KafkaController.electPreferredLeaders()
KafkaController adds a PreferredReplicaLeaderElection to the EventManager,
ReplicaManager.electPreferredLeaders()'s callback uses the
delayedElectPreferredReplicasPurgatory to wait for the results of the
election to appear in the metadata cache. If there are no results
because of errors, or because the preferred leaders are already leading
the partitions then a response is returned immediately.
In the EventManager work thread the preferred leader is elected as follows:

The EventManager runs PreferredReplicaLeaderElection.process()
process() calls KafkaController.onPreferredReplicaElectionWithResults()
KafkaController.onPreferredReplicaElectionWithResults()
calls the PartitionStateMachine.handleStateChangesWithResults() to
perform the election (asynchronously the PSM will send LeaderAndIsrRequest
to the new and old leaders and UpdateMetadataRequest to all brokers)
then invokes the callback.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Tom Bentley 2019-01-25 22:06:18 +00:00 committed by Jun Rao
parent a65940cd82
commit 269b65279c
35 changed files with 1798 additions and 82 deletions

View File

@ -116,6 +116,7 @@
<subpackage name="protocol">
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.protocol.types" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
@ -140,6 +141,7 @@
<subpackage name="requests">
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />

View File

@ -676,7 +676,8 @@ public class NetworkClient implements KafkaClient {
public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0);
return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct);
return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct,
requestHeader.apiVersion());
}
private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,
@ -811,7 +812,8 @@ public class NetworkClient implements KafkaClient {
req.header.apiKey(), req.header.correlationId(), responseStruct);
}
// If the received response includes a throttle delay, throttle the connection.
AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
AbstractResponse body = AbstractResponse.
parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
maybeThrottle(body, req.header.apiVersion(), req.destination, now);
if (req.isInternalRequest && body instanceof MetadataResponse)
metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);

View File

@ -792,6 +792,58 @@ public abstract class AdminClient implements AutoCloseable {
return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
}
/**
* Elect the preferred broker of the given {@code partitions} as leader, or
* elect the preferred broker for all partitions as leader if the argument to {@code partitions} is null.
*
* This is a convenience method for {@link #electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}
* with default options.
* See the overload for more details.
*
* @param partitions The partitions for which the preferred leader should be elected.
* @return The ElectPreferredLeadersResult.
*/
public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions) {
return electPreferredLeaders(partitions, new ElectPreferredLeadersOptions());
}
/**
* Elect the preferred broker of the given {@code partitions} as leader, or
* elect the preferred broker for all partitions as leader if the argument to {@code partitions} is null.
*
* This operation is not transactional so it may succeed for some partitions while fail for others.
*
* It may take several seconds after this method returns
* success for all the brokers in the cluster to become aware that the partitions have new leaders.
* During this time, {@link AdminClient#describeTopics(Collection)}
* may not return information about the partitions' new leaders.
*
* This operation is supported by brokers with version 2.2.0 or higher.
*
* <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
* the returned {@code ElectPreferredLeadersResult}:</p>
* <ul>
* <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
* if the authenticated user didn't have alter access to the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
* if the topic or partition did not exist within the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidTopicException}
* if the topic was already queued for deletion.</li>
* <li>{@link org.apache.kafka.common.errors.NotControllerException}
* if the request was sent to a broker that was not the controller for the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* if the request timed out before the election was complete.</li>
* <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException}
* if the preferred leader was not alive or not in the ISR.</li>
* </ul>
*
* @param partitions The partitions for which the preferred leader should be elected.
* @param options The options to use when electing the preferred leaders.
* @return The ElectPreferredLeadersResult.
*/
public abstract ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions,
ElectPreferredLeadersOptions options);
/**
* Get the metrics kept by the adminClient
*/

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
* Options for {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class ElectPreferredLeadersOptions extends AbstractOptions<ElectPreferredLeadersOptions> {
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
* The result of {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class ElectPreferredLeadersResult {
private final KafkaFutureImpl<Map<TopicPartition, ApiError>> electionFuture;
private final Set<TopicPartition> partitions;
ElectPreferredLeadersResult(KafkaFutureImpl<Map<TopicPartition, ApiError>> electionFuture, Set<TopicPartition> partitions) {
this.electionFuture = electionFuture;
this.partitions = partitions;
}
/**
* Get the result of the election for the given {@code partition}.
* If there was not an election triggered for the given {@code partition}, the
* returned future will complete with an error.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
electionFuture.whenComplete(new KafkaFuture.BiConsumer<Map<TopicPartition, ApiError>, Throwable>() {
@Override
public void accept(Map<TopicPartition, ApiError> topicPartitions, Throwable throwable) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else if (!topicPartitions.containsKey(partition)) {
result.completeExceptionally(new UnknownTopicOrPartitionException(
"Preferred leader election for partition \"" + partition +
"\" was not attempted"));
} else {
if (partitions == null && topicPartitions.isEmpty()) {
result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
}
ApiException exception = topicPartitions.get(partition).exception();
if (exception == null) {
result.complete(null);
} else {
result.completeExceptionally(exception);
}
}
}
});
return result;
}
/**
* <p>Get a future for the topic partitions for which a leader election
* was attempted. A partition will be present in this result if
* an election was attempted even if the election was not successful.</p>
*
* <p>This method is provided to discover the partitions attempted when
* {@link AdminClient#electPreferredLeaders(Collection)} is called
* with a null {@code partitions} argument.</p>
*/
public KafkaFuture<Set<TopicPartition>> partitions() {
if (partitions != null) {
return KafkaFutureImpl.completedFuture(this.partitions);
} else {
final KafkaFutureImpl<Set<TopicPartition>> result = new KafkaFutureImpl<>();
electionFuture.whenComplete(new KafkaFuture.BiConsumer<Map<TopicPartition, ApiError>, Throwable>() {
@Override
public void accept(Map<TopicPartition, ApiError> topicPartitions, Throwable throwable) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else if (topicPartitions.isEmpty()) {
result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
} else {
for (ApiError apiError : topicPartitions.values()) {
if (apiError.isFailure()) {
result.completeExceptionally(apiError.exception());
}
}
result.complete(topicPartitions.keySet());
}
}
});
return result;
}
}
/**
* Return a future which succeeds if all the topic elections succeed.
*/
public KafkaFuture<Void> all() {
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
electionFuture.thenApply(new KafkaFuture.Function<Map<TopicPartition, ApiError>, Void>() {
@Override
public Void apply(Map<TopicPartition, ApiError> topicPartitions) {
for (ApiError apiError : topicPartitions.values()) {
if (apiError.isFailure()) {
result.completeExceptionally(apiError.exception());
return null;
}
}
result.complete(null);
return null;
}
});
return result;
}
}

View File

@ -105,6 +105,8 @@ import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.requests.ElectPreferredLeadersRequest;
import org.apache.kafka.common.requests.ElectPreferredLeadersResponse;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
@ -2777,4 +2779,35 @@ public class KafkaAdminClient extends AdminClient {
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
}
@Override
public ElectPreferredLeadersResult electPreferredLeaders(final Collection<TopicPartition> partitions,
ElectPreferredLeadersOptions options) {
final Set<TopicPartition> partitionSet = partitions != null ? new HashSet<>(partitions) : null;
final KafkaFutureImpl<Map<TopicPartition, ApiError>> electionFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("electPreferredLeaders", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
@Override
public AbstractRequest.Builder createRequest(int timeoutMs) {
return new ElectPreferredLeadersRequest.Builder(
ElectPreferredLeadersRequest.toRequestData(partitions, timeoutMs));
}
@Override
public void handleResponse(AbstractResponse abstractResponse) {
ElectPreferredLeadersResponse response = (ElectPreferredLeadersResponse) abstractResponse;
electionFuture.complete(
ElectPreferredLeadersRequest.fromResponseData(response.data()));
}
@Override
void handleFailure(Throwable throwable) {
electionFuture.completeExceptionally(throwable);
}
}, now);
return new ElectPreferredLeadersResult(electionFuture, partitionSet);
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
public class PreferredLeaderNotAvailableException extends InvalidMetadataException {
public PreferredLeaderNotAvailableException(String message) {
super(message);
}
public PreferredLeaderNotAvailableException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
@ -35,10 +37,10 @@ import org.apache.kafka.common.requests.ControlledShutdownRequest;
import org.apache.kafka.common.requests.ControlledShutdownResponse;
import org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.kafka.common.requests.CreateAclsResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
@ -53,12 +55,12 @@ import org.apache.kafka.common.requests.DescribeAclsRequest;
import org.apache.kafka.common.requests.DescribeAclsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
@ -186,7 +188,9 @@ public enum ApiKeys {
RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions());
DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()),
ELECT_PREFERRED_LEADERS(43, "ElectPreferredLeaders", ElectPreferredLeadersRequestData.SCHEMAS,
ElectPreferredLeadersResponseData.SCHEMAS);
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;

View File

@ -72,6 +72,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OperationNotAttemptedException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.ReassignmentInProgressException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
@ -297,7 +298,9 @@ public enum Errors {
"election so the offsets cannot be guaranteed to be monotonically increasing",
OffsetNotAvailableException::new),
MEMBER_ID_REQUIRED(79, "The group member needs to have a valid member id before actually entering a consumer group",
MemberIdRequiredException::new);
MemberIdRequiredException::new),
PREFERRED_LEADER_NOT_AVAILABLE(80, "The preferred leader was not available",
PreferredLeaderNotAvailableException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);

View File

@ -227,6 +227,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return new DescribeDelegationTokenRequest(struct, apiVersion);
case DELETE_GROUPS:
return new DeleteGroupsRequest(struct, apiVersion);
case ELECT_PREFERRED_LEADERS:
return new ElectPreferredLeadersRequest(struct, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

View File

@ -68,7 +68,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
protected abstract Struct toStruct(short version);
public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct) {
public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, short version) {
switch (apiKey) {
case PRODUCE:
return new ProduceResponse(struct);
@ -156,6 +156,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new DescribeDelegationTokenResponse(struct);
case DELETE_GROUPS:
return new DeleteGroupsResponse(struct);
case ELECT_PREFERRED_LEADERS:
return new ElectPreferredLeadersResponse(struct, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ElectPreferredLeadersRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ElectPreferredLeadersRequest> {
private final ElectPreferredLeadersRequestData data;
public Builder(ElectPreferredLeadersRequestData data) {
super(ApiKeys.ELECT_PREFERRED_LEADERS);
this.data = data;
}
@Override
public ElectPreferredLeadersRequest build(short version) {
return new ElectPreferredLeadersRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
public static ElectPreferredLeadersRequestData toRequestData(Collection<TopicPartition> partitions, int timeoutMs) {
ElectPreferredLeadersRequestData d = new ElectPreferredLeadersRequestData()
.setTimeoutMs(timeoutMs);
if (partitions != null) {
for (Map.Entry<String, List<Integer>> tp : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) {
d.topicPartitions().add(new ElectPreferredLeadersRequestData.TopicPartitions().setTopic(tp.getKey()).setPartitionId(tp.getValue()));
}
} else {
d.setTopicPartitions(null);
}
return d;
}
public static Map<TopicPartition, ApiError> fromResponseData(ElectPreferredLeadersResponseData data) {
Map<TopicPartition, ApiError> map = new HashMap<>();
for (ElectPreferredLeadersResponseData.ReplicaElectionResult topicResults : data.replicaElectionResults()) {
for (ElectPreferredLeadersResponseData.PartitionResult partitionResult : topicResults.partitionResult()) {
map.put(new TopicPartition(topicResults.topic(), partitionResult.partitionId()),
new ApiError(Errors.forCode(partitionResult.errorCode()),
partitionResult.errorMessage()));
}
}
return map;
}
private final ElectPreferredLeadersRequestData data;
private final short version;
private ElectPreferredLeadersRequest(ElectPreferredLeadersRequestData data, short version) {
super(ApiKeys.ELECT_PREFERRED_LEADERS, version);
this.data = data;
this.version = version;
}
public ElectPreferredLeadersRequest(Struct struct, short version) {
super(ApiKeys.ELECT_PREFERRED_LEADERS, version);
this.data = new ElectPreferredLeadersRequestData(struct, version);
this.version = version;
}
public ElectPreferredLeadersRequestData data() {
return data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ElectPreferredLeadersResponseData response = new ElectPreferredLeadersResponseData();
response.setThrottleTimeMs(throttleTimeMs);
ApiError apiError = ApiError.fromThrowable(e);
for (TopicPartitions topic : data.topicPartitions()) {
ReplicaElectionResult electionResult = new ReplicaElectionResult().setTopic(topic.topic());
for (Integer partitionId : topic.partitionId()) {
electionResult.partitionResult().add(new ElectPreferredLeadersResponseData.PartitionResult()
.setPartitionId(partitionId)
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message()));
}
response.replicaElectionResults().add(electionResult);
}
return new ElectPreferredLeadersResponse(response);
}
public static ElectPreferredLeadersRequest parse(ByteBuffer buffer, short version) {
return new ElectPreferredLeadersRequest(ApiKeys.ELECT_PREFERRED_LEADERS.parseRequest(version, buffer), version);
}
/**
* Visible for testing.
*/
@Override
public Struct toStruct() {
return data.toStruct(version);
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class ElectPreferredLeadersResponse extends AbstractResponse {
private final ElectPreferredLeadersResponseData data;
public ElectPreferredLeadersResponse(ElectPreferredLeadersResponseData data) {
this.data = data;
}
public ElectPreferredLeadersResponse(Struct struct, short version) {
this.data = new ElectPreferredLeadersResponseData(struct, version);
}
public ElectPreferredLeadersResponseData data() {
return data;
}
@Override
protected Struct toStruct(short version) {
return data.toStruct(version);
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public Map<Errors, Integer> errorCounts() {
HashMap<Errors, Integer> counts = new HashMap<>();
for (ReplicaElectionResult result : data.replicaElectionResults()) {
for (PartitionResult partitionResult : result.partitionResult()) {
Errors error = Errors.forCode(partitionResult.errorCode());
counts.put(error, counts.getOrDefault(error, 0) + 1);
}
}
return counts;
}
public static ElectPreferredLeadersResponse parse(ByteBuffer buffer, short version) {
return new ElectPreferredLeadersResponse(
ApiKeys.ELECT_PREFERRED_LEADERS.responseSchema(version).read(buffer), version);
}
@Override
public boolean shouldClientThrottle(short version) {
return version >= 3;
}
}

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 43,
"type": "request",
"name": "ElectPreferredLeadersRequest",
"validVersions": "0",
"fields": [
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
"about": "The topic partitions to elect the preferred leader of.",
"fields": [
{ "name": "Topic", "type": "string", "versions": "0+",
"about": "The name of a topic." },
{ "name": "PartitionId", "type": "[]int32", "versions": "0+",
"about": "The partitions of this topic whose preferred leader should be elected" }
]},
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "The time in ms to wait for the election to complete." }
]
}

View File

@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 43,
"type": "response",
"name": "ElectPreferredLeadersResponse",
"validVersions": "0",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ReplicaElectionResults", "type": "[]ReplicaElectionResult", "versions": "0+",
"about": "The error code, or 0 if there was no error.", "fields": [
{ "name": "Topic", "type": "string", "versions": "0+",
"about": "The topic name" },
{ "name": "PartitionResult", "type": "[]PartitionResult", "versions": "0+",
"about": "The results for each partition", "fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+",
"about": "The partition id" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The result error, or zero if there was no error."},
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The result message, or null if there was no error."}
]}
]}
]
}

View File

@ -187,7 +187,7 @@ One very common pattern in Kafka is to load array elements from a message into
a Map or Set for easier access. The message protocol makes this easier with
the "mapKey" concept.
If some of the elemements of an array are annotated with "mapKey": true, the
If some of the elements of an array are annotated with "mapKey": true, the
entire array will be treated as a linked hash set rather than a list. Elements
in this set will be accessible in O(1) time with an automatically generated
"find" function. The order of elements in the set will still be preserved,

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
@ -50,6 +51,9 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreateAclsResponse;
@ -67,6 +71,7 @@ import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeAclsResponse;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.ElectPreferredLeadersResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
@ -634,6 +639,55 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testElectPreferredLeaders() throws Exception {
TopicPartition topic1 = new TopicPartition("topic", 0);
TopicPartition topic2 = new TopicPartition("topic", 2);
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Test a call where one partition has an error.
ApiError value = ApiError.fromThrowable(new ClusterAuthorizationException(null));
ElectPreferredLeadersResponseData responseData = new ElectPreferredLeadersResponseData();
ReplicaElectionResult r = new ReplicaElectionResult().setTopic(topic1.topic());
r.partitionResult().add(new PartitionResult()
.setPartitionId(topic1.partition())
.setErrorCode(ApiError.NONE.error().code())
.setErrorMessage(ApiError.NONE.message()));
r.partitionResult().add(new PartitionResult()
.setPartitionId(topic2.partition())
.setErrorCode(value.error().code())
.setErrorMessage(value.message()));
responseData.replicaElectionResults().add(r);
env.kafkaClient().prepareResponse(new ElectPreferredLeadersResponse(responseData));
ElectPreferredLeadersResult results = env.adminClient().electPreferredLeaders(asList(topic1, topic2));
results.partitionResult(topic1).get();
TestUtils.assertFutureError(results.partitionResult(topic2), ClusterAuthorizationException.class);
TestUtils.assertFutureError(results.all(), ClusterAuthorizationException.class);
// Test a call where there are no errors.
r.partitionResult().clear();
r.partitionResult().add(new PartitionResult()
.setPartitionId(topic1.partition())
.setErrorCode(ApiError.NONE.error().code())
.setErrorMessage(ApiError.NONE.message()));
r.partitionResult().add(new PartitionResult()
.setPartitionId(topic2.partition())
.setErrorCode(ApiError.NONE.error().code())
.setErrorMessage(ApiError.NONE.message()));
env.kafkaClient().prepareResponse(new ElectPreferredLeadersResponse(responseData));
results = env.adminClient().electPreferredLeaders(asList(topic1, topic2));
results.partitionResult(topic1).get();
results.partitionResult(topic2).get();
// Now try a timeout
results = env.adminClient().electPreferredLeaders(asList(topic1, topic2), new ElectPreferredLeadersOptions().timeoutMs(100));
TestUtils.assertFutureError(results.partitionResult(topic1), TimeoutException.class);
TestUtils.assertFutureError(results.partitionResult(topic2), TimeoutException.class);
}
}
/**
* Test handling timeouts.
*/

View File

@ -327,6 +327,10 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions, ElectPreferredLeadersOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");

View File

@ -68,7 +68,8 @@ public class RequestContextTest {
assertEquals(correlationId, responseHeader.correlationId());
Struct struct = ApiKeys.API_VERSIONS.parseResponse((short) 0, responseBuffer);
ApiVersionsResponse response = (ApiVersionsResponse) AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct);
ApiVersionsResponse response = (ApiVersionsResponse)
AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct, (short) 0);
assertEquals(Errors.UNSUPPORTED_VERSION, response.error());
assertTrue(response.apiVersions().isEmpty());
}

View File

@ -32,6 +32,11 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
@ -304,6 +309,10 @@ public class RequestResponseTest {
checkRequest(createRenewTokenRequest());
checkErrorResponse(createRenewTokenRequest(), new UnknownServerException());
checkResponse(createRenewTokenResponse(), 0);
checkRequest(createElectPreferredLeadersRequest());
checkRequest(createElectPreferredLeadersRequestNullPartitions());
checkErrorResponse(createElectPreferredLeadersRequest(), new UnknownServerException());
checkResponse(createElectPreferredLeadersResponse(), 0);
}
@Test
@ -460,7 +469,7 @@ public class RequestResponseTest {
Struct deserializedStruct = ApiKeys.PRODUCE.parseResponse(version, buffer);
ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
deserializedStruct);
deserializedStruct, version);
assertEquals(1, v5FromBytes.responses().size());
assertTrue(v5FromBytes.responses().containsKey(tp0));
@ -1322,4 +1331,33 @@ public class RequestResponseTest {
return new DescribeDelegationTokenResponse(20, Errors.NONE, tokenList);
}
private ElectPreferredLeadersRequest createElectPreferredLeadersRequestNullPartitions() {
return new ElectPreferredLeadersRequest.Builder(
new ElectPreferredLeadersRequestData()
.setTimeoutMs(100)
.setTopicPartitions(null))
.build((short) 0);
}
private ElectPreferredLeadersRequest createElectPreferredLeadersRequest() {
ElectPreferredLeadersRequestData data = new ElectPreferredLeadersRequestData()
.setTimeoutMs(100);
data.topicPartitions().add(new TopicPartitions().setTopic("data").setPartitionId(asList(1, 2)));
return new ElectPreferredLeadersRequest.Builder(data).build((short) 0);
}
private ElectPreferredLeadersResponse createElectPreferredLeadersResponse() {
ElectPreferredLeadersResponseData data = new ElectPreferredLeadersResponseData().setThrottleTimeMs(200);
ReplicaElectionResult resultsByTopic = new ReplicaElectionResult().setTopic("myTopic");
resultsByTopic.partitionResult().add(new PartitionResult().setPartitionId(0)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(Errors.NONE.message()));
resultsByTopic.partitionResult().add(new PartitionResult().setPartitionId(1)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()));
data.replicaElectionResults().add(resultsByTopic);
return new ElectPreferredLeadersResponse(data);
}
}

View File

@ -16,12 +16,20 @@
*/
package kafka.admin
import java.util.Properties
import java.util.concurrent.ExecutionException
import joptsimple.OptionSpecBuilder
import kafka.common.AdminCommandFailedException
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.errors.TimeoutException
import collection.JavaConverters._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.{KafkaFuture, TopicPartition}
import org.apache.zookeeper.KeeperException.NodeExistsException
import collection._
@ -29,33 +37,47 @@ import collection._
object PreferredReplicaLeaderElectionCommand extends Logging {
def main(args: Array[String]): Unit = {
val timeout = 30000
run(args, timeout)
}
def run(args: Array[String], timeout: Int = 30000): Unit = {
val commandOpts = new PreferredReplicaLeaderElectionCommandOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(commandOpts, "This tool helps to causes leadership for each partition to be transferred back to the 'preferred replica'," +
" it can be used to balance leadership among the servers.")
CommandLineUtils.checkRequiredArgs(commandOpts.parser, commandOpts.options, commandOpts.zkConnectOpt)
CommandLineUtils.checkRequiredArgs(commandOpts.parser, commandOpts.options)
val zkConnect = commandOpts.options.valueOf(commandOpts.zkConnectOpt)
var zkClient: KafkaZkClient = null
try {
val time = Time.SYSTEM
zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time)
if (commandOpts.options.has(commandOpts.bootstrapServerOpt) == commandOpts.options.has(commandOpts.zkConnectOpt)) {
CommandLineUtils.printUsageAndDie(commandOpts.parser, s"Exactly one of '${commandOpts.bootstrapServerOpt}' or '${commandOpts.zkConnectOpt}' must be provided")
}
val partitionsForPreferredReplicaElection =
if (!commandOpts.options.has(commandOpts.jsonFileOpt))
zkClient.getAllPartitions()
if (commandOpts.options.has(commandOpts.jsonFileOpt))
Some(parsePreferredReplicaElectionData(Utils.readFileAsString(commandOpts.options.valueOf(commandOpts.jsonFileOpt))))
else
parsePreferredReplicaElectionData(Utils.readFileAsString(commandOpts.options.valueOf(commandOpts.jsonFileOpt)))
val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
None
preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
} catch {
case e: Throwable =>
println("Failed to start preferred replica election")
println(Utils.stackTrace(e))
val preferredReplicaElectionCommand = if (commandOpts.options.has(commandOpts.zkConnectOpt)) {
println(s"Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.")
println(s"Use --bootstrap-server instead to specify a broker to connect to.")
new ZkCommand(commandOpts.options.valueOf(commandOpts.zkConnectOpt),
JaasUtils.isZkSecurityEnabled,
timeout)
} else {
val adminProps = if (commandOpts.options.has(commandOpts.adminClientConfigOpt))
Utils.loadProps(commandOpts.options.valueOf(commandOpts.adminClientConfigOpt))
else
new Properties()
adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOpts.options.valueOf(commandOpts.bootstrapServerOpt))
adminProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toString)
new AdminClientCommand(adminProps)
}
try {
preferredReplicaElectionCommand.electPreferredLeaders(partitionsForPreferredReplicaElection)
} finally {
if (zkClient != null)
zkClient.close()
preferredReplicaElectionCommand.close()
}
}
@ -101,14 +123,165 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
.withRequiredArg
.describedAs("list of partitions for which preferred replica leader election needs to be triggered")
.ofType(classOf[String])
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
"form host:port. Multiple URLS can be given to allow fail-over.")
private val zookeeperOptBuilder: OptionSpecBuilder = parser.accepts("zookeeper",
"DEPRECATED. The connection string for the zookeeper connection in the " +
"form host:port. Multiple URLS can be given to allow fail-over. " +
"Replaced by --bootstrap-server, REQUIRED unless --bootstrap-server is given.")
private val bootstrapOptBuilder: OptionSpecBuilder = parser.accepts("bootstrap-server",
"A hostname and port for the broker to connect to, " +
"in the form host:port. Multiple comma-separated URLs can be given. REQUIRED unless --zookeeper is given.")
parser.mutuallyExclusive(zookeeperOptBuilder, bootstrapOptBuilder)
val bootstrapServerOpt = bootstrapOptBuilder
.withRequiredArg
.describedAs("host:port")
.ofType(classOf[String])
val zkConnectOpt = zookeeperOptBuilder
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val adminClientConfigOpt = parser.accepts("admin.config",
"Admin client config properties file to pass to the admin client when --bootstrap-server is given.")
.availableIf(bootstrapServerOpt)
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])
parser.accepts("")
options = parser.parse(args: _*)
}
/** Abstraction over different ways to perform a leader election */
trait Command {
/** Elect the preferred leader for the given {@code partitionsForElection}.
* If the given {@code partitionsForElection} are None then elect the preferred leader for all partitions.
*/
def electPreferredLeaders(partitionsForElection: Option[Set[TopicPartition]]) : Unit
def close() : Unit
}
class ZkCommand(zkConnect: String, isSecure: Boolean, timeout: Int)
extends Command {
var zkClient: KafkaZkClient = null
val time = Time.SYSTEM
zkClient = KafkaZkClient(zkConnect, isSecure, timeout, timeout, Int.MaxValue, time)
override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]) {
try {
val topics =
partitionsFromUser match {
case Some(partitions) =>
partitions.map(_.topic).toSet
case None =>
zkClient.getAllPartitions().map(_.topic)
}
val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) =>
partitions.map(new TopicPartition(topic, _))
}.toSet
val (validPartitions, invalidPartitions) =
partitionsFromUser match {
case Some(partitions) =>
partitions.partition(partitionsFromZk.contains)
case None =>
(zkClient.getAllPartitions(), Set.empty)
}
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
println("Successfully started preferred replica election for partitions %s".format(validPartitions))
invalidPartitions.foreach(p => println("Skipping preferred replica leader election for partition %s since it doesn't exist.".format(p)))
} catch {
case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
}
}
override def close(): Unit = {
if (zkClient != null)
zkClient.close()
}
}
/** Election via AdminClient.electPreferredLeaders() */
class AdminClientCommand(adminClientProps: Properties)
extends Command with Logging {
val adminClient = org.apache.kafka.clients.admin.AdminClient.create(adminClientProps)
/**
* Wait until the given future has completed, then return whether it completed exceptionally.
* Because KafkaFuture.isCompletedExceptionally doesn't wait for a result
*/
private def completedExceptionally[T](future: KafkaFuture[T]): Boolean = {
try {
future.get()
false
} catch {
case (_: Throwable) =>
true
}
}
override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]): Unit = {
val partitions = partitionsFromUser match {
case Some(partitionsFromUser) => partitionsFromUser.asJava
case None => null
}
debug(s"Calling AdminClient.electPreferredLeaders($partitions)")
val result = adminClient.electPreferredLeaders(partitions)
// wait for all results
val attemptedPartitions = partitionsFromUser match {
case Some(partitionsFromUser) => partitions.asScala
case None => try {
result.partitions().get.asScala
} catch {
case e: ExecutionException =>
val cause = e.getCause
if (cause.isInstanceOf[TimeoutException]) {
// We timed out, or don't even know the attempted partitions
println("Timeout waiting for election results")
}
throw new AdminCommandFailedException(null, cause)
case e: Throwable =>
// We don't even know the attempted partitions
println("Error while making request")
e.printStackTrace()
return
}
}
val (exceptional, ok) = attemptedPartitions.map(tp => tp -> result.partitionResult(tp)).
partition { case (_, partitionResult) => completedExceptionally(partitionResult) }
if (!ok.isEmpty) {
println(s"Successfully completed preferred replica election for partitions ${ok.map{ case (tp, future) => tp }.mkString(", ")}")
}
if (!exceptional.isEmpty) {
val adminException = new AdminCommandFailedException(
s"${exceptional.size} preferred replica(s) could not be elected")
for ((partition, void) <- exceptional) {
val exception = try {
void.get()
new AdminCommandFailedException("Exceptional future with no exception")
} catch {
case e: ExecutionException => e.getCause
}
println(s"Error completing preferred replica election for partition $partition: $exception")
adminException.addSuppressed(exception)
}
throw adminException
}
}
override def close(): Unit = {
debug("Closing AdminClient")
adminClient.close()
}
}
}
class PreferredReplicaLeaderElectionCommand(zkClient: KafkaZkClient, partitionsFromUser: scala.collection.Set[TopicPartition]) {

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.utils.Time
import scala.collection._
import scala.collection.JavaConverters._
object ControllerEventManager {
val ControllerEventThreadName = "controller-event-thread"
@ -69,6 +70,10 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
}
def clearAndPut(event: ControllerEvent): Unit = inLock(putLock) {
queue.asScala.foreach(evt =>
if (evt.isInstanceOf[PreemptableControllerEvent])
evt.asInstanceOf[PreemptableControllerEvent].preempt()
)
queue.clear()
put(event)
}

View File

@ -16,6 +16,7 @@
*/
package kafka.controller
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CountDownLatch, TimeUnit}
import com.yammer.metrics.core.Gauge
@ -32,13 +33,13 @@ import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, LeaderAndIsrResponse, StopReplicaResponse}
import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, ApiError, LeaderAndIsrResponse, StopReplicaResponse}
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
import scala.collection._
import scala.util.Try
import scala.util.{Failure, Try}
object KafkaController extends Logging {
val InitialControllerEpoch = 0
@ -268,7 +269,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
topicDeletionManager.tryTopicDeletion()
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
onPreferredReplicaElection(pendingPreferredReplicaElections)
onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered)
info("Starting the controller scheduler")
kafkaScheduler.startup()
if (config.autoLeaderRebalanceEnable) {
@ -586,7 +587,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition]
topicPartitions.foreach { tp =>
if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
error(s"Skipping reassignment of $tp since the topic is currently being deleted")
info(s"Skipping reassignment of $tp since the topic is currently being deleted")
partitionsToBeRemovedFromReassignment.add(tp)
} else {
val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
@ -628,17 +629,38 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
}
private def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) {
sealed trait ElectionType
object AutoTriggered extends ElectionType
object ZkTriggered extends ElectionType
object AdminClientTriggered extends ElectionType
/**
* Attempt to elect the preferred replica as leader for each of the given partitions.
* @param partitions The partitions to have their preferred leader elected
* @param electionType The election type
* @return A map of failed elections where keys are partitions which had an error and the corresponding value is
* the exception that was thrown.
*/
private def onPreferredReplicaElection(partitions: Set[TopicPartition],
electionType: ElectionType): Map[TopicPartition, Throwable] = {
info(s"Starting preferred replica leader election for partitions ${partitions.mkString(",")}")
try {
partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
} catch {
case e: ControllerMovedException =>
error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")} because controller has moved to another broker.", e)
throw e
case e: Throwable => error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")}", e)
val results = partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition,
Option(PreferredReplicaPartitionLeaderElectionStrategy))
if (electionType != AdminClientTriggered) {
results.foreach { case (tp, throwable) =>
if (throwable.isInstanceOf[ControllerMovedException]) {
error(s"Error completing preferred replica leader election for partition $tp because controller has moved to another broker.", throwable)
throw throwable
} else {
error(s"Error completing preferred replica leader election for partition $tp", throwable)
}
}
}
return results;
} finally {
removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
if (electionType != AdminClientTriggered)
removePartitionsFromPreferredReplicaElection(partitions, electionType == AutoTriggered)
}
}
@ -884,7 +906,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
if (!isTriggeredByAutoRebalance) {
zkClient.deletePreferredReplicaElection(controllerContext.epochZkVersion)
// Ensure we detect future preferred replica leader elections
eventManager.put(PreferredReplicaLeaderElection)
eventManager.put(PreferredReplicaLeaderElection(None))
}
}
@ -983,7 +1005,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
controllerContext.allTopics.contains(tp.topic))
onPreferredReplicaElection(candidatePartitions.toSet, isTriggeredByAutoRebalance = true)
onPreferredReplicaElection(candidatePartitions.toSet, AutoTriggered)
}
}
}
@ -1022,11 +1044,15 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
case class ControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
case class ControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends PreemptableControllerEvent {
def state = ControllerState.ControlledShutdown
override def process(): Unit = {
override def handlePreempt(): Unit = {
controlledShutdownCallback(Failure(new ControllerMovedException("Controller moved to another broker")))
}
override def handleProcess(): Unit = {
val controlledShutdownResult = Try { doControlledShutdown(id) }
controlledShutdownCallback(controlledShutdownResult)
}
@ -1517,22 +1543,75 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
case object PreferredReplicaLeaderElection extends ControllerEvent {
type ElectPreferredLeadersCallback = (Map[TopicPartition, Int], Map[TopicPartition, ApiError])=>Unit
def electPreferredLeaders(partitions: Set[TopicPartition], callback: ElectPreferredLeadersCallback = { (_,_) => }): Unit =
eventManager.put(PreferredReplicaLeaderElection(Some(partitions), AdminClientTriggered, callback))
case class PreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
electionType: ElectionType = ZkTriggered,
callback: ElectPreferredLeadersCallback = (_,_) =>{}) extends PreemptableControllerEvent {
override def state: ControllerState = ControllerState.ManualLeaderBalance
override def process(): Unit = {
if (!isActive) return
override def handlePreempt(): Unit = {
callback(Map.empty, partitionsFromAdminClientOpt match {
case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap
case None => Map.empty
})
}
override def handleProcess(): Unit = {
if (!isActive) {
callback(Map.empty, partitionsFromAdminClientOpt match {
case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap
case None => Map.empty
})
} else {
// We need to register the watcher if the path doesn't exist in order to detect future preferred replica
// leader elections and we get the `path exists` check for free
if (zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
val partitions = zkClient.getPreferredReplicaElection
val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
if (partitionsForTopicsToBeDeleted.nonEmpty) {
error(s"Skipping preferred replica election for partitions $partitionsForTopicsToBeDeleted since the " +
"respective topics are being deleted")
if (electionType == AdminClientTriggered || zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
val partitions = partitionsFromAdminClientOpt match {
case Some(partitions) => partitions
case None => zkClient.getPreferredReplicaElection
}
val (validPartitions, invalidPartitions) = partitions.partition(tp => controllerContext.allPartitions.contains(tp))
invalidPartitions.foreach { p =>
info(s"Skipping preferred replica leader election for partition ${p} since it doesn't exist.")
}
val (partitionsBeingDeleted, livePartitions) = validPartitions.partition(partition =>
topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
if (partitionsBeingDeleted.nonEmpty) {
warn(s"Skipping preferred replica election for partitions $partitionsBeingDeleted " +
s"since the respective topics are being deleted")
}
// partition those where preferred is already leader
val (electablePartitions, alreadyPreferred) = livePartitions.partition { partition =>
val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
val preferredReplica = assignedReplicas.head
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
currentLeader != preferredReplica
}
val electionErrors = onPreferredReplicaElection(electablePartitions, electionType)
val successfulPartitions = electablePartitions -- electionErrors.keySet
val results = electionErrors.map { case (partition, ex) =>
val apiError = if (ex.isInstanceOf[StateChangeFailedException])
new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE, ex.getMessage)
else
ApiError.fromThrowable(ex)
partition -> apiError
} ++
alreadyPreferred.map(_ -> ApiError.NONE) ++
partitionsBeingDeleted.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted")) ++
invalidPartitions.map ( tp => tp -> new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, s"The partition does not exist.")
)
debug(s"PreferredReplicaLeaderElection waiting: $successfulPartitions, results: $results")
callback(successfulPartitions.map(
tp => tp->controllerContext.partitionReplicaAssignment(tp).head).toMap,
results)
}
onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
}
}
}
@ -1653,7 +1732,7 @@ object IsrChangeNotificationHandler {
class PreferredReplicaElectionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
override val path: String = PreferredReplicaElectionZNode.path
override def handleCreation(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection)
override def handleCreation(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection(None))
}
class ControllerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
@ -1712,3 +1791,25 @@ sealed trait ControllerEvent {
def state: ControllerState
def process(): Unit
}
/**
* A `ControllerEvent`, such as one with a client callback, which needs specific handling in the event of ZK session expiration.
*/
sealed trait PreemptableControllerEvent extends ControllerEvent {
val spent = new AtomicBoolean(false)
final def preempt(): Unit = {
if (!spent.getAndSet(true))
handlePreempt()
}
final def process(): Unit = {
if (!spent.getAndSet(true))
handleProcess()
}
def handlePreempt(): Unit
def handleProcess(): Unit
}

View File

@ -125,22 +125,36 @@ class PartitionStateMachine(config: KafkaConfig,
// It is important to trigger leader election for those partitions.
}
/**
* Try to change the state of the given partitions to the given targetState, using the given
* partitionLeaderElectionStrategyOpt if a leader election is required.
* @param partitions The partitions
* @param targetState The state
* @param partitionLeaderElectionStrategyOpt The leader election strategy if a leader election is required.
* @return partitions and corresponding throwable for those partitions which could not transition to the given state
*/
def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = {
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Map[TopicPartition, Throwable] = {
if (partitions.nonEmpty) {
try {
controllerBrokerRequestBatch.newBatch()
doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
val errors = doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
errors
} catch {
case e: ControllerMovedException =>
error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
throw e
case e: Throwable => error(s"Error while moving some partitions to $targetState state", e)
}
case e: Throwable =>
error(s"Error while moving some partitions to $targetState state", e)
partitions.map { _ -> e }.toMap
}
} else {
Map.empty[TopicPartition, Throwable]
}
}
def partitionsInState(state: PartitionState): Set[TopicPartition] = {
partitionState.filter { case (_, s) => s == state }.keySet.toSet
}
@ -183,7 +197,7 @@ class PartitionStateMachine(config: KafkaConfig,
* @param targetState The end state that the partition should be moved to
*/
private def doHandleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit = {
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = {
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition))
val (validPartitions, invalidPartitions) = partitions.partition(partition => isValidTransition(partition, targetState))
@ -195,6 +209,7 @@ class PartitionStateMachine(config: KafkaConfig,
s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
changeStateTo(partition, partitionState(partition), NewPartition)
}
Map.empty
case OnlinePartition =>
val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition)
val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)
@ -207,23 +222,28 @@ class PartitionStateMachine(config: KafkaConfig,
}
}
if (partitionsToElectLeader.nonEmpty) {
val successfulElections = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get)
val (successfulElections, failedElections) = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get)
successfulElections.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
changeStateTo(partition, partitionState(partition), OnlinePartition)
}
failedElections
} else {
Map.empty
}
case OfflinePartition =>
validPartitions.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
changeStateTo(partition, partitionState(partition), OfflinePartition)
}
Map.empty
case NonExistentPartition =>
validPartitions.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
changeStateTo(partition, partitionState(partition), NonExistentPartition)
}
Map.empty
}
}
@ -283,11 +303,14 @@ class PartitionStateMachine(config: KafkaConfig,
* Repeatedly attempt to elect leaders for multiple partitions until there are no more remaining partitions to retry.
* @param partitions The partitions that we're trying to elect leaders for.
* @param partitionLeaderElectionStrategy The election strategy to use.
* @return The partitions that successfully had a leader elected.
* @return A pair with first element of which is the partitions that successfully had a leader elected
* and the second element a map of failed partition to the corresponding thrown exception.
*/
private def electLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicPartition] = {
private def electLeaderForPartitions(partitions: Seq[TopicPartition],
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): (Seq[TopicPartition], Map[TopicPartition, Throwable]) = {
val successfulElections = mutable.Buffer.empty[TopicPartition]
var remaining = partitions
var failures = Map.empty[TopicPartition, Throwable]
while (remaining.nonEmpty) {
val (success, updatesToRetry, failedElections) = doElectLeaderForPartitions(partitions, partitionLeaderElectionStrategy)
remaining = updatesToRetry
@ -295,8 +318,9 @@ class PartitionStateMachine(config: KafkaConfig,
failedElections.foreach { case (partition, e) =>
logFailedStateChange(partition, partitionState(partition), OnlinePartition, e)
}
failures ++= failedElections
}
successfulElections
(successfulElections, failures)
}
/**

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
import scala.collection.{Map, mutable}
/** A delayed elect preferred leader operation that can be created by the replica manager and watched
* in the elect preferred leader purgatory
*/
class DelayedElectPreferredLeader(delayMs: Long,
expectedLeaders: Map[TopicPartition, Int],
results: Map[TopicPartition, ApiError],
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, ApiError] => Unit)
extends DelayedOperation(delayMs) {
var waitingPartitions = expectedLeaders
val fullResults = results.to[mutable.Set]
/**
* Call-back to execute when a delayed operation gets expired and hence forced to complete.
*/
override def onExpiration(): Unit = {}
/**
* Process for completing an operation; This function needs to be defined
* in subclasses and will be called exactly once in forceComplete()
*/
override def onComplete(): Unit = {
// This could be called to force complete, so I need the full list of partitions, so I can time them all out.
updateWaiting()
val timedout = waitingPartitions.map{
case (tp, leader) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null)
}.toMap
responseCallback(timedout ++ fullResults)
}
private def timeoutWaiting = {
waitingPartitions.map(partition => partition -> new ApiError(Errors.REQUEST_TIMED_OUT, null)).toMap
}
/**
* Try to complete the delayed operation by first checking if the operation
* can be completed by now. If yes execute the completion logic by calling
* forceComplete() and return true iff forceComplete returns true; otherwise return false
*
* This function needs to be defined in subclasses
*/
override def tryComplete(): Boolean = {
updateWaiting()
debug(s"tryComplete() waitingPartitions: $waitingPartitions")
waitingPartitions.isEmpty && forceComplete()
}
private def updateWaiting() = {
waitingPartitions.foreach{case (tp, leader) =>
val ps = replicaManager.metadataCache.getPartitionInfo(tp.topic, tp.partition)
ps match {
case Some(ps) =>
if (leader == ps.basePartitionState.leader) {
waitingPartitions -= tp
fullResults += tp -> ApiError.NONE
}
case None =>
}
}
}
}

View File

@ -43,6 +43,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -146,6 +147,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
case ApiKeys.ELECT_PREFERRED_LEADERS => handleElectPreferredReplicaLeader(request)
}
} catch {
case e: FatalExitError => throw e
@ -253,6 +255,11 @@ class KafkaApis(val requestChannel: RequestChannel,
quotas.request.updateQuotaMetricConfigs()
}
}
if (replicaManager.hasDelayedElectionOperations) {
updateMetadataRequest.partitionStates.asScala.foreach { case (tp, ps) =>
replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp.topic(), tp.partition()))
}
}
sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE))
}
}
@ -2227,6 +2234,48 @@ class KafkaApis(val requestChannel: RequestChannel,
true
}
def handleElectPreferredReplicaLeader(request: RequestChannel.Request): Unit = {
val electionRequest = request.body[ElectPreferredLeadersRequest]
val partitions =
if (electionRequest.data().topicPartitions() == null) {
metadataCache.getAllPartitions()
} else {
electionRequest.data().topicPartitions().asScala.flatMap{tp =>
tp.partitionId().asScala.map(partitionId => new TopicPartition(tp.topic, partitionId))}.toSet
}
def sendResponseCallback(result: Map[TopicPartition, ApiError]): Unit = {
sendResponseMaybeThrottle(request, requestThrottleMs => {
val results = result.
groupBy{case (tp, error) => tp.topic}.
map{case (topic, ps) => new ElectPreferredLeadersResponseData.ReplicaElectionResult()
.setTopic(topic)
.setPartitionResult(ps.map{
case (tp, error) =>
new ElectPreferredLeadersResponseData.PartitionResult()
.setErrorCode(error.error.code)
.setErrorMessage(error.message())
.setPartitionId(tp.partition)}.toList.asJava)}
val data = new ElectPreferredLeadersResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setReplicaElectionResults(results.toList.asJava)
new ElectPreferredLeadersResponse(data)})
}
if (!authorize(request.session, Alter, Resource.ClusterResource)) {
val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null);
val partitionErrors =
if (electionRequest.data().topicPartitions() == null) {
// Don't leak the set of partitions if the client lack authz
Map.empty[TopicPartition, ApiError]
} else {
partitions.map(partition => partition -> error).toMap
}
sendResponseCallback(partitionErrors)
} else {
replicaManager.electPreferredLeaders(controller, partitions, sendResponseCallback, electionRequest.data().timeoutMs())
}
}
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!isAuthorizedClusterAction(request))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")

View File

@ -137,6 +137,12 @@ class MetadataCache(brokerId: Int) extends Logging {
getAllTopics(metadataSnapshot)
}
def getAllPartitions(): Set[TopicPartition] = {
metadataSnapshot.partitionStates.flatMap { case (topicName, partitionsAndStates) =>
partitionsAndStates.keys.map(partitionId => new TopicPartition(topicName, partitionId.toInt))
}.toSet
}
private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {
snapshot.partitionStates.keySet
}

View File

@ -146,6 +146,7 @@ class ReplicaManager(val config: KafkaConfig,
val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
val delayedElectPreferredLeaderPurgatory: DelayedOperationPurgatory[DelayedElectPreferredLeader],
threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
def this(config: KafkaConfig,
@ -171,6 +172,8 @@ class ReplicaManager(val config: KafkaConfig,
DelayedOperationPurgatory[DelayedDeleteRecords](
purgatoryName = "DeleteRecords", brokerId = config.brokerId,
purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
DelayedOperationPurgatory[DelayedElectPreferredLeader](
purgatoryName = "ElectPreferredLeader", brokerId = config.brokerId),
threadNamePrefix)
}
@ -318,6 +321,13 @@ class ReplicaManager(val config: KafkaConfig,
debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed))
}
def hasDelayedElectionOperations = delayedElectPreferredLeaderPurgatory.delayed != 0
def tryCompleteElection(key: DelayedOperationKey): Unit = {
val completed = delayedElectPreferredLeaderPurgatory.checkAndComplete(key)
debug("Request key %s unblocked %d ElectPreferredLeader.".format(key.keyLabel, completed))
}
def startup() {
// start ISR expiration thread
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
@ -1476,6 +1486,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedFetchPurgatory.shutdown()
delayedProducePurgatory.shutdown()
delayedDeleteRecordsPurgatory.shutdown()
delayedElectPreferredLeaderPurgatory.shutdown()
if (checkpointHW)
checkpointHighWatermarks()
info("Shut down completely")
@ -1508,4 +1519,30 @@ class ReplicaManager(val config: KafkaConfig,
tp -> epochEndOffset
}
}
def electPreferredLeaders(controller: KafkaController,
partitions: Set[TopicPartition],
responseCallback: Map[TopicPartition, ApiError] => Unit,
requestTimeout: Long): Unit = {
val deadline = time.milliseconds() + requestTimeout
def electionCallback(expectedLeaders: Map[TopicPartition, Int],
results: Map[TopicPartition, ApiError]): Unit = {
if (expectedLeaders.nonEmpty) {
val watchKeys = expectedLeaders.map{
case (tp, leader) => new TopicPartitionOperationKey(tp.topic, tp.partition)
}.toSeq
delayedElectPreferredLeaderPurgatory.tryCompleteElseWatch(
new DelayedElectPreferredLeader(deadline - time.milliseconds(), expectedLeaders, results,
this, responseCallback),
watchKeys)
} else {
// There are no partitions actually being elected, so return immediately
responseCallback(results)
}
}
controller.electPreferredLeaders(partitions, electionCallback)
}
}

View File

@ -47,13 +47,14 @@ import org.junit.Assert._
import scala.util.Random
import scala.collection.JavaConverters._
import java.lang.{Long => JLong}
import kafka.zk.KafkaZkClient
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import java.lang.{Long => JLong}
/**
* An integration test of the KafkaAdminClient.
*
@ -99,6 +100,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
config.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
// We set this in order to test that we don't expose sensitive data via describe configs. This will already be
// set for subclasses with security enabled and we don't want to overwrite it.
if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
@ -1198,6 +1201,204 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
}
}
@Test
def testElectPreferredLeaders(): Unit = {
client = AdminClient.create(createConfig)
val prefer0 = Seq(0, 1, 2)
val prefer1 = Seq(1, 2, 0)
val prefer2 = Seq(2, 0, 1)
val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0)
TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> prefer0), servers)
val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0)
TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0), servers)
def currentLeader(topicPartition: TopicPartition) =
client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
get.partitions.get(topicPartition.partition).leader.id
def preferredLeader(topicPartition: TopicPartition) =
client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
get.partitions.get(topicPartition.partition).replicas.get(0).id
def waitForLeaderToBecome(topicPartition: TopicPartition, leader: Int) =
TestUtils.waitUntilTrue(() => currentLeader(topicPartition) == leader, s"Expected leader to become $leader", 10000)
/** Changes the <i>preferred</i> leader without changing the <i>current</i> leader. */
def changePreferredLeader(newAssignment: Seq[Int]) = {
val preferred = newAssignment.head
val prior1 = currentLeader(partition1)
val prior2 = currentLeader(partition2)
var m = Map.empty[TopicPartition, Seq[Int]]
if (prior1 != preferred)
m += partition1 -> newAssignment
if (prior2 != preferred)
m += partition2 -> newAssignment
zkClient.createPartitionReassignment(m)
TestUtils.waitUntilTrue(
() => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred,
s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}", 10000)
// Check the leader hasn't moved
assertEquals(prior1, currentLeader(partition1))
assertEquals(prior2, currentLeader(partition2))
}
// Check current leaders are 0
assertEquals(0, currentLeader(partition1))
assertEquals(0, currentLeader(partition2))
// Noop election
var electResult = client.electPreferredLeaders(asList(partition1))
electResult.partitionResult(partition1).get()
assertEquals(0, currentLeader(partition1))
// Noop election with null partitions
electResult = client.electPreferredLeaders(null)
electResult.partitionResult(partition1).get()
assertEquals(0, currentLeader(partition1))
electResult.partitionResult(partition2).get()
assertEquals(0, currentLeader(partition2))
// Now change the preferred leader to 1
changePreferredLeader(prefer1)
// meaningful election
electResult = client.electPreferredLeaders(asList(partition1))
assertEquals(Set(partition1).asJava, electResult.partitions.get)
electResult.partitionResult(partition1).get()
waitForLeaderToBecome(partition1, 1)
// topic 2 unchanged
try {
electResult.partitionResult(partition2).get()
fail("topic 2 wasn't requested")
} catch {
case e: ExecutionException =>
val cause = e.getCause
assertTrue(cause.getClass.getName, cause.isInstanceOf[UnknownTopicOrPartitionException])
assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\" was not attempted",
cause.getMessage)
assertEquals(0, currentLeader(partition2))
}
// meaningful election with null partitions
electResult = client.electPreferredLeaders(null)
assertEquals(Set(partition1, partition2), electResult.partitions.get.asScala.filterNot(_.topic == "__consumer_offsets"))
electResult.partitionResult(partition1).get()
waitForLeaderToBecome(partition1, 1)
electResult.partitionResult(partition2).get()
waitForLeaderToBecome(partition2, 1)
// unknown topic
val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
electResult = client.electPreferredLeaders(asList(unknownPartition))
assertEquals(Set(unknownPartition).asJava, electResult.partitions.get)
try {
electResult.partitionResult(unknownPartition).get()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException])
assertEquals("The partition does not exist.",
cause.getMessage)
assertEquals(1, currentLeader(partition1))
assertEquals(1, currentLeader(partition2))
}
// Now change the preferred leader to 2
changePreferredLeader(prefer2)
// mixed results
electResult = client.electPreferredLeaders(asList(unknownPartition, partition1))
assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get)
waitForLeaderToBecome(partition1, 2)
assertEquals(1, currentLeader(partition2))
try {
electResult.partitionResult(unknownPartition).get()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException])
assertEquals("The partition does not exist.",
cause.getMessage)
}
// dupe partitions
electResult = client.electPreferredLeaders(asList(partition2, partition2))
assertEquals(Set(partition2).asJava, electResult.partitions.get)
electResult.partitionResult(partition2).get()
waitForLeaderToBecome(partition2, 2)
// Now change the preferred leader to 1
changePreferredLeader(prefer1)
// but shut it down...
servers(1).shutdown()
waitUntilTrue (
() => {
val description = client.describeTopics(Set (partition1.topic(), partition2.topic()).asJava).all().get()
return !description.asScala.flatMap{
case (topic, description) => description.partitions().asScala.map(
partition => partition.isr().asScala).flatten
}.exists(node => node.id == 1)
},
"Expect broker 1 to no longer be in any ISR"
)
// ... now what happens if we try to elect the preferred leader and it's down?
val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000)
electResult = client.electPreferredLeaders(asList(partition1), shortTimeout)
assertEquals(Set(partition1).asJava, electResult.partitions.get)
try {
electResult.partitionResult(partition1).get()
fail()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
}
assertEquals(2, currentLeader(partition1))
// preferred leader unavailable with null argument
electResult = client.electPreferredLeaders(null, shortTimeout)
try {
electResult.partitions.get()
fail()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
}
try {
electResult.partitionResult(partition1).get()
fail()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
}
try {
electResult.partitionResult(partition2).get()
fail()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
}
assertEquals(2, currentLeader(partition1))
assertEquals(2, currentLeader(partition2))
}
}
object AdminClientIntegrationTest {

View File

@ -146,7 +146,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse],
ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse],
ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse],
ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse]
ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse],
ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse]
)
val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@ -187,7 +188,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) => resp.responses.get(tp)),
ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED),
ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error)
ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error),
ApiKeys.ELECT_PREFERRED_LEADERS -> ((resp: ElectPreferredLeadersResponse) =>
ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error())
)
val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@ -225,7 +228,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DELETE_ACLS -> clusterAlterAcl,
ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl,
ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl,
ApiKeys.CREATE_PARTITIONS -> topicAlterAcl
ApiKeys.CREATE_PARTITIONS -> topicAlterAcl,
ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl
)
@Before
@ -382,6 +386,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
private def electPreferredLeadersRequest = new ElectPreferredLeadersRequest.Builder(
ElectPreferredLeadersRequest.toRequestData(Collections.singleton(tp), 10000)).build()
@Test
def testAuthorizationWithTopicExisting() {
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
@ -414,9 +421,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
// Check StopReplica last since some APIs depend on replica availability
ApiKeys.STOP_REPLICA -> stopReplicaRequest
ApiKeys.STOP_REPLICA -> stopReplicaRequest,
ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest
)
for ((key, request) <- requestKeyToRequest) {
@ -462,7 +469,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest
ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest
)
for ((key, request) <- requestKeyToRequest) {

View File

@ -0,0 +1,337 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.Properties
import kafka.admin.PreferredReplicaLeaderElectionCommand
import kafka.common.{AdminCommandFailedException, TopicAndPartition}
import kafka.network.RequestChannel
import kafka.security.auth._
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{Logging, TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{ClusterAuthorizationException, PreferredLeaderNotAvailableException, TimeoutException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.network.ListenerName
import org.junit.Assert._
import org.junit.{After, Test}
class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness with Logging /*with RackAwareTest*/ {
var servers: Seq[KafkaServer] = Seq()
@After
override def tearDown() {
TestUtils.shutdownServers(servers)
super.tearDown()
}
private def createTestTopicAndCluster(topicPartition: Map[TopicPartition, List[Int]],
authorizer: Option[String] = None) {
val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
brokerConfigs.foreach(p => p.setProperty("auto.leader.rebalance.enable", "false"))
authorizer match {
case Some(className) =>
brokerConfigs.foreach(p => p.setProperty("authorizer.class.name", className))
case None =>
}
createTestTopicAndCluster(topicPartition,brokerConfigs)
}
private def createTestTopicAndCluster(partitionsAndAssignments: Map[TopicPartition, List[Int]],
brokerConfigs: Seq[Properties]) {
// create brokers
servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
partitionsAndAssignments.foreach { case (tp, assigment) =>
zkClient.createTopicAssignment(tp.topic(),
Map(tp -> assigment))
}
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.forall(server => partitionsAndAssignments.forall(partitionAndAssignment => server.getLogManager().getLog(partitionAndAssignment._1).isDefined)),
"Replicas for topic test not created")
}
/** Bounce the given targetServer and wait for all servers to get metadata for the given partition */
private def bounceServer(targetServer: Int, partition: TopicPartition) {
debug(s"Shutting down server $targetServer so a non-preferred replica becomes leader")
servers(targetServer).shutdown()
debug(s"Starting server $targetServer now that a non-preferred replica is leader")
servers(targetServer).startup()
TestUtils.waitUntilTrue(() => servers.forall { server =>
server.metadataCache.getPartitionInfo(partition.topic(), partition.partition()).exists { partitionState =>
partitionState.basePartitionState.isr.contains(targetServer)
}
},
s"Replicas for partition $partition not created")
}
private def getController() = {
servers.find(p => p.kafkaController.isActive)
}
private def getLeader(topicPartition: TopicPartition) = {
servers(0).metadataCache.getPartitionInfo(topicPartition.topic(), topicPartition.partition()).get.basePartitionState.leader
}
private def bootstrapServer(broker: Int = 0): String = {
val port = servers(broker).socketServer.boundPort(ListenerName.normalised("PLAINTEXT"))
debug("Server bound to port "+port)
s"localhost:$port"
}
val testPartition = new TopicPartition("test", 0)
val testPartitionAssignment = List(1, 2, 0)
val testPartitionPreferredLeader = testPartitionAssignment.head
val testPartitionAndAssignment = Map(testPartition -> testPartitionAssignment)
/** Test the case multiple values are given for --bootstrap-broker */
@Test
def testMultipleBrokersGiven() {
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", s"${bootstrapServer(1)},${bootstrapServer(0)}"))
// Check the leader for the partition IS the preferred one
assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
}
/** Test the case when an invalid broker is given for --bootstrap-broker */
@Test
def testInvalidBrokerGiven() {
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", "example.com:1234"),
timeout = 1000)
fail()
} catch {
case e: AdminCommandFailedException =>
assertTrue(e.getCause.isInstanceOf[TimeoutException])
}
}
/** Test the case where no partitions are given (=> elect all partitions) */
@Test
def testNoPartitionsGiven() {
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer()))
// Check the leader for the partition IS the preferred one
assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
}
private def toJsonFile(partitions: scala.collection.Set[TopicPartition]): File = {
val jsonFile = File.createTempFile("preferredreplicaelection", ".js")
jsonFile.deleteOnExit()
val jsonString = ZkUtils.preferredReplicaLeaderElectionZkData(partitions.map(new TopicAndPartition(_)))
debug("Using json: "+jsonString)
Files.write(Paths.get(jsonFile.getAbsolutePath), jsonString.getBytes(StandardCharsets.UTF_8))
jsonFile
}
/** Test the case where a list of partitions is given */
@Test
def testSingletonPartitionGiven() {
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
} finally {
jsonFile.delete()
}
// Check the leader for the partition IS the preferred one
assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
}
/** Test the case where a topic does not exist */
@Test
def testTopicDoesNotExist() {
val nonExistentPartition = new TopicPartition("does.not.exist", 0)
val nonExistentPartitionAssignment = List(1, 2, 0)
val nonExistentPartitionAndAssignment = Map(nonExistentPartition -> nonExistentPartitionAssignment)
createTestTopicAndCluster(testPartitionAndAssignment)
val jsonFile = toJsonFile(nonExistentPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
} catch {
case e: AdminCommandFailedException =>
val suppressed = e.getSuppressed()(0)
assertTrue(suppressed.isInstanceOf[UnknownTopicOrPartitionException])
case e: Throwable =>
e.printStackTrace()
throw e
} finally {
jsonFile.delete()
}
}
/** Test the case where several partitions are given */
@Test
def testMultiplePartitionsSameAssignment() {
val testPartitionA = new TopicPartition("testA", 0)
val testPartitionB = new TopicPartition("testB", 0)
val testPartitionAssignment = List(1, 2, 0)
val testPartitionPreferredLeader = testPartitionAssignment.head
val testPartitionAndAssignment = Map(testPartitionA -> testPartitionAssignment, testPartitionB -> testPartitionAssignment)
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartitionA)
// Check the leader for the partition is not the preferred one
assertNotEquals(testPartitionPreferredLeader, getLeader(testPartitionA))
assertNotEquals(testPartitionPreferredLeader, getLeader(testPartitionB))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
} finally {
jsonFile.delete()
}
// Check the leader for the partition IS the preferred one
assertEquals(testPartitionPreferredLeader, getLeader(testPartitionA))
assertEquals(testPartitionPreferredLeader, getLeader(testPartitionB))
}
/** What happens when the preferred replica is already the leader? */
@Test
def testNoopElection() {
createTestTopicAndCluster(testPartitionAndAssignment)
// Don't bounce the server. Doublec heck the leader for the partition is the preferred one
assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
// Now do the election, even though the preferred replica is *already* the leader
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
// Check the leader for the partition still is the preferred one
assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
} finally {
jsonFile.delete()
}
}
/** What happens if the preferred replica is offline? */
@Test
def testWithOfflinePreferredReplica() {
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
val leader = getLeader(testPartition)
assertNotEquals(testPartitionPreferredLeader, leader)
// Now kill the preferred one
servers(testPartitionPreferredLeader).shutdown()
// Now try to elect the preferred one
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
fail();
} catch {
case e: AdminCommandFailedException =>
assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
val suppressed = e.getSuppressed()(0)
assertTrue(suppressed.isInstanceOf[PreferredLeaderNotAvailableException])
assertTrue(suppressed.getMessage, suppressed.getMessage.contains("Failed to elect leader for partition test-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
// Check we still have the same leader
assertEquals(leader, getLeader(testPartition))
} finally {
jsonFile.delete()
}
}
/** What happens if the controller gets killed just before an election? */
@Test
def testTimeout() {
createTestTopicAndCluster(testPartitionAndAssignment)
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
val leader = getLeader(testPartition)
assertNotEquals(testPartitionPreferredLeader, leader)
// Now kill the controller just before we trigger the election
val controller = getController().get.config.brokerId
servers(controller).shutdown()
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(controller),
"--path-to-json-file", jsonFile.getAbsolutePath),
timeout = 2000)
fail();
} catch {
case e: AdminCommandFailedException =>
assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
assertTrue(e.getSuppressed()(0).getMessage.contains("Timed out waiting for a node assignment"))
// Check we still have the same leader
assertEquals(leader, getLeader(testPartition))
} finally {
jsonFile.delete()
}
}
/** Test the case where client is not authorized */
@Test
def testAuthzFailure() {
createTestTopicAndCluster(testPartitionAndAssignment, Some(classOf[PreferredReplicaLeaderElectionCommandTestAuthorizer].getName))
bounceServer(testPartitionPreferredLeader, testPartition)
// Check the leader for the partition is not the preferred one
val leader = getLeader(testPartition)
assertNotEquals(testPartitionPreferredLeader, leader)
// Check the leader for the partition is not the preferred one
assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
try {
PreferredReplicaLeaderElectionCommand.run(Array(
"--bootstrap-server", bootstrapServer(),
"--path-to-json-file", jsonFile.getAbsolutePath))
fail();
} catch {
case e: AdminCommandFailedException =>
assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
assertTrue(e.getSuppressed()(0).isInstanceOf[ClusterAuthorizationException])
// Check we still have the same leader
assertEquals(leader, getLeader(testPartition))
} finally {
jsonFile.delete()
}
}
}
class PreferredReplicaLeaderElectionCommandTestAuthorizer extends SimpleAclAuthorizer {
override def authorize(session: RequestChannel.Session, operation: Operation, resource: Resource): Boolean =
operation != Alter || resource.resourceType != Cluster
}

View File

@ -158,7 +158,7 @@ object AbstractCoordinatorConcurrencyTest {
}
class TestReplicaManager extends ReplicaManager(
null, null, null, null, null, null, null, null, null, null, null, null, null, null, None) {
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, None) {
var producePurgatory: DelayedOperationPurgatory[DelayedProduce] = _
var watchKeys: mutable.Set[TopicPartitionOperationKey] = _

View File

@ -524,7 +524,7 @@ class KafkaApisTest {
channel.buffer.getInt() // read the size
ResponseHeader.parse(channel.buffer)
val struct = api.responseSchema(request.version).read(channel.buffer)
AbstractResponse.parseResponse(api, struct)
AbstractResponse.parseResponse(api, struct, request.version)
}
private def expectNoThrottling(): Capture[RequestChannel.Response] = {

View File

@ -657,6 +657,8 @@ class ReplicaManagerTest {
purgatoryName = "Fetch", timer, reaperEnabled = false)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
val mockElectPreferredLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectPreferredLeader](
purgatoryName = "ElectPreferredLeader", timer, reaperEnabled = false)
// Mock network client to show leader offset of 5
val quota = QuotaFactory.instantiate(config, metrics, time, "")
@ -665,7 +667,7 @@ class ReplicaManagerTest {
val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr,
new AtomicBoolean(false), quota, mockBrokerTopicStats,
metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, Option(this.getClass.getName)) {
mockDeleteRecordsPurgatory, mockElectPreferredLeaderPurgatory, Option(this.getClass.getName)) {
override protected def createReplicaFetcherManager(metrics: Metrics,
time: Time,
@ -815,11 +817,13 @@ class ReplicaManagerTest {
purgatoryName = "Fetch", timer, reaperEnabled = false)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
val mockDelayedElectPreferredLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectPreferredLeader](
purgatoryName = "DelayedElectPreferredLeader", timer, reaperEnabled = false)
new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, Option(this.getClass.getName))
mockDeleteRecordsPurgatory, mockDelayedElectPreferredLeaderPurgatory, Option(this.getClass.getName))
}
}

View File

@ -24,6 +24,7 @@ import kafka.security.auth._
import kafka.utils.TestUtils
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
@ -359,6 +360,15 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DELETE_GROUPS =>
new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))
case ApiKeys.ELECT_PREFERRED_LEADERS =>
val partition = new ElectPreferredLeadersRequestData.TopicPartitions()
.setPartitionId(Collections.singletonList(0))
.setTopic("my_topic")
new ElectPreferredLeadersRequest.Builder(
new ElectPreferredLeadersRequestData()
.setTimeoutMs(0)
.setTopicPartitions(Collections.singletonList(partition)))
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}
@ -450,6 +460,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs
case ApiKeys.ELECT_PREFERRED_LEADERS => new ElectPreferredLeadersResponse(response, 0).throttleTimeMs
case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
}
}