KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft (#11186)

This patch fixes several problems with the `ElectLeaders` API in KRaft:

- `KafkaApis` did not properly forward this request type to the controller.
- `ControllerApis` did not handle the request type.
- `ElectLeadersRequest.getErrorResponse` may raise NPE when `TopicPartitions` is null.
- Controller should not do preferred election if `ElectLeaders` specifies `UNCLEAN` election.
- Controller should not do unclean election if `ElectLeaders` specifies `PREFERRED` election.
- Controller should use proper error codes to handle cases when desired leader is unavailable or when no election is needed because a desired leader is already elected.
- When election for all partitions is requested (indicated with null `TopicPartitions` field), the response should not return partitions for which no election was necessary.

In addition to extending the unit test coverage in `ReplicationControlManagerTest`, I have also converted `LeaderElectionCommandTest` to use KRaft.

Reviewers: dengziming <swzmdeng@163.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, David Arthur <mumrah@gmail.com>
This commit is contained in:
Jason Gustafson 2021-09-15 08:52:45 -07:00 committed by GitHub
parent 75795d1ed8
commit 7de8a93c7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1210 additions and 531 deletions

View File

@ -84,7 +84,7 @@ public enum ApiKeys {
EXPIRE_DELEGATION_TOKEN(ApiMessageType.EXPIRE_DELEGATION_TOKEN, false, true),
DESCRIBE_DELEGATION_TOKEN(ApiMessageType.DESCRIBE_DELEGATION_TOKEN),
DELETE_GROUPS(ApiMessageType.DELETE_GROUPS),
ELECT_LEADERS(ApiMessageType.ELECT_LEADERS),
ELECT_LEADERS(ApiMessageType.ELECT_LEADERS, false, true),
INCREMENTAL_ALTER_CONFIGS(ApiMessageType.INCREMENTAL_ALTER_CONFIGS, false, true),
ALTER_PARTITION_REASSIGNMENTS(ApiMessageType.ALTER_PARTITION_REASSIGNMENTS, false, true),
LIST_PARTITION_REASSIGNMENTS(ApiMessageType.LIST_PARTITION_REASSIGNMENTS, false, true),

View File

@ -103,20 +103,22 @@ public class ElectLeadersRequest extends AbstractRequest {
ApiError apiError = ApiError.fromThrowable(e);
List<ReplicaElectionResult> electionResults = new ArrayList<>();
for (TopicPartitions topic : data.topicPartitions()) {
ReplicaElectionResult electionResult = new ReplicaElectionResult();
if (data.topicPartitions() != null) {
for (TopicPartitions topic : data.topicPartitions()) {
ReplicaElectionResult electionResult = new ReplicaElectionResult();
electionResult.setTopic(topic.topic());
for (Integer partitionId : topic.partitions()) {
PartitionResult partitionResult = new PartitionResult();
partitionResult.setPartitionId(partitionId);
partitionResult.setErrorCode(apiError.error().code());
partitionResult.setErrorMessage(apiError.message());
electionResult.setTopic(topic.topic());
for (Integer partitionId : topic.partitions()) {
PartitionResult partitionResult = new PartitionResult();
partitionResult.setPartitionId(partitionId);
partitionResult.setErrorCode(apiError.error().code());
partitionResult.setErrorMessage(apiError.message());
electionResult.partitionResult().add(partitionResult);
electionResult.partitionResult().add(partitionResult);
}
electionResults.add(electionResult);
}
electionResults.add(electionResult);
}
return new ElectLeadersResponse(throttleTimeMs, apiError.error().code(), electionResults, version());

View File

@ -208,7 +208,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
_state = BrokerState.PENDING_CONTROLLED_SHUTDOWN
// Send the next heartbeat immediately in order to let the controller
// begin processing the controlled shutdown as soon as possible.
scheduleNextCommunication(0)
scheduleNextCommunicationImmediately()
case _ =>
info(s"Skipping controlled shutdown because we are in state ${_state}.")
@ -284,8 +284,8 @@ class BrokerLifecycleManager(val config: KafkaConfig,
setIncarnationId(incarnationId).
setListeners(_advertisedListeners).
setRack(rack.orNull)
if (isTraceEnabled) {
trace(s"Sending broker registration ${data}")
if (isDebugEnabled) {
debug(s"Sending broker registration ${data}")
}
_channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
new BrokerRegistrationResponseHandler())
@ -406,7 +406,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
scheduleNextCommunicationAfterSuccess()
}
} else {
info(s"The controlled has asked us to exit controlled shutdown.")
info(s"The controller has asked us to exit controlled shutdown.")
beginShutdown()
}
gotControlledShutdownResponse = true

View File

@ -17,11 +17,11 @@
package kafka.server
import java.net.InetAddress
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.net.InetAddress
import kafka.cluster.Broker.ServerInfo
import kafka.coordinator.group.GroupCoordinator
@ -34,7 +34,6 @@ import kafka.security.CredentialProvider
import kafka.server.KafkaRaftServer.ControllerRole
import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder}
import kafka.utils.{CoreUtils, KafkaScheduler}
import org.apache.kafka.snapshot.SnapshotWriter
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
import org.apache.kafka.common.metrics.Metrics
@ -45,14 +44,15 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.raft.{RaftClient, RaftConfig}
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.raft.{RaftClient, RaftConfig}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.snapshot.SnapshotWriter
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion])
@ -92,8 +92,7 @@ class BrokerServer(
this.logIdent = logContext.logPrefix
val lifecycleManager: BrokerLifecycleManager =
new BrokerLifecycleManager(config, time, threadNamePrefix)
@volatile private var lifecycleManager: BrokerLifecycleManager = null
private val isShuttingDown = new AtomicBoolean(false)
@ -105,7 +104,7 @@ class BrokerServer(
var controlPlaneRequestProcessor: KafkaApis = null
var authorizer: Option[Authorizer] = None
var socketServer: SocketServer = null
@volatile var socketServer: SocketServer = null
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
var logDirFailureChannel: LogDirFailureChannel = null
@ -162,6 +161,8 @@ class BrokerServer(
lock.lock()
try {
if (status != from) return false
info(s"Transition from $status to $to")
status = to
if (to == SHUTTING_DOWN) {
isShuttingDown.set(true)
@ -182,6 +183,8 @@ class BrokerServer(
try {
info("Starting broker")
lifecycleManager = new BrokerLifecycleManager(config, time, threadNamePrefix)
/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()

View File

@ -20,8 +20,8 @@ package kafka.server
import java.util
import java.util.Collections
import java.util.Map.Entry
import java.util.concurrent.{CompletableFuture, ExecutionException}
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
import java.util.concurrent.{CompletableFuture, ExecutionException}
import kafka.network.RequestChannel
import kafka.raft.RaftManager
@ -36,11 +36,10 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message._
import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
@ -108,6 +107,7 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
case ApiKeys.ELECT_LEADERS => handleElectLeaders(request)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
}
} catch {
@ -488,6 +488,24 @@ class ControllerApis(val requestChannel: RequestChannel,
handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData]))
}
def handleElectLeaders(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, ALTER)
val electLeadersRequest = request.body[ElectLeadersRequest]
val future = controller.electLeaders(electLeadersRequest.data)
future.whenComplete { (responseData, exception) =>
if (exception != null) {
requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
electLeadersRequest.getErrorResponse(throttleMs, exception)
})
} else {
requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
new ElectLeadersResponse(responseData.setThrottleTimeMs(throttleMs))
})
}
}
}
def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
val alterIsrRequest = request.body[AlterIsrRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)

View File

@ -209,7 +209,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
@ -2993,9 +2993,8 @@ class KafkaApis(val requestChannel: RequestChannel,
true
}
def handleElectReplicaLeader(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request))
def handleElectLeaders(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val electionRequest = request.body[ElectLeadersRequest]
def sendResponseCallback(
@ -3006,7 +3005,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
val adjustedResults = if (electionRequest.data.topicPartitions == null) {
/* When performing elections across all of the partitions we should only return
* partitions for which there was an eleciton or resulted in an error. In other
* partitions for which there was an election or resulted in an error. In other
* words, partitions that didn't need election because they ready have the correct
* leader are not returned to the client.
*/

View File

@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
delta: MetadataDelta,
newImage: MetadataImage): Unit = {
try {
trace(s"Publishing delta $delta with highest offset $newHighestMetadataOffset")
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)

View File

@ -95,5 +95,11 @@ public interface ClusterInstance {
void stop();
void shutdownBroker(int brokerId);
void startBroker(int brokerId);
void rollingBrokerRestart();
void waitForReadyBrokers() throws InterruptedException;
}

View File

@ -27,6 +27,7 @@ import kafka.testkit.TestKitNodes;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
@ -36,10 +37,14 @@ import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
@ -73,6 +78,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public List<Extension> getAdditionalExtensions() {
RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, clusterConfig);
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
@ -97,8 +103,8 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
100L);
},
(AfterTestExecutionCallback) context -> clusterReference.get().close(),
new ClusterInstanceParameterResolver(new RaftClusterInstance(clusterReference, clusterConfig)),
(AfterTestExecutionCallback) context -> clusterInstance.stop(),
new ClusterInstanceParameterResolver(clusterInstance),
new GenericParameterResolver<>(clusterConfig, ClusterConfig.class)
);
}
@ -109,6 +115,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
private final ClusterConfig clusterConfig;
final AtomicBoolean started = new AtomicBoolean(false);
final AtomicBoolean stopped = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>();
RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, ClusterConfig clusterConfig) {
this.clusterReference = clusterReference;
@ -122,7 +129,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public Collection<SocketServer> brokerSocketServers() {
return clusterReference.get().brokers().values().stream()
return brokers()
.map(BrokerServer::socketServer)
.collect(Collectors.toList());
}
@ -134,14 +141,14 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public Collection<SocketServer> controllerSocketServers() {
return clusterReference.get().controllers().values().stream()
return controllers()
.map(ControllerServer::socketServer)
.collect(Collectors.toList());
}
@Override
public SocketServer anyBrokerSocketServer() {
return clusterReference.get().brokers().values().stream()
return brokers()
.map(BrokerServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
@ -149,7 +156,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public SocketServer anyControllerSocketServer() {
return clusterReference.get().controllers().values().stream()
return controllers()
.map(ControllerServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
@ -172,7 +179,9 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public Admin createAdminClient(Properties configOverrides) {
return Admin.create(clusterReference.get().clientProperties());
Admin admin = Admin.create(clusterReference.get().clientProperties());
admins.add(admin);
return admin;
}
@Override
@ -189,11 +198,27 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public void stop() {
if (stopped.compareAndSet(false, true)) {
try {
clusterReference.get().close();
} catch (Exception e) {
throw new RuntimeException("Failed to stop Raft server", e);
}
admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));
Utils.closeQuietly(clusterReference.get(), "cluster");
}
}
@Override
public void shutdownBroker(int brokerId) {
findBrokerOrThrow(brokerId).shutdown();
}
@Override
public void startBroker(int brokerId) {
findBrokerOrThrow(brokerId).startup();
}
@Override
public void waitForReadyBrokers() throws InterruptedException {
try {
clusterReference.get().waitForReadyBrokers();
} catch (ExecutionException e) {
throw new AssertionError("Failed while waiting for brokers to become ready", e);
}
}
@ -201,5 +226,19 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
public void rollingBrokerRestart() {
throw new UnsupportedOperationException("Restarting Raft servers is not yet supported.");
}
private BrokerServer findBrokerOrThrow(int brokerId) {
return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}
private Stream<BrokerServer> brokers() {
return clusterReference.get().brokers().values().stream();
}
private Stream<ControllerServer> controllers() {
return clusterReference.get().controllers().values().stream();
}
}
}

View File

@ -21,12 +21,12 @@ import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
@ -44,6 +44,7 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test invocation. Each instance of this
@ -193,7 +194,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public Collection<SocketServer> brokerSocketServers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
return servers()
.map(KafkaServer::socketServer)
.collect(Collectors.toList());
}
@ -205,7 +206,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public Collection<SocketServer> controllerSocketServers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
return servers()
.filter(broker -> broker.kafkaController().isActive())
.map(KafkaServer::socketServer)
.collect(Collectors.toList());
@ -213,7 +214,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public SocketServer anyBrokerSocketServer() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
return servers()
.map(KafkaServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
@ -221,7 +222,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public SocketServer anyControllerSocketServer() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
return servers()
.filter(broker -> broker.kafkaController().isActive())
.map(KafkaServer::socketServer)
.findFirst()
@ -262,6 +263,16 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
}
}
@Override
public void shutdownBroker(int brokerId) {
findBrokerOrThrow(brokerId).shutdown();
}
@Override
public void startBroker(int brokerId) {
findBrokerOrThrow(brokerId).startup();
}
@Override
public void rollingBrokerRestart() {
if (!started.get()) {
@ -272,5 +283,25 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
}
clusterReference.get().restartDeadBrokers(true);
}
@Override
public void waitForReadyBrokers() throws InterruptedException {
org.apache.kafka.test.TestUtils.waitForCondition(() -> {
int numRegisteredBrokers = clusterReference.get().zkClient().getAllBrokersInCluster().size();
return numRegisteredBrokers == config.numBrokers();
}, "Timed out while waiting for brokers to become ready");
}
private KafkaServer findBrokerOrThrow(int brokerId) {
return servers()
.filter(server -> server.config().brokerId() == brokerId)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}
private Stream<KafkaServer> servers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
}
}
}

View File

@ -17,19 +17,23 @@
package kafka.server
import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.nio.ByteBuffer
import java.util.{Collections, Properties}
import kafka.network.SocketServer
import kafka.utils.Implicits._
import kafka.utils.{NotNothing, TestUtils}
import org.apache.kafka.clients.admin.{Admin, NewTopic}
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.nio.ByteBuffer
import java.util.Properties
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
object IntegrationTestUtils {
@ -101,6 +105,32 @@ object IntegrationTestUtils {
finally socket.close()
}
def createTopic(
admin: Admin,
topic: String,
numPartitions: Int,
replicationFactor: Short
): Unit = {
val newTopics = Collections.singletonList(new NewTopic(topic, numPartitions, replicationFactor))
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get()
}
def createTopic(
admin: Admin,
topic: String,
replicaAssignment: Map[Int, Seq[Int]]
): Unit = {
val javaAssignment = new java.util.HashMap[Integer, java.util.List[Integer]]()
replicaAssignment.forKeyValue { (partitionId, assignment) =>
javaAssignment.put(partitionId, assignment.map(Int.box).asJava)
}
val newTopic = new NewTopic(topic, javaAssignment)
val newTopics = Collections.singletonList(newTopic)
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get()
}
protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
private var correlationId = 0

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.common.AdminCommandFailedException
import org.apache.kafka.common.errors.TimeoutException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import scala.concurrent.duration._
/**
* For some error cases, we can save a little build time by avoiding the overhead for
* cluster creation and cleanup because the command is expected to fail immediately.
*/
class LeaderElectionCommandErrorTest {
@Test
def testTopicWithoutPartition(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", "nohost:9092",
"--election-type", "unclean",
"--topic", "some-topic"
)
))
assertTrue(e.getMessage.startsWith("Missing required option(s)"))
assertTrue(e.getMessage.contains(" partition"))
}
@Test
def testPartitionWithoutTopic(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", "nohost:9092",
"--election-type", "unclean",
"--all-topic-partitions",
"--partition", "0"
)
))
assertEquals("Option partition is only allowed if topic is used", e.getMessage)
}
@Test
def testMissingElectionType(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", "nohost:9092",
"--topic", "some-topic",
"--partition", "0"
)
))
assertTrue(e.getMessage.startsWith("Missing required option(s)"))
assertTrue(e.getMessage.contains(" election-type"))
}
@Test
def testMissingTopicPartitionSelection(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", "nohost:9092",
"--election-type", "preferred"
)
))
assertTrue(e.getMessage.startsWith("One and only one of the following options is required: "))
assertTrue(e.getMessage.contains(" all-topic-partitions"))
assertTrue(e.getMessage.contains(" topic"))
assertTrue(e.getMessage.contains(" path-to-json-file"))
}
@Test
def testInvalidBroker(): Unit = {
val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.run(
Array(
"--bootstrap-server", "example.com:1234",
"--election-type", "unclean",
"--all-topic-partitions"
),
1.seconds
))
assertTrue(e.getCause.isInstanceOf[TimeoutException])
}
}

View File

@ -18,214 +18,176 @@ package kafka.admin
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.{Files, Path}
import kafka.common.AdminCommandFailedException
import kafka.server.KafkaConfig
import kafka.server.KafkaServer
import kafka.server.IntegrationTestUtils.createTopic
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.{ClusterConfig, ClusterInstance}
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.network.ListenerName
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Tag}
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.concurrent.duration._
final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3)
@Tag("integration")
final class LeaderElectionCommandTest(cluster: ClusterInstance) {
import LeaderElectionCommandTest._
var servers = Seq.empty[KafkaServer]
val broker1 = 0
val broker2 = 1
val broker3 = 2
@BeforeEach
override def setUp(): Unit = {
super.setUp()
val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
servers = brokerConfigs.map { config =>
config.setProperty("auto.leader.rebalance.enable", "false")
config.setProperty("controlled.shutdown.enable", "true")
config.setProperty("controlled.shutdown.max.retries", "1")
config.setProperty("controlled.shutdown.retry.backoff.ms", "1000")
TestUtils.createServer(KafkaConfig.fromProps(config))
}
def setup(clusterConfig: ClusterConfig): Unit = {
TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp, "true")
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp, "1")
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000")
clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
}
@AfterEach
override def tearDown(): Unit = {
TestUtils.shutdownServers(servers)
super.tearDown()
}
@Test
@ClusterTest
def testAllTopicPartition(): Unit = {
TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
val client = cluster.createAdminClient()
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
cluster.waitForReadyBrokers()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
val topicPartition = new TopicPartition(topic, partition)
TestUtils.assertLeader(client, topicPartition, broker2)
TestUtils.assertLeader(client, topicPartition, broker2)
cluster.shutdownBroker(broker3)
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
cluster.shutdownBroker(broker2)
TestUtils.assertNoLeader(client, topicPartition)
cluster.startBroker(broker3)
TestUtils.waitForOnlineBroker(client, broker3)
servers(broker3).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
servers(broker2).shutdown()
TestUtils.assertNoLeader(client, topicPartition)
servers(broker3).startup()
LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--election-type", "unclean",
"--all-topic-partitions"
)
LeaderElectionCommand.main(
Array(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--all-topic-partitions"
)
)
TestUtils.assertLeader(client, topicPartition, broker3)
}
TestUtils.assertLeader(client, topicPartition, broker3)
}
@Test
@ClusterTest
def testTopicPartition(): Unit = {
TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
val client = cluster.createAdminClient()
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
cluster.waitForReadyBrokers()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
val topicPartition = new TopicPartition(topic, partition)
TestUtils.assertLeader(client, topicPartition, broker2)
TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker3).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
servers(broker2).shutdown()
TestUtils.assertNoLeader(client, topicPartition)
servers(broker3).startup()
cluster.shutdownBroker(broker3)
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
cluster.shutdownBroker(broker2)
TestUtils.assertNoLeader(client, topicPartition)
cluster.startBroker(broker3)
TestUtils.waitForOnlineBroker(client, broker3)
LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--election-type", "unclean",
"--topic", topic,
"--partition", partition.toString
)
LeaderElectionCommand.main(
Array(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--topic", topic,
"--partition", partition.toString
)
)
TestUtils.assertLeader(client, topicPartition, broker3)
}
TestUtils.assertLeader(client, topicPartition, broker3)
}
@Test
@ClusterTest
def testPathToJsonFile(): Unit = {
TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
val client = cluster.createAdminClient()
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
cluster.waitForReadyBrokers()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
val topicPartition = new TopicPartition(topic, partition)
TestUtils.assertLeader(client, topicPartition, broker2)
TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker3).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
servers(broker2).shutdown()
TestUtils.assertNoLeader(client, topicPartition)
servers(broker3).startup()
cluster.shutdownBroker(broker3)
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
cluster.shutdownBroker(broker2)
TestUtils.assertNoLeader(client, topicPartition)
cluster.startBroker(broker3)
TestUtils.waitForOnlineBroker(client, broker3)
val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--election-type", "unclean",
"--path-to-json-file", topicPartitionPath.toString
)
LeaderElectionCommand.main(
Array(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--path-to-json-file", topicPartitionPath.toString
)
)
TestUtils.assertLeader(client, topicPartition, broker3)
}
TestUtils.assertLeader(client, topicPartition, broker3)
}
@Test
@ClusterTest
def testPreferredReplicaElection(): Unit = {
TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
val client = cluster.createAdminClient()
val topic = "preferred-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
cluster.waitForReadyBrokers()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
val topicPartition = new TopicPartition(topic, partition)
TestUtils.assertLeader(client, topicPartition, broker2)
TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker2).shutdown()
TestUtils.assertLeader(client, topicPartition, broker3)
servers(broker2).startup()
TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
cluster.shutdownBroker(broker2)
TestUtils.assertLeader(client, topicPartition, broker3)
cluster.startBroker(broker2)
TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--election-type", "preferred",
"--all-topic-partitions"
)
)
TestUtils.assertLeader(client, topicPartition, broker2)
}
}
@Test
def testTopicWithoutPartition(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--election-type", "unclean",
"--topic", "some-topic"
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--all-topic-partitions"
)
))
assertTrue(e.getMessage.startsWith("Missing required option(s)"))
assertTrue(e.getMessage.contains(" partition"))
)
TestUtils.assertLeader(client, topicPartition, broker2)
}
@Test
def testPartitionWithoutTopic(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--election-type", "unclean",
"--all-topic-partitions",
"--partition", "0"
)
))
assertEquals("Option partition is only allowed if topic is used", e.getMessage)
}
@Test
@ClusterTest
def testTopicDoesNotExist(): Unit = {
val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--topic", "unknown-topic-name",
"--partition", "0"
@ -234,86 +196,55 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
assertTrue(e.getSuppressed()(0).isInstanceOf[UnknownTopicOrPartitionException])
}
@Test
def testMissingElectionType(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--topic", "some-topic",
"--partition", "0"
)
))
assertTrue(e.getMessage.startsWith("Missing required option(s)"))
assertTrue(e.getMessage.contains(" election-type"))
}
@Test
def testMissingTopicPartitionSelection(): Unit = {
val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--election-type", "preferred"
)
))
assertTrue(e.getMessage.startsWith("One and only one of the following options is required: "))
assertTrue(e.getMessage.contains(" all-topic-partitions"))
assertTrue(e.getMessage.contains(" topic"))
assertTrue(e.getMessage.contains(" path-to-json-file"))
}
@Test
def testInvalidBroker(): Unit = {
val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.run(
Array(
"--bootstrap-server", "example.com:1234",
"--election-type", "unclean",
"--all-topic-partitions"
),
1.seconds
))
assertTrue(e.getCause.isInstanceOf[TimeoutException])
}
@Test
@ClusterTest
def testElectionResultOutput(): Unit = {
TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
val topic = "non-preferred-topic"
val partition0 = 0
val partition1 = 1
val assignment0 = Seq(broker2, broker3)
val assignment1 = Seq(broker3, broker2)
val client = cluster.createAdminClient()
val topic = "non-preferred-topic"
val partition0 = 0
val partition1 = 1
val assignment0 = Seq(broker2, broker3)
val assignment1 = Seq(broker3, broker2)
TestUtils.createTopic(zkClient, topic, Map(partition0 -> assignment0, partition1 -> assignment1), servers)
cluster.waitForReadyBrokers()
createTopic(client, topic, Map(
partition0 -> assignment0,
partition1 -> assignment1
))
val topicPartition0 = new TopicPartition(topic, partition0)
val topicPartition1 = new TopicPartition(topic, partition1)
val topicPartition0 = new TopicPartition(topic, partition0)
val topicPartition1 = new TopicPartition(topic, partition1)
TestUtils.assertLeader(client, topicPartition0, broker2)
TestUtils.assertLeader(client, topicPartition1, broker3)
TestUtils.assertLeader(client, topicPartition0, broker2)
TestUtils.assertLeader(client, topicPartition1, broker3)
servers(broker2).shutdown()
TestUtils.assertLeader(client, topicPartition0, broker3)
servers(broker2).startup()
TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
cluster.shutdownBroker(broker2)
TestUtils.assertLeader(client, topicPartition0, broker3)
cluster.startBroker(broker2)
TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1))
val output = TestUtils.grabConsoleOutput(
LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--election-type", "preferred",
"--path-to-json-file", topicPartitionPath.toString
)
val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1))
val output = TestUtils.grabConsoleOutput(
LeaderElectionCommand.main(
Array(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--path-to-json-file", topicPartitionPath.toString
)
)
)
val electionResultOutputIter = output.split("\n").iterator
assertTrue(electionResultOutputIter.hasNext)
assertTrue(electionResultOutputIter.next().contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"))
assertTrue(electionResultOutputIter.hasNext)
assertTrue(electionResultOutputIter.next().contains(s"Valid replica already elected for partitions $topicPartition1"))
}
val electionResultOutputIter = output.split("\n").iterator
assertTrue(electionResultOutputIter.hasNext)
val firstLine = electionResultOutputIter.next()
assertTrue(firstLine.contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"),
s"Unexpected output: $firstLine")
assertTrue(electionResultOutputIter.hasNext)
val secondLine = electionResultOutputIter.next()
assertTrue(secondLine.contains(s"Valid replica already elected for partitions $topicPartition1"),
s"Unexpected output: $secondLine")
}
}

View File

@ -19,43 +19,40 @@ package kafka.server
import java.net.InetAddress
import java.util
import java.util.Collections.singletonList
import java.util.Properties
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.{CompletableFuture, ExecutionException}
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.test.MockController
import kafka.utils.MockTime
import kafka.utils.{MockTime, NotNothing}
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource, AlterConfigsResourceCollection => OldAlterConfigsResourceCollection, AlterableConfig => OldAlterableConfig, AlterableConfigCollection => OldAlterableConfigCollection}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
import org.apache.kafka.common.message._
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => OldAlterConfigsResourceCollection}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => OldAlterableConfigCollection}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => OldAlterableConfig}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.{ElectionType, Uuid}
import org.apache.kafka.controller.Controller
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.ApiMessageAndVersion
@ -65,7 +62,9 @@ import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
class ControllerApisTest {
private val nodeId = 1
@ -136,12 +135,11 @@ class ControllerApisTest {
def createDenyAllAuthorizer(): Authorizer = {
val authorizer = mock(classOf[Authorizer])
mock(classOf[Authorizer])
when(authorizer.authorize(
any(classOf[AuthorizableRequestContext]),
any(classOf[java.util.List[Action]])
)).thenReturn(
java.util.Collections.singletonList(AuthorizationResult.DENIED)
singletonList(AuthorizationResult.DENIED)
)
authorizer
}
@ -732,6 +730,79 @@ class ControllerApisTest {
controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet)
}
@Test
def testElectLeadersAuthorization(): Unit = {
val authorizer = mock(classOf[Authorizer])
val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(Some(authorizer), controller)
val request = new ElectLeadersRequest.Builder(
ElectionType.PREFERRED,
null,
30000
).build()
val resource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val actions = singletonList(new Action(AclOperation.ALTER, resource, 1, true, true))
when(authorizer.authorize(
any[RequestContext],
ArgumentMatchers.eq(actions)
)).thenReturn(singletonList(AuthorizationResult.DENIED))
val response = handleRequest[ElectLeadersResponse](request, controllerApis)
assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, Errors.forCode(response.data.errorCode))
}
@Test
def testElectLeadersHandledByController(): Unit = {
val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(None, controller)
val request = new ElectLeadersRequest.Builder(
ElectionType.PREFERRED,
null,
30000
).build()
val responseData = new ElectLeadersResponseData()
.setErrorCode(Errors.NOT_CONTROLLER.code)
when(controller.electLeaders(
request.data
)).thenReturn(CompletableFuture.completedFuture(responseData))
val response = handleRequest[ElectLeadersResponse](request, controllerApis)
assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(response.data.errorCode))
}
private def handleRequest[T <: AbstractResponse](
request: AbstractRequest,
controllerApis: ControllerApis
)(
implicit classTag: ClassTag[T],
@nowarn("cat=unused") nn: NotNothing[T]
): T = {
val req = buildRequest(request)
controllerApis.handle(req, RequestLocal.NoCaching)
val capturedResponse: ArgumentCaptor[AbstractResponse] =
ArgumentCaptor.forClass(classOf[AbstractResponse])
verify(requestChannel).sendResponse(
ArgumentMatchers.eq(req),
capturedResponse.capture(),
ArgumentMatchers.eq(None)
)
capturedResponse.getValue match {
case response: T => response
case response =>
throw new ClassCastException(s"Expected response with type ${classTag.runtimeClass}, " +
s"but found ${response.getClass}")
}
}
@AfterEach
def tearDown(): Unit = {
quotas.shutdown()

View File

@ -23,6 +23,7 @@ import java.util
import java.util.Arrays.asList
import java.util.concurrent.TimeUnit
import java.util.{Collections, Optional, Properties, Random}
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, Partition}
import kafka.controller.{ControllerContext, KafkaController}
@ -68,7 +69,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.easymock.EasyMock._
import org.easymock.{Capture, EasyMock, IAnswer}
@ -468,6 +469,12 @@ class KafkaApisTest {
testForwardableApi(ApiKeys.ALTER_CONFIGS, requestBuilder)
}
@Test
def testElectLeadersForwarding(): Unit = {
val requestBuilder = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000)
testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder)
}
@Test
def testDescribeQuorumNotAllowedForZkClusters(): Unit = {
val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
@ -495,6 +502,18 @@ class KafkaApisTest {
)
}
private def testKraftForwarding(
apiKey: ApiKeys,
requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
testForwardableApi(
createKafkaApis(enableForwarding = true, raftSupport = true),
apiKey,
requestBuilder
)
}
private def testForwardableApi(apiKey: ApiKeys, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Unit = {
testForwardableApi(
createKafkaApis(enableForwarding = true),
@ -511,7 +530,8 @@ class KafkaApisTest {
val topicHeader = new RequestHeader(apiKey, apiKey.latestVersion,
clientId, 0)
val request = buildRequest(requestBuilder.build(topicHeader.apiVersion))
val apiRequest = requestBuilder.build(topicHeader.apiVersion)
val request = buildRequest(apiRequest)
if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) {
// The controller check only makes sense for ZK clusters. For KRaft,
@ -520,18 +540,28 @@ class KafkaApisTest {
EasyMock.expect(controller.isActive).andReturn(false)
}
expectNoThrottling(request)
val capturedResponse = expectNoThrottling(request)
val forwardCallback: Capture[Option[AbstractResponse] => Unit] = EasyMock.newCapture()
EasyMock.expect(forwardingManager.forwardRequest(
EasyMock.eq(request),
anyObject[Option[AbstractResponse] => Unit]()
EasyMock.capture(forwardCallback)
)).once()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller, forwardingManager)
kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
assertNotNull(request.buffer, "The buffer was unexpectedly deallocated after " +
s"`handle` returned (is $apiKey marked as forwardable in `ApiKeys`?)")
EasyMock.verify(controller, forwardingManager)
val expectedResponse = apiRequest.getErrorResponse(Errors.NOT_CONTROLLER.exception)
assertTrue(forwardCallback.hasCaptured)
forwardCallback.getValue.apply(Some(expectedResponse))
assertTrue(capturedResponse.hasCaptured)
assertEquals(expectedResponse, capturedResponse.getValue)
EasyMock.verify(controller, requestChannel, forwardingManager)
}
private def authorizeResource(authorizer: Authorizer,
@ -3980,13 +4010,13 @@ class KafkaApisTest {
request
}
private def verifyShouldNeverHandle(handler: RequestChannel.Request => Unit): Unit = {
private def verifyShouldNeverHandleErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
val request = createMockRequest()
val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
assertEquals(KafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
}
private def verifyShouldAlwaysForward(handler: RequestChannel.Request => Unit): Unit = {
private def verifyShouldAlwaysForwardErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
val request = createMockRequest()
val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage)
@ -3995,126 +4025,132 @@ class KafkaApisTest {
@Test
def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
}
@Test
def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
}
@Test
def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
}
@Test
def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
}
@Test
def testRaftShouldNeverHandleAlterIsrRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleAlterIsrRequest)
verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleAlterIsrRequest)
}
@Test
def testRaftShouldNeverHandleEnvelope(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
}
@Test
def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
}
@Test
def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateAcls)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateAcls)
}
@Test
def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteAcls)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteAcls)
}
@Test
def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
}
@Test
def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleRenewTokenRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleRenewTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleExpireTokenRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleExpireTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
}
@Test
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleUpdateFeatures)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleUpdateFeatures)
}
@Test
def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleElectLeaders)
}
@Test
def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
}
}

View File

@ -17,6 +17,7 @@
package kafka.utils
import java.io._
import java.net.InetAddress
import java.nio._
import java.nio.channels._
import java.nio.charset.{Charset, StandardCharsets}
@ -26,12 +27,12 @@ import java.time.Duration
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import java.util.{Arrays, Collections, Optional, Properties}
import com.yammer.metrics.core.Meter
import com.yammer.metrics.core.Meter
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.controller.{ControllerEventManager, LeaderIsrAndControllerEpoch}
import kafka.log._
import kafka.metrics.KafkaYammerMetrics
import kafka.network.RequestChannel
@ -44,6 +45,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.ConfigResource
@ -53,9 +55,9 @@ import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ListenerName, Mode}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, RequestContext, RequestHeader}
@ -73,14 +75,13 @@ import org.apache.zookeeper.data.ACL
import org.junit.jupiter.api.Assertions._
import org.mockito.Mockito
import java.net.InetAddress
import scala.annotation.nowarn
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Map, Seq, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
/**
* Utility functions to help with testing
@ -1625,19 +1626,37 @@ object TestUtils extends Logging {
waitForLeaderToBecome(client, topicPartition, None)
}
def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
val topic = topicPartition.topic
val partition = topicPartition.partition
def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
waitUntilTrue(() => {
try {
val topicResult = client.describeTopics(Arrays.asList(topic)).allTopicNames.get.get(topic)
val partitionResult = topicResult.partitions.get(partition)
Option(partitionResult.leader).map(_.id) == leader
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
}
}, "Timed out waiting for leader metadata")
val nodes = client.describeCluster().nodes().get()
nodes.asScala.exists(_.id == brokerId)
}, s"Timed out waiting for brokerId $brokerId to come online")
}
def waitForLeaderToBecome(
client: Admin,
topicPartition: TopicPartition,
expectedLeaderOpt: Option[Int]
): Unit = {
val topic = topicPartition.topic
val partitionId = topicPartition.partition
def currentLeader: Try[Option[Int]] = Try {
val topicDescription = client.describeTopics(List(topic).asJava).allTopicNames.get.get(topic)
topicDescription.partitions.asScala
.find(_.partition == partitionId)
.flatMap(partitionState => Option(partitionState.leader))
.map(_.id)
}
val (lastLeaderCheck, isLeaderElected) = computeUntilTrue(currentLeader) {
case Success(leaderOpt) => leaderOpt == expectedLeaderOpt
case Failure(e: ExecutionException) if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
case Failure(e) => throw e
}
assertTrue(isLeaderElected, s"Timed out waiting for leader to become $expectedLeaderOpt. " +
s"Last metadata lookup returned leader = ${lastLeaderCheck.getOrElse("unknown")}")
}
def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = {
@ -1652,7 +1671,7 @@ object TestUtils extends Logging {
brokerIds.intersect(isr).isEmpty
},
s"Expected brokers $brokerIds to no longer in the ISR for $partition"
s"Expected brokers $brokerIds to no longer be in the ISR for $partition"
)
}
@ -1917,4 +1936,28 @@ object TestUtils extends Logging {
)
}
def verifyNoUnexpectedThreads(context: String): Unit = {
// Threads which may cause transient failures in subsequent tests if not shutdown.
// These include threads which make connections to brokers and may cause issues
// when broker ports are reused (e.g. auto-create topics) as well as threads
// which reset static JAAS configuration.
val unexpectedThreadNames = Set(
ControllerEventManager.ControllerEventThreadName,
KafkaProducer.NETWORK_THREAD_PREFIX,
AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
ZooKeeperTestHarness.ZkClientEventThreadSuffix
)
def unexpectedThreads: Set[String] = {
val allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
allThreads.filter(t => unexpectedThreadNames.exists(s => t.contains(s))).toSet
}
val (unexpected, _) = TestUtils.computeUntilTrue(unexpectedThreads)(_.isEmpty)
assertTrue(unexpected.isEmpty,
s"Found ${unexpected.size} unexpected threads during $context: " +
s"${unexpected.mkString("`", ",", "`")}")
}
}

View File

@ -19,19 +19,12 @@ package kafka.zk
import javax.security.auth.login.Configuration
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag}
import org.junit.jupiter.api.Assertions._
import org.apache.kafka.common.security.JaasUtils
import scala.collection.Set
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import kafka.controller.ControllerEventManager
import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag}
@Tag("integration")
abstract class ZooKeeperTestHarness extends Logging {
@ -87,16 +80,6 @@ abstract class ZooKeeperTestHarness extends Logging {
object ZooKeeperTestHarness {
val ZkClientEventThreadSuffix = "-EventThread"
// Threads which may cause transient failures in subsequent tests if not shutdown.
// These include threads which make connections to brokers and may cause issues
// when broker ports are reused (e.g. auto-create topics) as well as threads
// which reset static JAAS configuration.
val unexpectedThreadNames = Set(ControllerEventManager.ControllerEventThreadName,
KafkaProducer.NETWORK_THREAD_PREFIX,
AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
ZkClientEventThreadSuffix)
/**
* Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread.
* This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass,
@ -104,7 +87,7 @@ object ZooKeeperTestHarness {
*/
@BeforeAll
def setUpClass(): Unit = {
verifyNoUnexpectedThreads("@BeforeClass")
TestUtils.verifyNoUnexpectedThreads("@BeforeAll")
}
/**
@ -112,19 +95,7 @@ object ZooKeeperTestHarness {
*/
@AfterAll
def tearDownClass(): Unit = {
verifyNoUnexpectedThreads("@AfterClass")
TestUtils.verifyNoUnexpectedThreads("@AfterAll")
}
/**
* Verifies that threads which are known to cause transient failures in subsequent tests
* have been shutdown.
*/
def verifyNoUnexpectedThreads(context: String): Unit = {
def allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads =>
threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s)))
}
assertTrue(noUnexpected, s"Found unexpected threads during $context, allThreads=$threads, " +
s"unexpected=${threads.filterNot(t => unexpectedThreadNames.forall(s => !t.contains(s)))}")
}
}

View File

@ -25,6 +25,7 @@ import scala.collection.Seq
import com.yammer.metrics.core.{Gauge, Meter, MetricName}
import kafka.server.KafkaConfig
import kafka.metrics.KafkaYammerMetrics
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
@ -33,7 +34,7 @@ import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
import org.apache.zookeeper.ZooKeeper.States
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertTrue, assertThrows, fail}
import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.jdk.CollectionConverters._
@ -46,7 +47,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@BeforeEach
override def setUp(): Unit = {
ZooKeeperTestHarness.verifyNoUnexpectedThreads("@BeforeEach")
TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
cleanMetricsRegistry()
super.setUp()
zooKeeperClient = newZooKeeperClient()
@ -58,7 +59,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
zooKeeperClient.close()
super.tearDown()
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
ZooKeeperTestHarness.verifyNoUnexpectedThreads("@AfterEach")
TestUtils.verifyNoUnexpectedThreads("@AfterEach")
}
@Test

View File

@ -17,8 +17,6 @@
package org.apache.kafka.controller;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
@ -35,11 +33,11 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
@ -52,7 +50,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;

View File

@ -22,6 +22,8 @@ import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
@ -38,6 +40,8 @@ import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
* PartitionChangeBuilder handles changing partition registrations.
*/
public class PartitionChangeBuilder {
private static final Logger log = LoggerFactory.getLogger(PartitionChangeBuilder.class);
public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
if (record.isr() != null) return false;
if (record.leader() != NO_LEADER_CHANGE) return false;
@ -141,12 +145,15 @@ public class PartitionChangeBuilder {
private void tryElection(PartitionChangeRecord record) {
BestLeader bestLeader = new BestLeader();
if (bestLeader.node != partition.leader) {
log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, bestLeader.node);
record.setLeader(bestLeader.node);
if (bestLeader.unclean) {
// If the election was unclean, we have to forcibly set the ISR to just the
// new leader. This can result in data loss!
record.setIsr(Collections.singletonList(bestLeader.node));
}
} else {
log.debug("Failed to find a new leader with current state: {}", this);
}
}
@ -240,4 +247,20 @@ public class PartitionChangeBuilder {
PARTITION_CHANGE_RECORD.highestSupportedVersion()));
}
}
@Override
public String toString() {
return "PartitionChangeBuilder(" +
"partition=" + partition +
", topicId=" + topicId +
", partitionId=" + partitionId +
", isAcceptableLeader=" + isAcceptableLeader +
", uncleanElectionOk=" + uncleanElectionOk +
", targetIsr=" + targetIsr +
", targetReplicas=" + targetReplicas +
", targetRemoving=" + targetRemoving +
", targetAdding=" + targetAdding +
", alwaysElectPreferredIfPossible=" + alwaysElectPreferredIfPossible +
')';
}
}

View File

@ -132,6 +132,14 @@ public class ReplicationControlManager {
this.id = id;
this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
}
public String name() {
return name;
}
public Uuid topicId() {
return id;
}
}
private final SnapshotRegistry snapshotRegistry;
@ -586,6 +594,11 @@ public class ReplicationControlManager {
return topic.parts.get(partitionId);
}
// VisibleForTesting
TopicControlInfo getTopic(Uuid topicId) {
return topics.get(topicId);
}
// VisibleForTesting
BrokersToIsrs brokersToIsrs() {
return brokersToIsrs;
@ -787,7 +800,7 @@ public class ReplicationControlManager {
}
ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
boolean uncleanOk = electionTypeIsUnclean(request.electionType());
ElectionType electionType = electionType(request.electionType());
List<ApiMessageAndVersion> records = new ArrayList<>();
ElectLeadersResponseData response = new ElectLeadersResponseData();
if (request.topicPartitions() == null) {
@ -804,11 +817,16 @@ public class ReplicationControlManager {
TopicControlInfo topic = topics.get(topicEntry.getValue());
if (topic != null) {
for (int partitionId : topic.parts.keySet()) {
ApiError error = electLeader(topicName, partitionId, uncleanOk, records);
topicResults.partitionResult().add(new PartitionResult().
setPartitionId(partitionId).
setErrorCode(error.error().code()).
setErrorMessage(error.message()));
ApiError error = electLeader(topicName, partitionId, electionType, records);
// When electing leaders for all partitions, we do not return
// partitions which already have the desired leader.
if (error.error() != Errors.ELECTION_NOT_NEEDED) {
topicResults.partitionResult().add(new PartitionResult().
setPartitionId(partitionId).
setErrorCode(error.error().code()).
setErrorMessage(error.message()));
}
}
}
}
@ -818,7 +836,7 @@ public class ReplicationControlManager {
new ReplicaElectionResult().setTopic(topic.topic());
response.replicaElectionResults().add(topicResults);
for (int partitionId : topic.partitions()) {
ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records);
ApiError error = electLeader(topic.topic(), partitionId, electionType, records);
topicResults.partitionResult().add(new PartitionResult().
setPartitionId(partitionId).
setErrorCode(error.error().code()).
@ -829,17 +847,15 @@ public class ReplicationControlManager {
return ControllerResult.of(records, response);
}
static boolean electionTypeIsUnclean(byte electionType) {
ElectionType type;
private static ElectionType electionType(byte electionType) {
try {
type = ElectionType.valueOf(electionType);
return ElectionType.valueOf(electionType);
} catch (IllegalArgumentException e) {
throw new InvalidRequestException("Unknown election type " + (int) electionType);
}
return type == ElectionType.UNCLEAN;
}
ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
ApiError electLeader(String topic, int partitionId, ElectionType electionType,
List<ApiMessageAndVersion> records) {
Uuid topicId = topicsByName.get(topic);
if (topicId == null) {
@ -856,21 +872,24 @@ public class ReplicationControlManager {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such partition as " + topic + "-" + partitionId);
}
if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader())
|| (electionType == ElectionType.UNCLEAN && partition.hasLeader())) {
return new ApiError(Errors.ELECTION_NOT_NEEDED);
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId,
partitionId,
r -> clusterControl.unfenced(r),
() -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic));
builder.setAlwaysElectPreferredIfPossible(true);
() -> electionType == ElectionType.UNCLEAN);
builder.setAlwaysElectPreferredIfPossible(electionType == ElectionType.PREFERRED);
Optional<ApiMessageAndVersion> record = builder.build();
if (!record.isPresent()) {
if (partition.leader == NO_LEADER) {
// If we can't find any leader for the partition, return an error.
return new ApiError(Errors.LEADER_NOT_AVAILABLE,
"Unable to find any leader for the partition.");
if (electionType == ElectionType.PREFERRED) {
return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
} else {
// There is nothing to do.
return ApiError.NONE;
return new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE);
}
}
records.add(record.get());

View File

@ -81,4 +81,11 @@ public final class ClientQuotasDelta {
}
return new ClientQuotasImage(newEntities);
}
@Override
public String toString() {
return "ClientQuotasDelta(" +
"changes=" + changes +
')';
}
}

View File

@ -126,4 +126,11 @@ public final class ClusterDelta {
}
return new ClusterImage(newBrokers);
}
@Override
public String toString() {
return "ClusterDelta(" +
"changedBrokers=" + changedBrokers +
')';
}
}

View File

@ -74,4 +74,13 @@ public final class ConfigurationDelta {
}
return new ConfigurationImage(newData);
}
@Override
public String toString() {
// Values are intentionally left out of this so that sensitive configs
// do not end up in logging by mistake.
return "ConfigurationDelta(" +
"changedKeys=" + changes.keySet() +
')';
}
}

View File

@ -96,4 +96,11 @@ public final class ConfigurationsDelta {
}
return new ConfigurationsImage(newData);
}
@Override
public String toString() {
return "ConfigurationsDelta(" +
"changes=" + changes +
')';
}
}

View File

@ -83,4 +83,11 @@ public final class FeaturesDelta {
}
return new FeaturesImage(newFinalizedVersions);
}
@Override
public String toString() {
return "FeaturesDelta(" +
"changes=" + changes +
')';
}
}

View File

@ -256,4 +256,15 @@ public final class MetadataDelta {
return new MetadataImage(newFeatures, newCluster, newTopics, newConfigs,
newClientQuotas);
}
@Override
public String toString() {
return "MetadataDelta(" +
"featuresDelta=" + featuresDelta +
", clusterDelta=" + clusterDelta +
", topicsDelta=" + topicsDelta +
", configsDelta=" + configsDelta +
", clientQuotasDelta=" + clientQuotasDelta +
')';
}
}

View File

@ -138,4 +138,11 @@ public final class TopicDelta {
return new LocalReplicaChanges(deletes, leaders, followers);
}
@Override
public String toString() {
return "TopicDelta(" +
"partitionChanges=" + partitionChanges +
')';
}
}

View File

@ -201,4 +201,12 @@ public final class TopicsDelta {
return new LocalReplicaChanges(deletes, leaders, followers);
}
@Override
public String toString() {
return "TopicsDelta(" +
"changedTopics=" + changedTopics +
", deletedTopicIds=" + deletedTopicIds +
')';
}
}

View File

@ -107,6 +107,14 @@ public class PartitionChangeBuilderTest {
shouldTryElection());
assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).
shouldTryElection());
assertTrue(createFooBuilder(true)
.setTargetIsr(Arrays.asList(3))
.shouldTryElection());
assertTrue(createFooBuilder(true)
.setTargetIsr(Arrays.asList(4))
.setTargetReplicas(Arrays.asList(2, 1, 3, 4))
.shouldTryElection());
}
private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,

View File

@ -18,6 +18,7 @@
package org.apache.kafka.controller;
import java.util.Optional;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
@ -37,12 +38,12 @@ import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitionsCollection;
@ -50,9 +51,10 @@ import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@ -63,20 +65,21 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -84,18 +87,26 @@ import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED;
import static org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS;
import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT;
import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
import static org.apache.kafka.common.protocol.Errors.NONE;
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -117,7 +128,7 @@ public class ReplicationControlManagerTest {
final MockTime time = new MockTime();
final MockRandom random = new MockRandom();
final ClusterControlManager clusterControl = new ClusterControlManager(
logContext, time, snapshotRegistry, BROKER_SESSION_TIMEOUT_MS,
logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
new StripedReplicaPlacer(random));
final ControllerMetrics metrics = new MockControllerMetrics();
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
@ -200,7 +211,45 @@ public class ReplicationControlManagerTest {
}
}
void unfenceBrokers(Integer... brokerIds) throws Exception {
void alterIsr(
TopicIdPartition topicIdPartition,
int leaderId,
List<Integer> isr
) throws Exception {
BrokerRegistration registration = clusterControl.brokerRegistrations().get(leaderId);
assertFalse(registration.fenced());
PartitionRegistration partition = replicationControl.getPartition(
topicIdPartition.topicId(),
topicIdPartition.partitionId()
);
assertNotNull(partition);
assertEquals(leaderId, partition.leader);
PartitionData partitionData = new PartitionData()
.setPartitionIndex(topicIdPartition.partitionId())
.setCurrentIsrVersion(partition.partitionEpoch)
.setLeaderEpoch(partition.leaderEpoch)
.setNewIsr(isr);
String topicName = replicationControl.getTopic(topicIdPartition.topicId()).name();
TopicData topicData = new TopicData()
.setName(topicName)
.setPartitions(singletonList(partitionData));
ControllerResult<AlterIsrResponseData> alterIsr = replicationControl.alterIsr(
new AlterIsrRequestData()
.setBrokerId(leaderId)
.setBrokerEpoch(registration.epoch())
.setTopics(singletonList(topicData)));
replay(alterIsr.records());
}
void unfenceBrokers(Integer... brokerIds) throws Exception {
unfenceBrokers(Utils.mkSet(brokerIds));
}
void unfenceBrokers(Set<Integer> brokerIds) throws Exception {
for (int brokerId : brokerIds) {
ControllerResult<BrokerHeartbeatReply> result = replicationControl.
processBrokerHeartbeat(new BrokerHeartbeatRequestData().
@ -213,6 +262,19 @@ public class ReplicationControlManagerTest {
}
}
void alterTopicConfig(
String topic,
String configKey,
String configValue
) throws Exception {
ConfigRecord configRecord = new ConfigRecord()
.setResourceType(ConfigResource.Type.TOPIC.id())
.setResourceName(topic)
.setName(configKey)
.setValue(configValue);
replay(singletonList(new ApiMessageAndVersion(configRecord, (short) 0)));
}
void fenceBrokers(Set<Integer> brokerIds) throws Exception {
time.sleep(BROKER_SESSION_TIMEOUT_MS);
@ -284,10 +346,10 @@ public class ReplicationControlManagerTest {
setErrorMessage(Errors.TOPIC_ALREADY_EXISTS.exception().getMessage()));
assertEquals(expectedResponse3, result3.response());
Uuid fooId = result2.response().topics().find("foo").topicId();
RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
Arrays.asList(new ApiMessageAndVersion(new PartitionRecord().
RecordTestUtils.assertBatchIteratorContains(asList(
asList(new ApiMessageAndVersion(new PartitionRecord().
setPartitionId(0).setTopicId(fooId).
setReplicas(Arrays.asList(1, 2, 0)).setIsr(Arrays.asList(1, 2, 0)).
setReplicas(asList(1, 2, 0)).setIsr(asList(1, 2, 0)).
setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).
setLeaderEpoch(0).setPartitionEpoch(0), (short) 0),
new ApiMessageAndVersion(new TopicRecord().
@ -450,7 +512,7 @@ public class ReplicationControlManagerTest {
assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
long brokerEpoch = ctx.currentBrokerEpoch(0);
PartitionData shrinkIsrRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
replicationControl, topicIdPartition, asList(0, 1));
ControllerResult<AlterIsrResponseData> shrinkIsrResult = sendAlterIsr(
replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
AlterIsrResponseData.PartitionData shrinkIsrResponse = assertAlterIsrResponse(
@ -458,7 +520,7 @@ public class ReplicationControlManagerTest {
assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
PartitionData expandIsrRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1, 2));
replicationControl, topicIdPartition, asList(0, 1, 2));
ControllerResult<AlterIsrResponseData> expandIsrResult = sendAlterIsr(
replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
AlterIsrResponseData.PartitionData expandIsrResponse = assertAlterIsrResponse(
@ -482,7 +544,7 @@ public class ReplicationControlManagerTest {
// Invalid leader
PartitionData invalidLeaderRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
replicationControl, topicIdPartition, asList(0, 1));
ControllerResult<AlterIsrResponseData> invalidLeaderResult = sendAlterIsr(
replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidLeaderRequest);
@ -490,13 +552,13 @@ public class ReplicationControlManagerTest {
// Stale broker epoch
PartitionData invalidBrokerEpochRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
replicationControl, topicIdPartition, asList(0, 1));
assertThrows(StaleBrokerEpochException.class, () -> sendAlterIsr(
replicationControl, 0, brokerEpoch - 1, "foo", invalidBrokerEpochRequest));
// Invalid leader epoch
PartitionData invalidLeaderEpochRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
replicationControl, topicIdPartition, asList(0, 1));
invalidLeaderEpochRequest.setLeaderEpoch(500);
ControllerResult<AlterIsrResponseData> invalidLeaderEpochResult = sendAlterIsr(
replicationControl, 1, ctx.currentBrokerEpoch(1),
@ -505,8 +567,8 @@ public class ReplicationControlManagerTest {
// Invalid ISR (3 is not a valid replica)
PartitionData invalidIsrRequest1 = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidIsrRequest1.setNewIsr(Arrays.asList(0, 1, 3));
replicationControl, topicIdPartition, asList(0, 1));
invalidIsrRequest1.setNewIsr(asList(0, 1, 3));
ControllerResult<AlterIsrResponseData> invalidIsrResult1 = sendAlterIsr(
replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidIsrRequest1);
@ -514,8 +576,8 @@ public class ReplicationControlManagerTest {
// Invalid ISR (does not include leader 0)
PartitionData invalidIsrRequest2 = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidIsrRequest2.setNewIsr(Arrays.asList(1, 2));
replicationControl, topicIdPartition, asList(0, 1));
invalidIsrRequest2.setNewIsr(asList(1, 2));
ControllerResult<AlterIsrResponseData> invalidIsrResult2 = sendAlterIsr(
replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidIsrRequest2);
@ -647,28 +709,28 @@ public class ReplicationControlManagerTest {
assertNull(replicationControl.getPartition(topicId, 3));
assertCreatedTopicConfigs(ctx, "foo", requestConfigs);
assertEquals(Collections.singletonMap(topicId, new ResultOrError<>("foo")),
assertEquals(singletonMap(topicId, new ResultOrError<>("foo")),
replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
assertEquals(Collections.singletonMap("foo", new ResultOrError<>(topicId)),
assertEquals(singletonMap("foo", new ResultOrError<>(topicId)),
replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1,
topicId.getLeastSignificantBits());
assertEquals(Collections.singletonMap(invalidId,
assertEquals(singletonMap(invalidId,
new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID))),
replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(invalidId)));
assertEquals(Collections.singletonMap("bar",
assertEquals(singletonMap("bar",
new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));
ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult = replicationControl.
deleteTopics(Collections.singletonList(invalidId));
assertEquals(0, invalidDeleteResult.records().size());
assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
assertEquals(singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
invalidDeleteResult.response());
ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.
deleteTopics(Collections.singletonList(topicId));
assertTrue(deleteResult.isAtomic());
assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)),
assertEquals(singletonMap(topicId, new ApiError(NONE, null)),
deleteResult.response());
assertEquals(1, deleteResult.records().size());
ctx.replay(deleteResult.records());
@ -676,10 +738,10 @@ public class ReplicationControlManagerTest {
assertNull(replicationControl.getPartition(topicId, 1));
assertNull(replicationControl.getPartition(topicId, 2));
assertNull(replicationControl.getPartition(topicId, 3));
assertEquals(Collections.singletonMap(topicId, new ResultOrError<>(
assertEquals(singletonMap(topicId, new ResultOrError<>(
new ApiError(UNKNOWN_TOPIC_ID))), replicationControl.findTopicNames(
Long.MAX_VALUE, Collections.singleton(topicId)));
assertEquals(Collections.singletonMap("foo", new ResultOrError<>(
assertEquals(singletonMap("foo", new ResultOrError<>(
new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(
Long.MAX_VALUE, Collections.singleton("foo")));
assertEmptyTopicConfigs(ctx, "foo");
@ -715,7 +777,7 @@ public class ReplicationControlManagerTest {
setName("quux").setCount(2).setAssignments(null));
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
replicationControl.createPartitions(topics);
assertEquals(Arrays.asList(new CreatePartitionsTopicResult().
assertEquals(asList(new CreatePartitionsTopicResult().
setName("foo").
setErrorCode(NONE.code()).
setErrorMessage(null),
@ -735,20 +797,20 @@ public class ReplicationControlManagerTest {
ctx.replay(createPartitionsResult.records());
List<CreatePartitionsTopic> topics2 = new ArrayList<>();
topics2.add(new CreatePartitionsTopic().
setName("foo").setCount(6).setAssignments(Arrays.asList(
new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
setName("foo").setCount(6).setAssignments(asList(
new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
topics2.add(new CreatePartitionsTopic().
setName("bar").setCount(5).setAssignments(Arrays.asList(
new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1)))));
setName("bar").setCount(5).setAssignments(asList(
new CreatePartitionsAssignment().setBrokerIds(asList(1)))));
topics2.add(new CreatePartitionsTopic().
setName("quux").setCount(4).setAssignments(Arrays.asList(
new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
setName("quux").setCount(4).setAssignments(asList(
new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
topics2.add(new CreatePartitionsTopic().
setName("foo2").setCount(3).setAssignments(Arrays.asList(
new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(2, 0)))));
setName("foo2").setCount(3).setAssignments(asList(
new CreatePartitionsAssignment().setBrokerIds(asList(2, 0)))));
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult2 =
replicationControl.createPartitions(topics2);
assertEquals(Arrays.asList(new CreatePartitionsTopicResult().
assertEquals(asList(new CreatePartitionsTopicResult().
setName("foo").
setErrorCode(NONE.code()).
setErrorMessage(null),
@ -775,13 +837,13 @@ public class ReplicationControlManagerTest {
public void testValidateGoodManualPartitionAssignments() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ctx.registerBrokers(1, 2, 3);
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
ctx.replicationControl.validateManualPartitionAssignment(asList(1),
OptionalInt.of(1));
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
ctx.replicationControl.validateManualPartitionAssignment(asList(1),
OptionalInt.empty());
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
OptionalInt.of(3));
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
OptionalInt.empty());
}
@ -791,20 +853,20 @@ public class ReplicationControlManagerTest {
ctx.registerBrokers(1, 2);
assertEquals("The manual partition assignment includes an empty replica list.",
assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(),
ctx.replicationControl.validateManualPartitionAssignment(asList(),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes broker 3, but no such " +
"broker is registered.", assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes the broker 2 more than " +
"once.", assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 2),
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 2),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes a partition with 2 " +
"replica(s), but this is not consistent with previous partitions, which have " +
"3 replica(s).", assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2),
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2),
OptionalInt.of(3))).getMessage());
}
@ -824,18 +886,18 @@ public class ReplicationControlManagerTest {
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
new AlterPartitionReassignmentsRequestData().setTopics(asList(
new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(Arrays.asList(3, 2, 1)),
setReplicas(asList(3, 2, 1)),
new ReassignablePartition().setPartitionIndex(1).
setReplicas(Arrays.asList(0, 2, 1)),
setReplicas(asList(0, 2, 1)),
new ReassignablePartition().setPartitionIndex(2).
setReplicas(Arrays.asList(0, 2, 1)))),
setReplicas(asList(0, 2, 1)))),
new ReassignableTopic().setName("bar"))));
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setResponses(Arrays.asList(
new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
setErrorMessage(null).setResponses(asList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(1).
@ -849,41 +911,41 @@ public class ReplicationControlManagerTest {
ctx.replay(alterResult.records());
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
setTopics(Arrays.asList(new OngoingTopicReassignment().
setName("foo").setPartitions(Arrays.asList(
setTopics(asList(new OngoingTopicReassignment().
setName("foo").setPartitions(asList(
new OngoingPartitionReassignment().setPartitionIndex(1).
setRemovingReplicas(Arrays.asList(3)).
setAddingReplicas(Arrays.asList(0)).
setReplicas(Arrays.asList(0, 2, 1, 3))))));
setRemovingReplicas(asList(3)).
setAddingReplicas(asList(0)).
setReplicas(asList(0, 2, 1, 3))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(null));
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("bar").
setPartitionIndexes(Arrays.asList(0, 1, 2)))));
assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
setPartitionIndexes(asList(0, 1, 2)))));
assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("foo").
setPartitionIndexes(Arrays.asList(0, 1, 2)))));
setPartitionIndexes(asList(0, 1, 2)))));
ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
new AlterPartitionReassignmentsRequestData().setTopics(asList(
new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null),
new ReassignablePartition().setPartitionIndex(1).
setReplicas(null),
new ReassignablePartition().setPartitionIndex(2).
setReplicas(null))),
new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
new ReassignableTopic().setName("bar").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null))))));
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(fooId).
setPartitionId(1).
setReplicas(Arrays.asList(2, 1, 3)).
setReplicas(asList(2, 1, 3)).
setLeader(3).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()), (short) 0)),
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(1).
@ -891,7 +953,7 @@ public class ReplicationControlManagerTest {
new ReassignablePartitionResponse().setPartitionIndex(2).
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
setErrorMessage("Unable to find partition foo:2."))),
new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
new ReassignableTopicResponse().setName("bar").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).
setErrorMessage(null)))))),
@ -899,11 +961,11 @@ public class ReplicationControlManagerTest {
log.info("running final alterIsr...");
ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
new AlterIsrRequestData().setBrokerId(3).setBrokerEpoch(103).
setTopics(Arrays.asList(new TopicData().setName("foo").setPartitions(Arrays.asList(
setTopics(asList(new TopicData().setName("foo").setPartitions(asList(
new PartitionData().setPartitionIndex(1).setCurrentIsrVersion(1).
setLeaderEpoch(0).setNewIsr(Arrays.asList(3, 0, 2, 1)))))));
assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
setLeaderEpoch(0).setNewIsr(asList(3, 0, 2, 1)))))));
assertEquals(new AlterIsrResponseData().setTopics(asList(
new AlterIsrResponseData.TopicData().setName("foo").setPartitions(asList(
new AlterIsrResponseData.PartitionData().
setPartitionIndex(1).
setErrorCode(FENCED_LEADER_EPOCH.code()))))),
@ -931,22 +993,22 @@ public class ReplicationControlManagerTest {
new int[] {}, new int[] {}, 1, 1, 1), replication.getPartition(fooId, 0));
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
new AlterPartitionReassignmentsRequestData().setTopics(asList(
new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(Arrays.asList(1, 2, 3)),
setReplicas(asList(1, 2, 3)),
new ReassignablePartition().setPartitionIndex(1).
setReplicas(Arrays.asList(1, 2, 3, 0)),
setReplicas(asList(1, 2, 3, 0)),
new ReassignablePartition().setPartitionIndex(2).
setReplicas(Arrays.asList(5, 6, 7)),
setReplicas(asList(5, 6, 7)),
new ReassignablePartition().setPartitionIndex(3).
setReplicas(Arrays.asList()))),
new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
setReplicas(asList()))),
new ReassignableTopic().setName("bar").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))));
setReplicas(asList(1, 2, 3, 4, 0)))))));
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setResponses(Arrays.asList(
new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
setErrorMessage(null).setResponses(asList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(1).
@ -959,7 +1021,7 @@ public class ReplicationControlManagerTest {
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
setErrorMessage("The manual partition assignment includes an empty " +
"replica list."))),
new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
new ReassignableTopicResponse().setName("bar").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null))))),
alterResult.response());
@ -972,55 +1034,55 @@ public class ReplicationControlManagerTest {
new int[] {}, new int[] {0, 1}, 4, 1, 2), replication.getPartition(barId, 0));
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
setTopics(Arrays.asList(new OngoingTopicReassignment().
setName("bar").setPartitions(Arrays.asList(
setTopics(asList(new OngoingTopicReassignment().
setName("bar").setPartitions(asList(
new OngoingPartitionReassignment().setPartitionIndex(0).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Arrays.asList(0, 1)).
setReplicas(Arrays.asList(1, 2, 3, 4, 0))))));
setAddingReplicas(asList(0, 1)).
setReplicas(asList(1, 2, 3, 4, 0))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(null));
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("foo").
setPartitionIndexes(Arrays.asList(0, 1, 2)))));
assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
setPartitionIndexes(asList(0, 1, 2)))));
assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("bar").
setPartitionIndexes(Arrays.asList(0, 1, 2)))));
setPartitionIndexes(asList(0, 1, 2)))));
ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
new AlterIsrRequestData().setBrokerId(4).setBrokerEpoch(104).
setTopics(Arrays.asList(new TopicData().setName("bar").setPartitions(Arrays.asList(
setTopics(asList(new TopicData().setName("bar").setPartitions(asList(
new PartitionData().setPartitionIndex(0).setCurrentIsrVersion(2).
setLeaderEpoch(1).setNewIsr(Arrays.asList(4, 1, 2, 3, 0)))))));
assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
new AlterIsrResponseData.TopicData().setName("bar").setPartitions(Arrays.asList(
setLeaderEpoch(1).setNewIsr(asList(4, 1, 2, 3, 0)))))));
assertEquals(new AlterIsrResponseData().setTopics(asList(
new AlterIsrResponseData.TopicData().setName("bar").setPartitions(asList(
new AlterIsrResponseData.PartitionData().
setPartitionIndex(0).
setLeaderId(4).
setLeaderEpoch(1).
setIsr(Arrays.asList(4, 1, 2, 3, 0)).
setIsr(asList(4, 1, 2, 3, 0)).
setCurrentIsrVersion(3).
setErrorCode(NONE.code()))))),
alterIsrResult.response());
ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
new AlterPartitionReassignmentsRequestData().setTopics(asList(
new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null))),
new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
new ReassignableTopic().setName("bar").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null))))));
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(barId).
setPartitionId(0).
setLeader(4).
setReplicas(Arrays.asList(2, 3, 4)).
setReplicas(asList(2, 3, 4)).
setRemovingReplicas(null).
setAddingReplicas(Collections.emptyList()), (short) 0)),
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))),
new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
new ReassignableTopicResponse().setName("bar").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null)))))),
cancelResult);
@ -1052,6 +1114,160 @@ public class ReplicationControlManagerTest {
ctx.replicationControl.getPartition(fooId, 1));
}
private void assertLeaderAndIsr(
ReplicationControlManager replication,
TopicIdPartition topicIdPartition,
int leaderId,
int[] isr
) {
PartitionRegistration registration = replication.getPartition(
topicIdPartition.topicId(),
topicIdPartition.partitionId()
);
assertArrayEquals(isr, registration.isr);
assertEquals(leaderId, registration.leader);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
Uuid fooId = ctx.createTestTopic("foo", new int[][]{
new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
TopicIdPartition partition0 = new TopicIdPartition(fooId, 0);
TopicIdPartition partition1 = new TopicIdPartition(fooId, 1);
TopicIdPartition partition2 = new TopicIdPartition(fooId, 2);
ctx.fenceBrokers(Utils.mkSet(2, 3));
ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
assertLeaderAndIsr(replication, partition0, NO_LEADER, new int[]{1});
assertLeaderAndIsr(replication, partition1, 4, new int[]{4});
assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
ElectLeadersRequestData request = buildElectLeadersRequest(
ElectionType.UNCLEAN,
electAllPartitions ? null : singletonMap("foo", asList(0, 1, 2))
);
// No election can be done yet because no replicas are available for partition 0
ControllerResult<ElectLeadersResponseData> result1 = replication.electLeaders(request);
assertEquals(Collections.emptyList(), result1.records());
ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, electAllPartitions, Utils.mkMap(
Utils.mkEntry(
new TopicPartition("foo", 0),
new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE)
),
Utils.mkEntry(
new TopicPartition("foo", 1),
new ApiError(ELECTION_NOT_NEEDED)
),
Utils.mkEntry(
new TopicPartition("foo", 2),
new ApiError(ELECTION_NOT_NEEDED)
)
));
assertElectLeadersResponse(expectedResponse1, result1.response());
// Now we bring 2 back online which should allow the unclean election of partition 0
ctx.unfenceBrokers(Utils.mkSet(2));
// Bring 2 back into the ISR for partition 1. This allows us to verify that
// preferred election does not occur as a result of the unclean election request.
ctx.alterIsr(partition1, 4, asList(2, 4));
ControllerResult<ElectLeadersResponseData> result = replication.electLeaders(request);
assertEquals(1, result.records().size());
ApiMessageAndVersion record = result.records().get(0);
assertTrue(record.message() instanceof PartitionChangeRecord);
PartitionChangeRecord partitionChangeRecord = (PartitionChangeRecord) record.message();
assertEquals(0, partitionChangeRecord.partitionId());
assertEquals(2, partitionChangeRecord.leader());
assertEquals(singletonList(2), partitionChangeRecord.isr());
ctx.replay(result.records());
assertLeaderAndIsr(replication, partition0, 2, new int[]{2});
assertLeaderAndIsr(replication, partition1, 4, new int[]{2, 4});
assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(NONE, electAllPartitions, Utils.mkMap(
Utils.mkEntry(
new TopicPartition("foo", 0),
ApiError.NONE
),
Utils.mkEntry(
new TopicPartition("foo", 1),
new ApiError(ELECTION_NOT_NEEDED)
),
Utils.mkEntry(
new TopicPartition("foo", 2),
new ApiError(ELECTION_NOT_NEEDED)
)
));
assertElectLeadersResponse(expectedResponse, result.response());
}
@Test
public void testPreferredElectionDoesNotTriggerUncleanElection() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(1, 2, 3, 4);
ctx.unfenceBrokers(1, 2, 3, 4);
Uuid fooId = ctx.createTestTopic("foo", new int[][]{new int[]{1, 2, 3}}).topicId();
TopicIdPartition partition = new TopicIdPartition(fooId, 0);
ctx.fenceBrokers(Utils.mkSet(2, 3));
ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
ctx.unfenceBrokers(Utils.mkSet(2));
assertLeaderAndIsr(replication, partition, NO_LEADER, new int[]{1});
ctx.alterTopicConfig("foo", "unclean.leader.election.enable", "true");
ElectLeadersRequestData request = buildElectLeadersRequest(
ElectionType.PREFERRED,
singletonMap("foo", singletonList(0))
);
// No election should be done even though unclean election is available
ControllerResult<ElectLeadersResponseData> result = replication.electLeaders(request);
assertEquals(Collections.emptyList(), result.records());
ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(NONE, false, singletonMap(
new TopicPartition("foo", 0), new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
));
assertEquals(expectedResponse, result.response());
}
private ElectLeadersRequestData buildElectLeadersRequest(
ElectionType electionType,
Map<String, List<Integer>> partitions
) {
ElectLeadersRequestData request = new ElectLeadersRequestData().
setElectionType(electionType.value);
if (partitions == null) {
request.setTopicPartitions(null);
} else {
partitions.forEach((topic, partitionIds) -> {
request.topicPartitions().add(new TopicPartitions()
.setTopic(topic)
.setPartitions(partitionIds)
);
});
}
return request;
}
@Test
public void testFenceMultipleBrokers() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@ -1083,7 +1299,7 @@ public class ReplicationControlManagerTest {
}
@Test
public void testElectLeaders() throws Exception {
public void testElectPreferredLeaders() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
@ -1091,60 +1307,130 @@ public class ReplicationControlManagerTest {
Uuid fooId = ctx.createTestTopic("foo", new int[][]{
new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
ElectLeadersRequestData request1 = new ElectLeadersRequestData().
setElectionType((byte) 0).
setTopicPartitions(new TopicPartitionsCollection(Arrays.asList(
setElectionType(ElectionType.PREFERRED.value).
setTopicPartitions(new TopicPartitionsCollection(asList(
new TopicPartitions().setTopic("foo").
setPartitions(Arrays.asList(0, 1)),
setPartitions(asList(0, 1)),
new TopicPartitions().setTopic("bar").
setPartitions(Arrays.asList(0, 1))).iterator()));
setPartitions(asList(0, 1))).iterator()));
ControllerResult<ElectLeadersResponseData> election1Result =
replication.electLeaders(request1);
ElectLeadersResponseData expectedResponse1 = new ElectLeadersResponseData().
setErrorCode((short) 0).
setReplicaElectionResults(Arrays.asList(
new ReplicaElectionResult().setTopic("foo").
setPartitionResult(Arrays.asList(
new PartitionResult().setPartitionId(0).
setErrorCode(NONE.code()).
setErrorMessage(null),
new PartitionResult().setPartitionId(1).
setErrorCode(NONE.code()).
setErrorMessage(null))),
new ReplicaElectionResult().setTopic("bar").
setPartitionResult(Arrays.asList(
new PartitionResult().setPartitionId(0).
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
setErrorMessage("No such topic as bar"),
new PartitionResult().setPartitionId(1).
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
setErrorMessage("No such topic as bar")))));
assertEquals(expectedResponse1, election1Result.response());
ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, false, Utils.mkMap(
Utils.mkEntry(
new TopicPartition("foo", 0),
new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
),
Utils.mkEntry(
new TopicPartition("foo", 1),
new ApiError(ELECTION_NOT_NEEDED)
),
Utils.mkEntry(
new TopicPartition("bar", 0),
new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
),
Utils.mkEntry(
new TopicPartition("bar", 1),
new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
)
));
assertElectLeadersResponse(expectedResponse1, election1Result.response());
assertEquals(Collections.emptyList(), election1Result.records());
ctx.unfenceBrokers(0, 1);
ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
new AlterIsrRequestData().setBrokerId(2).setBrokerEpoch(102).
setTopics(Arrays.asList(new AlterIsrRequestData.TopicData().setName("foo").
setPartitions(Arrays.asList(new AlterIsrRequestData.PartitionData().
setTopics(asList(new AlterIsrRequestData.TopicData().setName("foo").
setPartitions(asList(new AlterIsrRequestData.PartitionData().
setPartitionIndex(0).setCurrentIsrVersion(0).
setLeaderEpoch(0).setNewIsr(Arrays.asList(1, 2, 3)))))));
assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)))))));
assertEquals(new AlterIsrResponseData().setTopics(asList(
new AlterIsrResponseData.TopicData().setName("foo").setPartitions(asList(
new AlterIsrResponseData.PartitionData().
setPartitionIndex(0).
setLeaderId(2).
setLeaderEpoch(0).
setIsr(Arrays.asList(1, 2, 3)).
setIsr(asList(1, 2, 3)).
setCurrentIsrVersion(1).
setErrorCode(NONE.code()))))),
alterIsrResult.response());
ElectLeadersResponseData expectedResponse2 = buildElectLeadersResponse(NONE, false, Utils.mkMap(
Utils.mkEntry(
new TopicPartition("foo", 0),
ApiError.NONE
),
Utils.mkEntry(
new TopicPartition("foo", 1),
new ApiError(ELECTION_NOT_NEEDED)
),
Utils.mkEntry(
new TopicPartition("bar", 0),
new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
),
Utils.mkEntry(
new TopicPartition("bar", 1),
new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
)
));
ctx.replay(alterIsrResult.records());
ControllerResult<ElectLeadersResponseData> election2Result =
replication.electLeaders(request1);
assertEquals(expectedResponse1, election2Result.response());
assertEquals(Arrays.asList(new ApiMessageAndVersion(new PartitionChangeRecord().
assertElectLeadersResponse(expectedResponse2, election2Result.response());
assertEquals(asList(new ApiMessageAndVersion(new PartitionChangeRecord().
setPartitionId(0).
setTopicId(fooId).
setLeader(1), (short) 0)), election2Result.records());
}
private void assertElectLeadersResponse(
ElectLeadersResponseData expected,
ElectLeadersResponseData actual
) {
assertEquals(Errors.forCode(expected.errorCode()), Errors.forCode(actual.errorCode()));
assertEquals(collectElectLeadersErrors(expected), collectElectLeadersErrors(actual));
}
private Map<TopicPartition, PartitionResult> collectElectLeadersErrors(ElectLeadersResponseData response) {
Map<TopicPartition, PartitionResult> res = new HashMap<>();
response.replicaElectionResults().forEach(topicResult -> {
String topic = topicResult.topic();
topicResult.partitionResult().forEach(partitionResult -> {
TopicPartition topicPartition = new TopicPartition(topic, partitionResult.partitionId());
res.put(topicPartition, partitionResult);
});
});
return res;
}
private ElectLeadersResponseData buildElectLeadersResponse(
Errors topLevelError,
boolean electAllPartitions,
Map<TopicPartition, ApiError> errors
) {
Map<String, List<Map.Entry<TopicPartition, ApiError>>> errorsByTopic = errors.entrySet().stream()
.collect(Collectors.groupingBy(entry -> entry.getKey().topic()));
ElectLeadersResponseData response = new ElectLeadersResponseData()
.setErrorCode(topLevelError.code());
errorsByTopic.forEach((topic, partitionErrors) -> {
ReplicaElectionResult electionResult = new ReplicaElectionResult().setTopic(topic);
electionResult.setPartitionResult(partitionErrors.stream()
.filter(entry -> !electAllPartitions || entry.getValue().error() != ELECTION_NOT_NEEDED)
.map(entry -> {
TopicPartition topicPartition = entry.getKey();
ApiError error = entry.getValue();
return new PartitionResult()
.setPartitionId(topicPartition.partition())
.setErrorCode(error.error().code())
.setErrorMessage(error.message());
})
.collect(Collectors.toList()));
response.replicaElectionResults().add(electionResult);
});
return response;
}
}