KAFKA-16523; kafka-metadata-quorum: support add-controller and remove-controller (#16774)

This PR adds support for add-controller and remove-controller in the kafka-metadata-quorum.sh
command-line tool. It also fixes some minor server-side bugs that blocked the tool from working.

In kafka-metadata-quorum.sh, the implementation of remove-controller is fairly straightforward. It
just takes some command-line flags and uses them to invoke AdminClient. The add-controller
implementation is a bit more complex because we have to look at the new controller's configuration
file. The parsing logic for the advertised.listeners and listeners server configurations that we
need was previously implemented in the :core module. However, the Gradle module where
kafka-metadata-quorum.sh lives, :tools, cannot depend on :core. Therefore, I moved listener parsing
into SocketServerConfigs.listenerListToEndPoints. This will be a small step forward in our efforts
to move Kafka configuration out of :core.
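
For illustration, here is roughly how the relocated helper can be invoked (a hedged sketch
against the new method added in this PR; the listener string is an arbitrary example):

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.network.SocketServerConfigs;

import java.util.List;

public class ListenerParsingExample {
    public static void main(String[] args) {
        // Parse a comma-separated listeners string into Endpoint objects,
        // resolving each listener name through the default name-to-security-protocol
        // map (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL).
        List<Endpoint> endpoints = SocketServerConfigs.listenerListToEndPoints(
            "PLAINTEXT://example.com:9092,SSL://[::1]:9093",
            SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO);
        // An empty host, as in PLAINTEXT://:9092, is stored as null and means
        // "bind to the wildcard address".
        endpoints.forEach(System.out::println);
    }
}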

I also made some minor changes in kafka-metadata-quorum.sh and kafka-storage.sh to handle
--help without displaying a backtrace on the screen, and give slightly better error messages on
stderr. Also, in DynamicVoter.toString, we now enclose the host in brackets if it contains a colon
(as IPv6 addresses do).
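
To illustrate the bracket rule, here is a hedged sketch; it assumes DynamicVoter.parse is the
public parsing counterpart (the storage tool goes through DynamicVoters.parse, which accepts
the same bracketed syntax):

import org.apache.kafka.raft.DynamicVoter;

public class DynamicVoterToStringExample {
    public static void main(String[] args) {
        // An IPv6 host contains colons, so toString() brackets it to keep the
        // id@host:port:directoryId format unambiguous and round-trippable.
        DynamicVoter voter = DynamicVoter.parse("5@[::1]:9093:JEXY6aqzQY-32P5TStzaFg");
        System.out.println(voter); // prints 5@[::1]:9093:JEXY6aqzQY-32P5TStzaFg
    }
}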

This PR fixes our handling of clusterId in addRaftVoter and removeRaftVoter, in two ways. Firstly,
it marks clusterId as nullable in the AddRaftVoterRequest.json and RemoveRaftVoterRequest.json
schemas, as it was always intended to be. Secondly, it allows AdminClient to optionally send
clusterId, by using AddRaftVoterOptions and RemoveRaftVoterOptions. We now also remember to
properly set timeoutMs in AddRaftVoterRequest. This PR adds unit tests for
KafkaAdminClient#addRaftVoter and KafkaAdminClient#removeRaftVoter, to make sure they are sending
the right things.
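
For reference, a minimal sketch of the resulting Admin API usage (the bootstrap address, ids,
and endpoint are illustrative values borrowed from the new unit tests):

import org.apache.kafka.clients.admin.AddRaftVoterOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.common.Uuid;

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

public class AddRaftVoterExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // clusterId is optional; when it is not set, the (now nullable)
            // ClusterId field of AddRaftVoterRequest is sent as null.
            AddRaftVoterOptions options = new AddRaftVoterOptions()
                .setClusterId(Optional.of("_o_GnDGwQaWu4r-NMzmkTw"));
            admin.addRaftVoter(
                1,                                         // voter id
                Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), // voter directory id
                Collections.singleton(
                    new RaftVoterEndpoint("CONTROLLER", "example.com", 9093)),
                options).all().get();
            // removeRaftVoter takes the same id pair:
            // admin.removeRaftVoter(1, Uuid.fromString("YAfa4HClT3SIIW2klIUspg")).all().get();
        }
    }
}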

Finally, I fixed some minor server-side bugs that were blocking the handling of these RPCs.
Firstly, ApiKeys.ADD_RAFT_VOTER and ApiKeys.REMOVE_RAFT_VOTER are now marked as forwardable so that
forwarding from the broker to the active controller works correctly. Secondly,
org.apache.kafka.raft.KafkaNetworkChannel has been updated to support API_VERSIONS requests
and responses.

Co-authored-by: Murali Basani <muralidhar.basani@aiven.io>
Reviewers: José Armando García Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>
Colin Patrick McCabe 2024-08-08 15:54:12 -07:00, committed by GitHub
parent e7317b37bf
commit 6a44fb154d
27 changed files with 873 additions and 121 deletions


@ -2112,7 +2112,9 @@ project(':tools') {
dependencies {
implementation project(':clients')
implementation project(':metadata')
implementation project(':storage')
implementation project(':server')
implementation project(':server-common')
implementation project(':connect:runtime')
implementation project(':tools:tools-api')


@ -282,6 +282,8 @@
<subpackage name="tools">
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.metadata.properties" />
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="kafka.admin" />
<allow pkg="kafka.server" />


@ -18,9 +18,21 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Optional;
/**
* Options for {@link Admin#addRaftVoter}.
*/
@InterfaceStability.Stable
public class AddRaftVoterOptions extends AbstractOptions<AddRaftVoterOptions> {
private Optional<String> clusterId = Optional.empty();
public AddRaftVoterOptions setClusterId(Optional<String> clusterId) {
this.clusterId = clusterId;
return this;
}
public Optional<String> clusterId() {
return clusterId;
}
}


@ -4820,6 +4820,8 @@ public class KafkaAdminClient extends AdminClient {
setPort(endpoint.port())));
return new AddRaftVoterRequest.Builder(
new AddRaftVoterRequestData().
setClusterId(options.clusterId().orElse(null)).
setTimeoutMs(timeoutMs).
setVoterId(voterId).
setVoterDirectoryId(voterDirectoryId).
setListeners(listeners));
@ -4864,6 +4866,7 @@ public class KafkaAdminClient extends AdminClient {
RemoveRaftVoterRequest.Builder createRequest(int timeoutMs) {
return new RemoveRaftVoterRequest.Builder(
new RemoveRaftVoterRequestData().
setClusterId(options.clusterId().orElse(null)).
setVoterId(voterId).
setVoterDirectoryId(voterDirectoryId));
}


@ -18,9 +18,21 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Optional;
/**
* Options for {@link Admin#removeRaftVoter}.
*/
@InterfaceStability.Stable
public class RemoveRaftVoterOptions extends AbstractOptions<RemoveRaftVoterOptions> {
private Optional<String> clusterId = Optional.empty();
public RemoveRaftVoterOptions setClusterId(Optional<String> clusterId) {
this.clusterId = clusterId;
return this;
}
public Optional<String> clusterId() {
return clusterId;
}
}


@ -123,8 +123,8 @@ public enum ApiKeys {
SHARE_GROUP_DESCRIBE(ApiMessageType.SHARE_GROUP_DESCRIBE),
SHARE_FETCH(ApiMessageType.SHARE_FETCH),
SHARE_ACKNOWLEDGE(ApiMessageType.SHARE_ACKNOWLEDGE),
ADD_RAFT_VOTER(ApiMessageType.ADD_RAFT_VOTER),
REMOVE_RAFT_VOTER(ApiMessageType.REMOVE_RAFT_VOTER),
ADD_RAFT_VOTER(ApiMessageType.ADD_RAFT_VOTER, false, RecordBatch.MAGIC_VALUE_V0, true),
REMOVE_RAFT_VOTER(ApiMessageType.REMOVE_RAFT_VOTER, false, RecordBatch.MAGIC_VALUE_V0, true),
UPDATE_RAFT_VOTER(ApiMessageType.UPDATE_RAFT_VOTER),
INITIALIZE_SHARE_GROUP_STATE(ApiMessageType.INITIALIZE_SHARE_GROUP_STATE, true),
READ_SHARE_GROUP_STATE(ApiMessageType.READ_SHARE_GROUP_STATE, true),


@ -46,7 +46,7 @@ public class AddRaftVoterResponse extends AbstractResponse {
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
// not supported
data.setThrottleTimeMs(throttleTimeMs);
}
@Override


@ -32,26 +32,39 @@ public class ApiVersionsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ApiVersionsRequest> {
private static final String DEFAULT_CLIENT_SOFTWARE_NAME = "apache-kafka-java";
private static final ApiVersionsRequestData DATA = new ApiVersionsRequestData()
private static final ApiVersionsRequestData DEFAULT_DATA = new ApiVersionsRequestData()
.setClientSoftwareName(DEFAULT_CLIENT_SOFTWARE_NAME)
.setClientSoftwareVersion(AppInfoParser.getVersion());
private final ApiVersionsRequestData data;
public Builder() {
super(ApiKeys.API_VERSIONS);
this(DEFAULT_DATA,
ApiKeys.API_VERSIONS.oldestVersion(),
ApiKeys.API_VERSIONS.latestVersion());
}
public Builder(short version) {
super(ApiKeys.API_VERSIONS, version);
this(DEFAULT_DATA, version, version);
}
public Builder(
ApiVersionsRequestData data,
short oldestAllowedVersion,
short latestAllowedVersion
) {
super(ApiKeys.API_VERSIONS, oldestAllowedVersion, latestAllowedVersion);
this.data = data.duplicate();
}
@Override
public ApiVersionsRequest build(short version) {
return new ApiVersionsRequest(DATA, version);
return new ApiVersionsRequest(data, version);
}
@Override
public String toString() {
return DATA.toString();
return data.toString();
}
}


@ -46,7 +46,7 @@ public class RemoveRaftVoterResponse extends AbstractResponse {
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
// not supported
data.setThrottleTimeMs(throttleTimeMs);
}
@Override


@ -21,7 +21,7 @@
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "0+" },
{ "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+" },
{ "name": "TimeoutMs", "type": "int32", "versions": "0+" },
{ "name": "VoterId", "type": "int32", "versions": "0+",
"about": "The replica id of the voter getting added to the topic partition" },


@ -21,7 +21,7 @@
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "0+" },
{ "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+" },
{ "name": "VoterId", "type": "int32", "versions": "0+",
"about": "The replica id of the voter getting removed from the topic partition" },
{ "name": "VoterDirectoryId", "type": "uuid", "versions": "0+",


@ -74,6 +74,8 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.AddRaftVoterResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirPartitionResult;
@ -143,6 +145,8 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
@ -153,6 +157,8 @@ import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.requests.AddRaftVoterRequest;
import org.apache.kafka.common.requests.AddRaftVoterResponse;
import org.apache.kafka.common.requests.AlterClientQuotasResponse;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
@ -215,6 +221,8 @@ import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
import org.apache.kafka.common.requests.UnregisterBrokerResponse;
@ -236,6 +244,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
@ -266,6 +275,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -8268,4 +8278,92 @@ public class KafkaAdminClientTest {
}
}
}
@ParameterizedTest
@CsvSource({ "false, false", "false, true", "true, false", "true, true" })
public void testAddRaftVoterRequest(boolean fail, boolean sendClusterId) throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
AddRaftVoterResponseData responseData = new AddRaftVoterResponseData();
if (fail) {
responseData.
setErrorCode(Errors.DUPLICATE_VOTER.code()).
setErrorMessage("duplicate");
}
AtomicReference<AddRaftVoterRequestData> requestData = new AtomicReference<>();
env.kafkaClient().prepareResponse(
request -> {
if (!(request instanceof AddRaftVoterRequest)) return false;
requestData.set((AddRaftVoterRequestData) request.data());
return true;
},
new AddRaftVoterResponse(responseData));
AddRaftVoterOptions options = new AddRaftVoterOptions();
if (sendClusterId) {
options.setClusterId(Optional.of("_o_GnDGwQaWu4r-NMzmkTw"));
}
AddRaftVoterResult result = env.adminClient().addRaftVoter(1,
Uuid.fromString("YAfa4HClT3SIIW2klIUspg"),
Collections.singleton(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
options);
assertNotNull(result.all());
if (fail) {
TestUtils.assertFutureThrows(result.all(), Errors.DUPLICATE_VOTER.exception().getClass());
} else {
result.all().get();
}
if (sendClusterId) {
assertEquals("_o_GnDGwQaWu4r-NMzmkTw", requestData.get().clusterId());
} else {
assertNull(requestData.get().clusterId());
}
assertEquals(1000, requestData.get().timeoutMs());
assertEquals(1, requestData.get().voterId());
assertEquals(Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), requestData.get().voterDirectoryId());
assertEquals(new AddRaftVoterRequestData.Listener().
setName("CONTROLLER").
setHost("example.com").
setPort(8080), requestData.get().listeners().find("CONTROLLER"));
}
}
@ParameterizedTest
@CsvSource({ "false, false", "false, true", "true, false", "true, true" })
public void testRemoveRaftVoterRequest(boolean fail, boolean sendClusterId) throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
RemoveRaftVoterResponseData responseData = new RemoveRaftVoterResponseData();
if (fail) {
responseData.
setErrorCode(Errors.VOTER_NOT_FOUND.code()).
setErrorMessage("not found");
}
AtomicReference<RemoveRaftVoterRequestData> requestData = new AtomicReference<>();
env.kafkaClient().prepareResponse(
request -> {
if (!(request instanceof RemoveRaftVoterRequest)) return false;
requestData.set((RemoveRaftVoterRequestData) request.data());
return true;
},
new RemoveRaftVoterResponse(responseData));
RemoveRaftVoterOptions options = new RemoveRaftVoterOptions();
if (sendClusterId) {
options.setClusterId(Optional.of("_o_GnDGwQaWu4r-NMzmkTw"));
}
RemoveRaftVoterResult result = env.adminClient().removeRaftVoter(1,
Uuid.fromString("YAfa4HClT3SIIW2klIUspg"),
options);
assertNotNull(result.all());
if (fail) {
TestUtils.assertFutureThrows(result.all(), Errors.VOTER_NOT_FOUND.exception().getClass());
} else {
result.all().get();
}
if (sendClusterId) {
assertEquals("_o_GnDGwQaWu4r-NMzmkTw", requestData.get().clusterId());
} else {
assertNull(requestData.get().clusterId());
}
assertEquals(1, requestData.get().voterId());
assertEquals(Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), requestData.get().voterDirectoryId());
}
}
}


@ -23,47 +23,14 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import java.util.Locale
import scala.collection.Map
object EndPoint {
private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%._:]*)\]?:(-?[0-9]+)""".r
private[kafka] val DefaultSecurityProtocolMap: Map[ListenerName, SecurityProtocol] =
SecurityProtocol.values.map(sp => ListenerName.forSecurityProtocol(sp) -> sp).toMap
/**
* Create EndPoint object from `connectionString` and optional `securityProtocolMap`. If the latter is not provided,
* we fallback to the default behaviour where listener names are the same as security protocols.
*
* @param connectionString the format is listener_name://host:port or listener_name://[ipv6 host]:port
* for example: PLAINTEXT://myhost:9092, CLIENT://myhost:9092 or REPLICATION://[::1]:9092
* Host can be empty (PLAINTEXT://:9092) in which case we'll bind to default interface
* Negative ports are also accepted, since they are used in some unit tests
*/
def createEndPoint(connectionString: String, securityProtocolMap: Option[Map[ListenerName, SecurityProtocol]]): EndPoint = {
val protocolMap = securityProtocolMap.getOrElse(DefaultSecurityProtocolMap)
def securityProtocol(listenerName: ListenerName): SecurityProtocol =
protocolMap.getOrElse(listenerName,
throw new IllegalArgumentException(s"No security protocol defined for listener ${listenerName.value}"))
connectionString match {
case uriParseExp(listenerNameString, "", port) =>
val listenerName = ListenerName.normalised(listenerNameString)
new EndPoint(null, port.toInt, listenerName, securityProtocol(listenerName))
case uriParseExp(listenerNameString, host, port) =>
val listenerName = ListenerName.normalised(listenerNameString)
new EndPoint(host, port.toInt, listenerName, securityProtocol(listenerName))
case _ => throw new KafkaException(s"Unable to parse $connectionString to a broker endpoint")
}
}
def parseListenerName(connectionString: String): String = {
connectionString match {
case uriParseExp(listenerNameString, _, _) => listenerNameString.toUpperCase(Locale.ROOT)
case _ => throw new KafkaException(s"Unable to parse a listener name from $connectionString")
val firstColon = connectionString.indexOf(':')
if (firstColon < 0) {
throw new KafkaException(s"Unable to parse a listener name from $connectionString")
}
connectionString.substring(0, firstColon).toUpperCase(Locale.ROOT)
}
def fromJava(endpoint: JEndpoint): EndPoint =


@ -25,6 +25,7 @@ import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace}
import net.sourceforge.argparse4j.internal.HelpScreenException
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
@ -68,8 +69,10 @@ object StorageTool extends Logging {
val namespace = try {
parseArguments(args)
} catch {
case _: HelpScreenException =>
return 0
case e: ArgumentParserException =>
e.printStackTrace(printStream)
e.getParser.handleError(e)
return 1
}
val command = namespace.getString("command")
@ -137,16 +140,16 @@ object StorageTool extends Logging {
if (!config.processRoles.contains(ProcessRole.ControllerRole)) {
throw new TerseFailure("You cannot use --standalone on a broker node.")
}
if (config.controllerListeners.isEmpty) {
if (config.effectiveAdvertisedControllerListeners.isEmpty) {
throw new RuntimeException("No controller listeners found.")
}
val host = if (config.controllerListeners.head.host == null) {
val listener = config.effectiveAdvertisedControllerListeners.head
val host = if (listener.host == null) {
"localhost"
} else {
config.controllerListeners.head.host
listener.host
}
val port = config.controllerListeners.head.port
DynamicVoters.parse(s"${config.nodeId}@${host}:${port}:${Uuid.randomUuid()}")
DynamicVoters.parse(s"${config.nodeId}@${host}:${listener.port}:${Uuid.randomUuid()}")
}
def parseArguments(args: Array[String]): Namespace = {
@ -190,8 +193,8 @@ object StorageTool extends Logging {
help("Used to initialize a single-node quorum controller quorum.").
action(storeTrue())
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I").
help("A list of controller quorum voter ids, directories, and hostname:port pairs. The same values must be used to format all nodes. For example:\n" +
"0@localhost:8082:JEXY6aqzQY-32P5TStzaFg@,1@localhost:8083:MvDxzVmcRsaTz33bUuRU6A,2@localhost:8084:07R5amHmR32VDA6jHkGbTA\n").
help("The initial controllers, as a comma-separated list of id@hostname:port:directory. The same values must be used to format all nodes. For example:\n" +
"0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n").
action(store())
parser.parseArgs(args)
}


@ -32,7 +32,7 @@ import org.apache.commons.validator.routines.InetAddressValidator
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.Csv
import org.apache.kafka.network.SocketServerConfigs
import org.slf4j.event.Level
import java.util
@ -209,8 +209,8 @@ object CoreUtils {
}
val endPoints = try {
val listenerList = Csv.parseCsvList(listeners)
listenerList.asScala.map(EndPoint.createEndPoint(_, Some(securityProtocolMap)))
SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap.asJava).
asScala.map(EndPoint.fromJava(_))
} catch {
case e: Exception =>
throw new IllegalArgumentException(s"Error creating broker listeners from '$listeners': ${e.getMessage}", e)


@ -40,6 +40,7 @@ import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.{MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_0_IV1, IBP_2_7_IV0}
@ -275,12 +276,20 @@ object BrokerIdZNode {
Seq(endPoint)
}
else {
val securityProtocolMap = brokerInfo.get(ListenerSecurityProtocolMapKey).map(
_.to[Map[String, String]].map { case (listenerName, securityProtocol) =>
new ListenerName(listenerName) -> SecurityProtocol.forName(securityProtocol)
})
val listeners = brokerInfo(EndpointsKey).to[Seq[String]]
listeners.map(EndPoint.createEndPoint(_, securityProtocolMap))
val securityProtocolMap = brokerInfo.get(ListenerSecurityProtocolMapKey) match {
case None => SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO
case Some(m) => {
val result = new java.util.HashMap[ListenerName, SecurityProtocol]()
m.to[Map[String, String]].foreach {
case (k, v) => result.put(
new ListenerName(k), SecurityProtocol.forName(v))
}
result
}
}
val listenersString = brokerInfo(EndpointsKey).to[Seq[String]].mkString(",")
SocketServerConfigs.listenerListToEndPoints(listenersString, securityProtocolMap).
asScala.map(EndPoint.fromJava(_))
}
val rack = brokerInfo.get(RackKey).flatMap(_.to[Option[String]])


@ -24,7 +24,7 @@ import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.feature.Features._
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNull}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals}
import org.junit.jupiter.api.Test
import scala.jdk.CollectionConverters._
@ -199,45 +199,6 @@ class BrokerEndPointTest {
broker.features)
}
@Test
def testEndpointFromUri(): Unit = {
var connectionString = "PLAINTEXT://localhost:9092"
var endpoint = EndPoint.createEndPoint(connectionString, None)
assertEquals("localhost", endpoint.host)
assertEquals(9092, endpoint.port)
assertEquals("PLAINTEXT://localhost:9092", endpoint.connectionString)
// KAFKA-3719
connectionString = "PLAINTEXT://local_host:9092"
endpoint = EndPoint.createEndPoint(connectionString, None)
assertEquals("local_host", endpoint.host)
assertEquals(9092, endpoint.port)
assertEquals("PLAINTEXT://local_host:9092", endpoint.connectionString)
// also test for default bind
connectionString = "PLAINTEXT://:9092"
endpoint = EndPoint.createEndPoint(connectionString, None)
assertNull(endpoint.host)
assertEquals(9092, endpoint.port)
assertEquals( "PLAINTEXT://:9092", endpoint.connectionString)
// also test for ipv6
connectionString = "PLAINTEXT://[::1]:9092"
endpoint = EndPoint.createEndPoint(connectionString, None)
assertEquals("::1", endpoint.host)
assertEquals(9092, endpoint.port)
assertEquals("PLAINTEXT://[::1]:9092", endpoint.connectionString)
// test for ipv6 with % character
connectionString = "PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:9092"
endpoint = EndPoint.createEndPoint(connectionString, None)
assertEquals("fe80::b1da:69ca:57f7:63d8%3", endpoint.host)
assertEquals(9092, endpoint.port)
assertEquals("PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:9092", endpoint.connectionString)
// test hostname
connectionString = "PLAINTEXT://MyHostname:9092"
endpoint = EndPoint.createEndPoint(connectionString, None)
assertEquals("MyHostname", endpoint.host)
assertEquals(9092, endpoint.port)
assertEquals("PLAINTEXT://MyHostname:9092", endpoint.connectionString)
}
private def parseBrokerJson(id: Int, jsonString: String): Broker =
BrokerIdZNode.decode(id, jsonString.getBytes(StandardCharsets.UTF_8)).broker
}


@ -630,7 +630,7 @@ class KafkaConfigTest {
}
private def listenerListToEndPoints(listenerList: String,
securityProtocolMap: collection.Map[ListenerName, SecurityProtocol] = EndPoint.DefaultSecurityProtocolMap) =
securityProtocolMap: collection.Map[ListenerName, SecurityProtocol] = SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO.asScala) =
CoreUtils.listenerListToEndPoints(listenerList, securityProtocolMap)
@Test


@ -24,6 +24,7 @@ import java.util
import java.util.Properties
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import net.sourceforge.argparse4j.inf.ArgumentParserException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.Features
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, PropertiesUtils}
@ -400,21 +401,13 @@ Found problem:
@Test
def testFormatWithStandaloneFlagAndInitialControllersFlagFails(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String](
"--release-version", "3.9-IV0",
"--standalone", "--initial-controllers",
"0@localhost:8020:K90IZ-0DRNazJ49kCZ1EMQ," +
"1@localhost:8030:aUARLskQTCW4qCZDtS_cwA," +
"2@localhost:8040:2ggvsS4kQb-fSJ_-zC_Ang")
assertEquals(1, runFormatCommand(stream, properties, arguments.toSeq))
assertTrue(stream.toString().contains("net.sourceforge.argparse4j.inf.ArgumentParserException: " +
"argument --initial-controllers/-I: not allowed with argument --standalone/-s"),
"Failed to find content in output: " + stream.toString())
assertThrows(classOf[ArgumentParserException], () => StorageTool.parseArguments(arguments.toArray))
}
@ParameterizedTest


@ -183,6 +183,10 @@ public final class DynamicVoter {
@Override
public String toString() {
return nodeId + "@" + host + ":" + port + ":" + directoryId;
if (host.contains(":")) {
return nodeId + "@[" + host + "]:" + port + ":" + directoryId;
} else {
return nodeId + "@" + host + ":" + port + ":" + directoryId;
}
}
}


@ -19,6 +19,7 @@ package org.apache.kafka.raft;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.FetchRequestData;
@ -29,6 +30,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.FetchRequest;
@ -185,6 +187,10 @@ public class KafkaNetworkChannel implements NetworkChannel {
return new FetchRequest.SimpleBuilder((FetchRequestData) requestData);
if (requestData instanceof FetchSnapshotRequestData)
return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) requestData);
if (requestData instanceof ApiVersionsRequestData)
return new ApiVersionsRequest.Builder((ApiVersionsRequestData) requestData,
ApiKeys.API_VERSIONS.oldestVersion(),
ApiKeys.API_VERSIONS.latestVersion());
throw new IllegalArgumentException("Unexpected type for requestData: " + requestData);
}
}


@ -21,6 +21,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.AddRaftVoterResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumRequestData;
@ -67,6 +68,8 @@ public class RaftUtil {
return new FetchResponseData().setErrorCode(error.code());
case FETCH_SNAPSHOT:
return new FetchSnapshotResponseData().setErrorCode(error.code());
case API_VERSIONS:
return new ApiVersionsResponseData().setErrorCode(error.code());
default:
throw new IllegalArgumentException("Received response for unexpected request type: " + apiKey);
}


@ -16,12 +16,23 @@
*/
package org.apache.kafka.network;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.util.Csv;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
@ -184,4 +195,58 @@ public class SocketServerConfigs {
.define(QUEUED_MAX_REQUESTS_CONFIG, INT, QUEUED_MAX_REQUESTS_DEFAULT, atLeast(1), HIGH, QUEUED_MAX_REQUESTS_DOC)
.define(QUEUED_MAX_BYTES_CONFIG, LONG, QUEUED_MAX_REQUEST_BYTES_DEFAULT, MEDIUM, QUEUED_MAX_REQUEST_BYTES_DOC)
.define(NUM_NETWORK_THREADS_CONFIG, INT, NUM_NETWORK_THREADS_DEFAULT, atLeast(1), HIGH, NUM_NETWORK_THREADS_DOC);
private static final Pattern URI_PARSE_REGEXP = Pattern.compile(
"^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");
public static final Map<ListenerName, SecurityProtocol> DEFAULT_NAME_TO_SECURITY_PROTO;
static {
HashMap<ListenerName, SecurityProtocol> nameToSecurityProtocol = new HashMap<>();
for (SecurityProtocol securityProtocol : SecurityProtocol.values()) {
nameToSecurityProtocol.put(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
}
DEFAULT_NAME_TO_SECURITY_PROTO = Collections.unmodifiableMap(nameToSecurityProtocol);
}
public static List<Endpoint> listenerListToEndPoints(
String input,
Map<ListenerName, SecurityProtocol> nameToSecurityProto
) {
return listenerListToEndPoints(input, n -> {
SecurityProtocol result = nameToSecurityProto.get(n);
if (result == null) {
throw new IllegalArgumentException("No security protocol defined for listener " + n.value());
}
return result;
});
}
public static List<Endpoint> listenerListToEndPoints(
String input,
Function<ListenerName, SecurityProtocol> nameToSecurityProto
) {
List<Endpoint> results = new ArrayList<>();
for (String entry : Csv.parseCsvList(input.trim())) {
Matcher matcher = URI_PARSE_REGEXP.matcher(entry);
if (!matcher.matches()) {
throw new KafkaException("Unable to parse " + entry + " to a broker endpoint");
}
ListenerName listenerName = ListenerName.normalised(matcher.group(1));
String host = matcher.group(2);
if (host.isEmpty()) {
// By Kafka convention, an empty host string indicates binding to the wildcard
// address, and is stored as null.
host = null;
}
String portString = matcher.group(3);
int port = Integer.parseInt(portString);
SecurityProtocol securityProtocol = nameToSecurityProto.apply(listenerName);
results.add(new Endpoint(listenerName.value(),
securityProtocol,
host,
port));
}
return results;
}
}


@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class SocketServerConfigsTest {
@Test
public void testDefaultNameToSecurityProto() {
Map<ListenerName, SecurityProtocol> expected = new HashMap<>();
expected.put(new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT);
expected.put(new ListenerName("SSL"), SecurityProtocol.SSL);
expected.put(new ListenerName("SASL_PLAINTEXT"), SecurityProtocol.SASL_PLAINTEXT);
expected.put(new ListenerName("SASL_SSL"), SecurityProtocol.SASL_SSL);
assertEquals(expected, SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO);
}
@Test
public void testListenerListToEndPointsWithEmptyString() {
assertEquals(Arrays.asList(),
SocketServerConfigs.listenerListToEndPoints("",
SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO));
}
@Test
public void testListenerListToEndPointsWithBlankString() {
assertEquals(Arrays.asList(),
SocketServerConfigs.listenerListToEndPoints(" ",
SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO));
}
@Test
public void testListenerListToEndPointsWithOneEndpoint() {
assertEquals(Arrays.asList(
new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 8080)),
SocketServerConfigs.listenerListToEndPoints("PLAINTEXT://example.com:8080",
SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO));
}
// Regression test for KAFKA-3719
@Test
public void testListenerListToEndPointsWithUnderscores() {
assertEquals(Arrays.asList(
new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 8080),
new Endpoint("SSL", SecurityProtocol.SSL, "local_host", 8081)),
SocketServerConfigs.listenerListToEndPoints("PLAINTEXT://example.com:8080,SSL://local_host:8081",
SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO));
}
@Test
public void testListenerListToEndPointsWithWildcard() {
assertEquals(Arrays.asList(
new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, null, 8080)),
SocketServerConfigs.listenerListToEndPoints("PLAINTEXT://:8080",
SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO));
}
@Test
public void testListenerListToEndPointsWithIpV6() {
assertEquals(Arrays.asList(
new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "::1", 9092)),
SocketServerConfigs.listenerListToEndPoints("PLAINTEXT://[::1]:9092",
SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO));
}
@Test
public void testAnotherListenerListToEndPointsWithIpV6() {
assertEquals(Arrays.asList(
new Endpoint("SASL_SSL", SecurityProtocol.SASL_SSL, "fe80::b1da:69ca:57f7:63d8%3", 9092)),
SocketServerConfigs.listenerListToEndPoints("SASL_SSL://[fe80::b1da:69ca:57f7:63d8%3]:9092",
SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO));
}
@Test
public void testAnotherListenerListToEndPointsWithNonDefaultProtoMap() {
Map<ListenerName, SecurityProtocol> map = new HashMap<>();
map.put(new ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT);
assertEquals(Arrays.asList(
new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "example.com", 9093)),
SocketServerConfigs.listenerListToEndPoints("CONTROLLER://example.com:9093",
map));
}
}


@ -19,31 +19,46 @@ package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.CommandLineUtils;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentGroup;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import net.sourceforge.argparse4j.internal.HelpScreenException;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -65,6 +80,11 @@ public class MetadataQuorumCommand {
try {
execute(args);
return 0;
} catch (HelpScreenException e) {
return 0;
} catch (ArgumentParserException e) {
e.getParser().handleError(e);
return 1;
} catch (TerseException e) {
System.err.println(e.getMessage());
return 1;
@ -88,11 +108,14 @@ public class MetadataQuorumCommand {
parser.addArgument("--command-config")
.type(Arguments.fileType())
.help("Property file containing configs to be passed to Admin Client.");
addDescribeSubParser(parser);
Subparsers subparsers = parser.addSubparsers().dest("command");
addDescribeSubParser(subparsers);
addAddControllerSubParser(subparsers);
addRemoveControllerSubParser(subparsers);
Admin admin = null;
try {
Namespace namespace = parser.parseArgsOrFail(args);
Namespace namespace = parser.parseArgs(args);
String command = namespace.getString("command");
File optionalCommandConfig = namespace.get("command_config");
@ -116,8 +139,21 @@ public class MetadataQuorumCommand {
} else {
throw new TerseException("One of --status or --replication must be specified with describe sub-command");
}
} else if (command.equals("add-controller")) {
if (optionalCommandConfig == null) {
throw new TerseException("You must supply the configuration file of the controller you are " +
"adding when using add-controller.");
}
handleAddController(admin,
namespace.getBoolean("dry_run"),
props);
} else if (command.equals("remove-controller")) {
handleRemoveController(admin,
namespace.getInt("controller_id"),
namespace.getString("controller_directory_id"),
namespace.getBoolean("dry_run"));
} else {
throw new IllegalStateException(format("Unknown command: %s, only 'describe' is supported", command));
throw new IllegalStateException(format("Unknown command: %s", command));
}
} finally {
if (admin != null)
@ -135,8 +171,7 @@ public class MetadataQuorumCommand {
}
}
private static void addDescribeSubParser(ArgumentParser parser) {
Subparsers subparsers = parser.addSubparsers().dest("command");
private static void addDescribeSubParser(Subparsers subparsers) {
Subparser describeParser = subparsers
.addParser("describe")
.help("Describe the metadata quorum info");
@ -293,4 +328,179 @@ public class MetadataQuorumCommand {
return sb.toString();
}
}
private static void addAddControllerSubParser(Subparsers subparsers) {
Subparser addControllerParser = subparsers
.addParser("add-controller")
.help("Add a controller to the KRaft controller cluster");
addControllerParser
.addArgument("--dry-run")
.help("True if we should print what would be done, but not do it.")
.action(Arguments.storeTrue());
}
static int getControllerId(Properties props) throws TerseException {
if (!props.containsKey(KRaftConfigs.NODE_ID_CONFIG)) {
throw new TerseException(KRaftConfigs.NODE_ID_CONFIG + " not found in configuration " +
"file. Is this a valid controller configuration file?");
}
int nodeId = Integer.parseInt(props.getProperty(KRaftConfigs.NODE_ID_CONFIG));
if (nodeId < 0) {
throw new TerseException(KRaftConfigs.NODE_ID_CONFIG + " was negative in configuration " +
"file. Is this a valid controller configuration file?");
}
if (!props.getOrDefault(KRaftConfigs.PROCESS_ROLES_CONFIG, "").toString().contains("controller")) {
throw new TerseException(KRaftConfigs.PROCESS_ROLES_CONFIG + " did not contain 'controller' in " +
"configuration file. Is this a valid controller configuration file?");
}
return nodeId;
}
static String getMetadataDirectory(Properties props) throws TerseException {
if (props.containsKey(KRaftConfigs.METADATA_LOG_DIR_CONFIG)) {
return props.getProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG);
}
if (props.containsKey(ServerLogConfigs.LOG_DIRS_CONFIG)) {
String[] logDirs = props.getProperty(ServerLogConfigs.LOG_DIRS_CONFIG).trim().split(",");
if (logDirs.length > 0) {
return logDirs[0];
}
}
throw new TerseException("Neither " + KRaftConfigs.METADATA_LOG_DIR_CONFIG + " nor " +
ServerLogConfigs.LOG_DIRS_CONFIG + " were found. Is this a valid controller " +
"configuration file?");
}
static Uuid getMetadataDirectoryId(String metadataDirectory) throws Exception {
MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
addLogDirs(Collections.singletonList(metadataDirectory)).
addMetadataLogDir(metadataDirectory).
load();
MetaProperties metaProperties = ensemble.logDirProps().get(metadataDirectory);
if (metaProperties == null) {
throw new TerseException("Unable to read meta.properties from " + metadataDirectory);
}
if (!metaProperties.directoryId().isPresent()) {
throw new TerseException("No directory id found in " + metadataDirectory);
}
return metaProperties.directoryId().get();
}
static Set<RaftVoterEndpoint> getControllerAdvertisedListeners(
Properties props
) throws Exception {
Map<String, Endpoint> listeners = new HashMap<>();
SocketServerConfigs.listenerListToEndPoints(
props.getOrDefault(SocketServerConfigs.LISTENERS_CONFIG, "").toString(),
__ -> SecurityProtocol.PLAINTEXT).forEach(e -> listeners.put(e.listenerName().get(), e));
SocketServerConfigs.listenerListToEndPoints(
props.getOrDefault(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "").toString(),
__ -> SecurityProtocol.PLAINTEXT).forEach(e -> listeners.put(e.listenerName().get(), e));
if (!props.containsKey(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG)) {
throw new TerseException(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG +
" was not found. Is this a valid controller configuration file?");
}
LinkedHashSet<RaftVoterEndpoint> results = new LinkedHashSet<>();
for (String listenerName : props.getProperty(
KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG).split(",")) {
listenerName = ListenerName.normalised(listenerName).value();
Endpoint endpoint = listeners.get(listenerName);
if (endpoint == null) {
throw new TerseException("Cannot find information about controller listener name: " +
listenerName);
}
results.add(new RaftVoterEndpoint(endpoint.listenerName().get(),
endpoint.host() == null ? "localhost" : endpoint.host(),
endpoint.port()));
}
return results;
}
static void handleAddController(
Admin admin,
boolean dryRun,
Properties props
) throws Exception {
int controllerId = getControllerId(props);
String metadataDirectory = getMetadataDirectory(props);
Uuid directoryId = getMetadataDirectoryId(metadataDirectory);
Set<RaftVoterEndpoint> endpoints = getControllerAdvertisedListeners(props);
if (!dryRun) {
admin.addRaftVoter(controllerId, directoryId, endpoints).
all().get();
}
StringBuilder output = new StringBuilder();
if (dryRun) {
output.append("DRY RUN of adding");
} else {
output.append("Added");
}
output.append(" controller ").append(controllerId);
output.append(" with directory id ").append(directoryId);
output.append(" and endpoints: ");
String prefix = "";
for (RaftVoterEndpoint endpoint : endpoints) {
output.append(prefix).append(endpoint.name()).append("://");
if (endpoint.host().contains(":")) {
output.append("[");
}
output.append(endpoint.host());
if (endpoint.host().contains(":")) {
output.append("]");
}
output.append(":").append(endpoint.port());
prefix = ", ";
}
System.out.println(output);
}
private static void addRemoveControllerSubParser(Subparsers subparsers) {
Subparser removeControllerParser = subparsers
.addParser("remove-controller")
.help("Remove a controller from the KRaft controller cluster");
removeControllerParser
.addArgument("--controller-id", "-i")
.help("The id of the controller to remove.")
.type(Integer.class)
.required(true)
.action(Arguments.store());
removeControllerParser
.addArgument("--controller-directory-id", "-d")
.help("The directory ID of the controller to remove.")
.required(true)
.action(Arguments.store());
removeControllerParser
.addArgument("--dry-run")
.help("True if we should print what would be done, but not do it.")
.action(Arguments.storeTrue());
}
static void handleRemoveController(
Admin admin,
int controllerId,
String controllerDirectoryIdString,
boolean dryRun
) throws TerseException, ExecutionException, InterruptedException {
if (controllerId < 0) {
throw new TerseException("Invalid negative --controller-id: " + controllerId);
}
Uuid directoryId;
try {
directoryId = Uuid.fromString(controllerDirectoryIdString);
} catch (IllegalArgumentException e) {
throw new TerseException("Failed to parse --controller-directory-id: " + e.getMessage());
}
if (!dryRun) {
admin.removeRaftVoter(controllerId, directoryId).
all().get();
}
System.out.printf("%s KRaft controller %d with directory id %s%n",
dryRun ? "DRY RUN of removing " : "Removed ",
controllerId,
directoryId);
}
}


@ -18,6 +18,8 @@ package org.apache.kafka.tools;
import org.apache.kafka.common.KafkaException;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import org.junit.jupiter.api.Test;
import java.time.Instant;
@ -64,4 +66,20 @@ public class MetadataQuorumCommandErrorTest {
assertThrows(KafkaException.class, () -> MetadataQuorumCommand.relativeTimeMs(futureEpochMs, "test"));
}
@Test
public void testRemoveControllerRequiresControllerId() {
assertThrows(ArgumentParserException.class, () ->
MetadataQuorumCommand.execute("--bootstrap-server", "localhost:9092",
"remove-controller",
"--controller-directory-id", "_KWDkTahTVaiVVVTaugNew",
"--dry-run"));
}
@Test
public void testRemoveControllerRequiresControllerDirectoryId() {
assertThrows(ArgumentParserException.class, () ->
MetadataQuorumCommand.execute("--bootstrap-server", "localhost:9092",
"remove-controller",
"--controller-id", "1"));
}
}


@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.properties.PropertiesUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MetadataQuorumCommandUnitTest {
@Test
public void testRemoveControllerDryRun() {
List<String> outputs = Arrays.asList(
ToolsTestUtils.captureStandardOut(() ->
assertEquals(0, MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092",
"remove-controller",
"--controller-id", "2",
"--controller-directory-id", "_KWDkTahTVaiVVVTaugNew",
"--dry-run"))).split("\n"));
assertTrue(outputs.contains("DRY RUN of removing KRaft controller 2 with directory id _KWDkTahTVaiVVVTaugNew"),
"Failed to find expected output in stdout: " + outputs);
}
@Test
public void testGetControllerIdWithoutId() {
Properties props = new Properties();
props.setProperty("process.roles", "controller");
assertEquals("node.id not found in configuration file. Is this a valid controller configuration file?",
assertThrows(TerseException.class,
() -> assertEquals(4, MetadataQuorumCommand.getControllerId(props))).
getMessage());
}
@Test
public void testGetControllerId() throws Exception {
Properties props = new Properties();
props.setProperty("node.id", "4");
props.setProperty("process.roles", "controller");
assertEquals(4, MetadataQuorumCommand.getControllerId(props));
}
@Test
public void testGetNegativeControllerId() {
Properties props = new Properties();
props.setProperty("node.id", "-4");
props.setProperty("process.roles", "controller");
assertEquals("node.id was negative in configuration file. Is this a valid controller configuration file?",
assertThrows(TerseException.class,
() -> assertEquals(4, MetadataQuorumCommand.getControllerId(props))).
getMessage());
}
@Test
public void testGetControllerIdWithoutProcessRoles() {
Properties props = new Properties();
props.setProperty("node.id", "4");
assertEquals("process.roles did not contain 'controller' in configuration file. Is this a valid controller configuration file?",
assertThrows(TerseException.class,
() -> assertEquals(4, MetadataQuorumCommand.getControllerId(props))).
getMessage());
}
@Test
public void testGetControllerIdWithBrokerProcessRoles() {
Properties props = new Properties();
props.setProperty("node.id", "4");
props.setProperty("process.roles", "broker");
assertEquals("process.roles did not contain 'controller' in configuration file. Is this a valid controller configuration file?",
assertThrows(TerseException.class,
() -> assertEquals(4, MetadataQuorumCommand.getControllerId(props))).
getMessage());
}
@Test
public void testGetMetadataDirectory() throws Exception {
Properties props = new Properties();
props.setProperty("metadata.log.dir", "/tmp/meta");
props.setProperty("log.dirs", "/tmp/log,/tmp/log2");
assertEquals("/tmp/meta", MetadataQuorumCommand.getMetadataDirectory(props));
}
@Test
public void testGetMetadataDirectoryFromLogDirs() throws Exception {
Properties props = new Properties();
props.setProperty("log.dirs", "/tmp/log");
assertEquals("/tmp/log", MetadataQuorumCommand.getMetadataDirectory(props));
}
@Test
public void testGetMetadataDirectoryFromMultipleLogDirs() throws Exception {
Properties props = new Properties();
props.setProperty("log.dirs", "/tmp/log,/tmp/log2");
assertEquals("/tmp/log", MetadataQuorumCommand.getMetadataDirectory(props));
}
@Test
public void testGetMetadataDirectoryFailure() {
Properties props = new Properties();
assertEquals("Neither metadata.log.dir nor log.dirs were found. Is this a valid controller configuration file?",
assertThrows(TerseException.class,
() -> MetadataQuorumCommand.getMetadataDirectory(props)).
getMessage());
}
static class MetadataQuorumCommandUnitTestEnv implements AutoCloseable {
File metadataDir;
MetadataQuorumCommandUnitTestEnv(Optional<Uuid> directoryId) throws Exception {
this.metadataDir = TestUtils.tempDirectory();
new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY).
setMetaLogDir(Optional.of(metadataDir.getAbsolutePath())).
setLogDirProps(metadataDir.getAbsolutePath(),
new MetaProperties.Builder().
setClusterId("Ig-WB32JRqqzct3VafTr0w").
setNodeId(2).
setDirectoryId(directoryId).
build()).
writeLogDirChanges();
}
File writePropertiesFile() throws IOException {
Properties props = new Properties();
props.setProperty("metadata.log.dir", metadataDir.getAbsolutePath());
props.setProperty("log.dirs", metadataDir.getAbsolutePath());
props.setProperty("process.roles", "controller,broker");
props.setProperty("node.id", "5");
props.setProperty("controller.listener.names", "CONTROLLER,CONTROLLER_SSL");
props.setProperty("listeners", "CONTROLLER://:9093,CONTROLLER_SSL://:9094");
props.setProperty("advertised.listeners", "CONTROLLER://example.com:9093,CONTROLLER_SSL://example.com:9094");
File file = new File(metadataDir, "controller.properties");
PropertiesUtils.writePropertiesFile(props, file.getAbsolutePath(), false);
return file;
}
@Override
public void close() throws Exception {
Utils.delete(metadataDir);
}
}
@Test
public void testGetMetadataDirectoryId() throws Exception {
try (MetadataQuorumCommandUnitTestEnv testEnv =
new MetadataQuorumCommandUnitTestEnv(Optional.
of(Uuid.fromString("wZoXPqWoSu6F6c8MkmdyAg")))) {
assertEquals(Uuid.fromString("wZoXPqWoSu6F6c8MkmdyAg"),
MetadataQuorumCommand.getMetadataDirectoryId(testEnv.metadataDir.getAbsolutePath()));
}
}
@Test
public void testGetMetadataDirectoryIdWhenThereIsNoId() throws Exception {
try (MetadataQuorumCommandUnitTestEnv testEnv =
new MetadataQuorumCommandUnitTestEnv(Optional.empty())) {
assertEquals("No directory id found in " + testEnv.metadataDir.getAbsolutePath(),
assertThrows(TerseException.class,
() -> MetadataQuorumCommand.getMetadataDirectoryId(testEnv.metadataDir.getAbsolutePath())).
getMessage());
}
}
@Test
public void testGetMetadataDirectoryIdWhenThereIsNoDirectory() throws Exception {
try (MetadataQuorumCommandUnitTestEnv testEnv =
new MetadataQuorumCommandUnitTestEnv(Optional.empty())) {
testEnv.close();
assertEquals("Unable to read meta.properties from " + testEnv.metadataDir.getAbsolutePath(),
assertThrows(TerseException.class,
() -> MetadataQuorumCommand.getMetadataDirectoryId(testEnv.metadataDir.getAbsolutePath())).
getMessage());
}
}
@Test
public void testGetControllerAdvertisedListenersWithNoControllerListenerNames() {
Properties props = new Properties();
assertEquals("controller.listener.names was not found. Is this a valid controller configuration file?",
assertThrows(TerseException.class,
() -> MetadataQuorumCommand.getControllerAdvertisedListeners(props)).
getMessage());
}
@Test
public void testGetControllerAdvertisedListenersWithNoControllerListenerInformation() {
Properties props = new Properties();
props.setProperty("controller.listener.names", "CONTROLLER,CONTROLLER2");
assertEquals("Cannot find information about controller listener name: CONTROLLER",
assertThrows(TerseException.class,
() -> MetadataQuorumCommand.getControllerAdvertisedListeners(props)).
getMessage());
}
@Test
public void testGetControllerAdvertisedListenersWithRegularListeners() throws Exception {
Properties props = new Properties();
props.setProperty("controller.listener.names", "CONTROLLER,CONTROLLER2");
props.setProperty("listeners", "CONTROLLER://example.com:9092,CONTROLLER2://:9093");
assertEquals(new HashSet<>(Arrays.asList(
new RaftVoterEndpoint("CONTROLLER", "example.com", 9092),
new RaftVoterEndpoint("CONTROLLER2", "localhost", 9093))),
MetadataQuorumCommand.getControllerAdvertisedListeners(props));
}
@Test
public void testGetControllerAdvertisedListenersWithRegularListenersAndAdvertisedListeners() throws Exception {
Properties props = new Properties();
props.setProperty("controller.listener.names", "CONTROLLER,CONTROLLER2");
props.setProperty("listeners", "CONTROLLER://:9092,CONTROLLER2://:9093");
props.setProperty("advertised.listeners", "CONTROLLER://example.com:9092,CONTROLLER2://example.com:9093");
assertEquals(new HashSet<>(Arrays.asList(
new RaftVoterEndpoint("CONTROLLER", "example.com", 9092),
new RaftVoterEndpoint("CONTROLLER2", "example.com", 9093))),
MetadataQuorumCommand.getControllerAdvertisedListeners(props));
}
@Test
public void testAddControllerDryRun() throws Exception {
try (MetadataQuorumCommandUnitTestEnv testEnv =
new MetadataQuorumCommandUnitTestEnv(Optional.
of(Uuid.fromString("wZoXPqWoSu6F6c8MkmdyAg")))) {
File propsFile = testEnv.writePropertiesFile();
List<String> outputs = Arrays.asList(
ToolsTestUtils.captureStandardOut(() ->
assertEquals(0, MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092",
"--command-config", propsFile.getAbsolutePath(),
"add-controller",
"--dry-run"))).split("\n"));
assertTrue(outputs.contains("DRY RUN of adding controller 5 with directory id " +
"wZoXPqWoSu6F6c8MkmdyAg and endpoints: CONTROLLER://example.com:9093, CONTROLLER_SSL://example.com:9094"),
"Failed to find expected output in stdout: " + outputs);
}
}
}