KAFKA-10547; Add TopicId in MetadataResponse (#9622)

Includes:
- Bump the versions of MetadataRequest and MetadataResponse, and add topicId to MetadataResponse
- Alter describeTopic in AdminClientTopicService and ZookeeperTopicService
- TopicMetadata is cached in MetadataCache, so topicId also needs to be added to MetadataCache
- MetadataCache is updated by UpdateMetadataRequest, so bump the versions of UpdateMetadataRequest and UpdateMetadataResponse, and add topicId to UpdateMetadataRequest (a client-side sketch of the end result follows the changed-files summary below)

Reviewers: Justine Olshan <jolshan@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
Authored by dengziming on 2020-12-19 05:30:52 +08:00, committed by GitHub
parent baef516789
commit 5c921afa4a
25 changed files with 260 additions and 85 deletions
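
For orientation, here is a minimal client-side sketch (not part of this commit) of what the change enables end to end: once a broker answers MetadataResponse v10, the topic ID surfaces on TopicDescription. The bootstrap address and topic name are hypothetical; older brokers yield Uuid.ZERO_UUID.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Uuid;

public class DescribeTopicIdExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        try (Admin admin = Admin.create(props)) {
            TopicDescription description = admin
                .describeTopics(Collections.singleton("my-topic")) // hypothetical topic
                .all().get().get("my-topic");
            // Brokers that only speak MetadataResponse v9 or older report Uuid.ZERO_UUID.
            Uuid topicId = description.topicId();
            System.out.println("topic id: " + topicId);
        }
    }
}
```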

View File

@ -1788,7 +1788,7 @@ public class KafkaAdminClient extends AdminClient {
}
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions,
validAclOperations(response.topicAuthorizedOperations(topicName).get()));
validAclOperations(response.topicAuthorizedOperations(topicName).get()), cluster.topicId(topicName));
future.complete(topicDescription);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.utils.Utils;
@ -34,6 +35,7 @@ public class TopicDescription {
private final boolean internal;
private final List<TopicPartitionInfo> partitions;
private final Set<AclOperation> authorizedOperations;
private final Uuid topicId;
@Override
public boolean equals(final Object o) {
@ -74,10 +76,16 @@ public class TopicDescription {
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations) {
this(name, internal, partitions, authorizedOperations, Uuid.ZERO_UUID);
}
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations, Uuid topicId) {
this.name = name;
this.internal = internal;
this.partitions = partitions;
this.authorizedOperations = authorizedOperations;
this.topicId = topicId;
}
/**
@ -95,6 +103,10 @@ public class TopicDescription {
return internal;
}
public Uuid topicId() {
return topicId;
}
/**
* A list of partitions where the index represents the partition id and the element contains leadership and replica
* information for that partition.

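A short compatibility sketch (illustrative, not from the diff): the pre-existing four-argument constructor now delegates with Uuid.ZERO_UUID, so call sites that predate topic IDs compile and behave unchanged. The class name and topic name below are made up.

```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;

public class TopicDescriptionCompat {
    public static void main(String[] args) {
        List<TopicPartitionInfo> partitions = Collections.emptyList();
        // Legacy call site: no topic id supplied, so topicId() is the zero UUID.
        TopicDescription legacy = new TopicDescription("t", false, partitions, Collections.emptySet());
        System.out.println(legacy.topicId().equals(Uuid.ZERO_UUID)); // true
        // New call site: an id can be attached explicitly.
        TopicDescription withId = new TopicDescription("t", false, partitions,
            Collections.emptySet(), Uuid.randomUuid());
        System.out.println(withId.topicId());
    }
}
```
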
View File

@ -46,6 +46,7 @@ public final class Cluster {
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
private final Map<Integer, Node> nodesById;
private final ClusterResource clusterResource;
private final Map<String, Uuid> topicIds;
/**
* Create a new cluster with the given id, nodes and partitions
@ -57,7 +58,7 @@ public final class Cluster {
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> internalTopics) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, null);
this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, null, Collections.emptyMap());
}
/**
@ -71,7 +72,7 @@ public final class Cluster {
Set<String> unauthorizedTopics,
Set<String> internalTopics,
Node controller) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, controller);
this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, controller, Collections.emptyMap());
}
/**
@ -86,7 +87,23 @@ public final class Cluster {
Set<String> invalidTopics,
Set<String> internalTopics,
Node controller) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller);
this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, Collections.emptyMap());
}
/**
* Create a new cluster with the given id, nodes, partitions and topicIds
* @param nodes The nodes in the cluster
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/
public Cluster(String clusterId,
Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> invalidTopics,
Set<String> internalTopics,
Node controller,
Map<String, Uuid> topicIds) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, topicIds);
}
private Cluster(String clusterId,
@ -96,7 +113,8 @@ public final class Cluster {
Set<String> unauthorizedTopics,
Set<String> invalidTopics,
Set<String> internalTopics,
Node controller) {
Node controller,
Map<String, Uuid> topicIds) {
this.isBootstrapConfigured = isBootstrapConfigured;
this.clusterResource = new ClusterResource(clusterId);
// make a randomized, unmodifiable copy of the nodes
@ -165,6 +183,7 @@ public final class Cluster {
this.partitionsByTopic = Collections.unmodifiableMap(tmpPartitionsByTopic);
this.availablePartitionsByTopic = Collections.unmodifiableMap(tmpAvailablePartitionsByTopic);
this.partitionsByNode = Collections.unmodifiableMap(tmpPartitionsByNode);
this.topicIds = Collections.unmodifiableMap(topicIds);
this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
this.invalidTopics = Collections.unmodifiableSet(invalidTopics);
@ -191,7 +210,7 @@ public final class Cluster {
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
return new Cluster(null, true, nodes, new ArrayList<>(0),
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null);
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap());
}
/**
@ -327,6 +346,14 @@ public final class Cluster {
return controller;
}
public Collection<Uuid> topicIds() {
return topicIds.values();
}
public Uuid topicId(String topic) {
return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
}
@Override
public String toString() {
return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + this.nodes +

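To illustrate the two new accessors (a hedged sketch; the node, topic names, and IDs are made up): topicId(topic) falls back to Uuid.ZERO_UUID for topics the metadata did not carry an ID for, and topicIds() exposes all known IDs.

```java
import java.util.Collections;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.Uuid;

public class ClusterTopicIdExample {
    public static void main(String[] args) {
        Node node = new Node(0, "localhost", 9092);
        PartitionInfo p0 = new PartitionInfo("orders", 0, node, new Node[]{node}, new Node[]{node});
        // Uses the new public constructor that accepts a topic-id map.
        Cluster cluster = new Cluster("cluster-1", Collections.singletonList(node),
            Collections.singletonList(p0), Collections.emptySet(), Collections.emptySet(),
            Collections.emptySet(), node, Collections.singletonMap("orders", Uuid.randomUuid()));
        System.out.println(cluster.topicId("orders"));   // the random id above
        System.out.println(cluster.topicId("unknown"));  // Uuid.ZERO_UUID fallback
        System.out.println(cluster.topicIds());          // all known ids
    }
}
```
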
View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
@ -125,18 +126,22 @@ public class MetadataResponse extends AbstractResponse {
public Cluster cluster() {
Set<String> internalTopics = new HashSet<>();
List<PartitionInfo> partitions = new ArrayList<>();
Map<String, Uuid> topicIds = new HashMap<>();
for (TopicMetadata metadata : topicMetadata()) {
if (metadata.error == Errors.NONE) {
if (metadata.isInternal)
internalTopics.add(metadata.topic);
if (metadata.topicId() != null && metadata.topicId() != Uuid.ZERO_UUID) {
topicIds.put(metadata.topic, metadata.topicId());
}
for (PartitionMetadata partitionMetadata : metadata.partitionMetadata) {
partitions.add(toPartitionInfo(partitionMetadata, holder().brokers));
}
}
}
return new Cluster(data.clusterId(), brokers(), partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, controller());
topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, controller(), topicIds);
}
public static PartitionInfo toPartitionInfo(PartitionMetadata metadata, Map<Integer, Node> nodesById) {
@ -251,17 +256,20 @@ public class MetadataResponse extends AbstractResponse {
public static class TopicMetadata {
private final Errors error;
private final String topic;
private final Uuid topicId;
private final boolean isInternal;
private final List<PartitionMetadata> partitionMetadata;
private int authorizedOperations;
public TopicMetadata(Errors error,
String topic,
Uuid topicId,
boolean isInternal,
List<PartitionMetadata> partitionMetadata,
int authorizedOperations) {
this.error = error;
this.topic = topic;
this.topicId = topicId;
this.isInternal = isInternal;
this.partitionMetadata = partitionMetadata;
this.authorizedOperations = authorizedOperations;
@ -271,7 +279,7 @@ public class MetadataResponse extends AbstractResponse {
String topic,
boolean isInternal,
List<PartitionMetadata> partitionMetadata) {
this(error, topic, isInternal, partitionMetadata, AUTHORIZED_OPERATIONS_OMITTED);
this(error, topic, Uuid.ZERO_UUID, isInternal, partitionMetadata, AUTHORIZED_OPERATIONS_OMITTED);
}
public Errors error() {
@ -282,6 +290,10 @@ public class MetadataResponse extends AbstractResponse {
return topic;
}
public Uuid topicId() {
return topicId;
}
public boolean isInternal() {
return isInternal;
}
@ -306,6 +318,7 @@ public class MetadataResponse extends AbstractResponse {
return isInternal == that.isInternal &&
error == that.error &&
Objects.equals(topic, that.topic) &&
Objects.equals(topicId, that.topicId) &&
Objects.equals(partitionMetadata, that.partitionMetadata) &&
Objects.equals(authorizedOperations, that.authorizedOperations);
}
@ -320,6 +333,7 @@ public class MetadataResponse extends AbstractResponse {
return "TopicMetadata{" +
"error=" + error +
", topic='" + topic + '\'' +
", topicId='" + topicId + '\'' +
", isInternal=" + isInternal +
", partitionMetadata=" + partitionMetadata +
", authorizedOperations=" + authorizedOperations +
@ -405,6 +419,7 @@ public class MetadataResponse extends AbstractResponse {
for (MetadataResponseTopic topicMetadata : data.topics()) {
Errors topicError = Errors.forCode(topicMetadata.errorCode());
String topic = topicMetadata.name();
Uuid topicId = topicMetadata.topicId();
boolean isInternal = topicMetadata.isInternal();
List<PartitionMetadata> partitionMetadataList = new ArrayList<>();
@ -422,7 +437,7 @@ public class MetadataResponse extends AbstractResponse {
partitionMetadata.offlineReplicas()));
}
topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
topicMetadataList.add(new TopicMetadata(topicError, topic, topicId, isInternal, partitionMetadataList,
topicMetadata.topicAuthorizedOperations()));
}
return topicMetadataList;

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
@ -33,6 +34,7 @@ import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -44,12 +46,15 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
public static class Builder extends AbstractControlRequest.Builder<UpdateMetadataRequest> {
private final List<UpdateMetadataPartitionState> partitionStates;
private final List<UpdateMetadataBroker> liveBrokers;
private final Map<String, Uuid> topicIds;
public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers) {
List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers,
Map<String, Uuid> topicIds) {
super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch);
this.partitionStates = partitionStates;
this.liveBrokers = liveBrokers;
this.topicIds = topicIds;
}
@Override
@ -82,7 +87,7 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
.setLiveBrokers(liveBrokers);
if (version >= 5) {
Map<String, UpdateMetadataTopicState> topicStatesMap = groupByTopic(partitionStates);
Map<String, UpdateMetadataTopicState> topicStatesMap = groupByTopic(topicIds, partitionStates);
data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
} else {
data.setUngroupedPartitionStates(partitionStates);
@ -91,13 +96,17 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
return new UpdateMetadataRequest(data, version);
}
private static Map<String, UpdateMetadataTopicState> groupByTopic(List<UpdateMetadataPartitionState> partitionStates) {
private static Map<String, UpdateMetadataTopicState> groupByTopic(Map<String, Uuid> topicIds, List<UpdateMetadataPartitionState> partitionStates) {
Map<String, UpdateMetadataTopicState> topicStates = new HashMap<>();
for (UpdateMetadataPartitionState partition : partitionStates) {
// We don't null out the topic name in UpdateMetadataPartitionState since it's ignored by the generated
// code if version >= 5
UpdateMetadataTopicState topicState = topicStates.computeIfAbsent(partition.topicName(),
t -> new UpdateMetadataTopicState().setTopicName(partition.topicName()));
t -> new UpdateMetadataTopicState()
.setTopicName(partition.topicName())
.setTopicId(topicIds.getOrDefault(partition.topicName(), Uuid.ZERO_UUID))
);
topicState.partitionStates().add(partition);
}
return topicStates;
@ -196,6 +205,13 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
return data.ungroupedPartitionStates();
}
public List<UpdateMetadataTopicState> topicStates() {
if (version() >= 5) {
return data.topicStates();
}
return Collections.emptyList();
}
public List<UpdateMetadataBroker> liveBrokers() {
return data.liveBrokers();
}

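As a hedged illustration of the receiver side (not code from this commit; TopicIdExtraction is a hypothetical helper): topicStates() is only populated for version 5+, and the topicId field is only serialized from version 7, so a reader can skip Uuid.ZERO_UUID defaults, mirroring what MetadataCache does later in this diff.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataTopicState;
import org.apache.kafka.common.requests.UpdateMetadataRequest;

public final class TopicIdExtraction {
    // Collect the topic ids carried by a request; versions below 7 never set
    // the field, so the ZERO_UUID defaults are skipped.
    static Map<String, Uuid> topicIdsOf(UpdateMetadataRequest request) {
        Map<String, Uuid> ids = new HashMap<>();
        for (UpdateMetadataTopicState state : request.topicStates()) {
            if (!Uuid.ZERO_UUID.equals(state.topicId()))
                ids.put(state.topicName(), state.topicId());
        }
        return ids;
    }
}
```
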
View File

@ -17,7 +17,7 @@
"apiKey": 3,
"type": "request",
"name": "MetadataRequest",
"validVersions": "0-9",
"validVersions": "0-10",
"flexibleVersions": "9+",
"fields": [
// In version 0, an empty array indicates "request metadata for all topics." In version 1 and
@ -31,9 +31,11 @@
// Starting in version 8, authorized operations can be requested for cluster and topic resource.
//
// Version 9 is the first flexible version.
// Version 10 adds topicId and allows the topic name to be null
{ "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
"about": "The topics to fetch metadata for.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." },
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "nullableVersions": "10+",
"about": "The topic name." }
]},
{ "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", "default": "true", "ignorable": false,

View File

@ -36,7 +36,8 @@
// Starting in version 8, brokers can send authorized operations for topic and cluster.
//
// Version 9 is the first flexible version.
"validVersions": "0-9",
// Version 10 adds topicId
"validVersions": "0-10",
"flexibleVersions": "9+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
@ -62,6 +63,7 @@
"about": "The topic error, or 0 if there was no error." },
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." },
{ "name": "IsInternal", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
"about": "True if the topic is internal." },
{ "name": "Partitions", "type": "[]MetadataResponsePartition", "versions": "0+",

View File

@ -26,7 +26,8 @@
// Version 4 adds the offline replica list.
//
// Version 5 adds the broker epoch field and normalizes partitions by topic.
"validVersions": "0-6",
// Version 7 adds topicId
"validVersions": "0-7",
"flexibleVersions": "6+",
"fields": [
{ "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@ -41,6 +42,7 @@
"about": "In newer versions of this RPC, each topic that we would like to update.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "5+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "7+", "ignorable": true, "about": "The topic id."},
{ "name": "PartitionStates", "type": "[]UpdateMetadataPartitionState", "versions": "5+",
"about": "The partition that we would like to update." }
]},

View File

@ -18,7 +18,7 @@
"type": "response",
"name": "UpdateMetadataResponse",
// Versions 1, 2, 3, 4, and 5 are the same as version 0
"validVersions": "0-6",
"validVersions": "0-7",
"flexibleVersions": "6+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",

View File

@ -36,6 +36,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
@ -1060,7 +1061,7 @@ public class KafkaAdminClientTest {
singletonList(leader.id()), singletonList(leader.id()), singletonList(leader.id()));
env.kafkaClient().prepareResponse(RequestTestUtils.metadataResponse(initializedCluster.nodes(),
initializedCluster.clusterResource().clusterId(), 1,
singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, Uuid.ZERO_UUID, false,
singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED))));
DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic));

View File

@ -1640,7 +1640,7 @@ public class RequestResponseTest {
.setRack(rack)
);
return new UpdateMetadataRequest.Builder((short) version, 1, 10, 0, partitionStates,
liveBrokers).build();
liveBrokers, Collections.emptyMap()).build();
}
private UpdateMetadataResponse createUpdateMetadataResponse() {

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
@ -33,8 +34,10 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -52,7 +55,7 @@ public class UpdateMetadataRequestTest {
public void testUnsupportedVersion() {
UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder(
(short) (UPDATE_METADATA.latestVersion() + 1), 0, 0, 0,
Collections.emptyList(), Collections.emptyList());
Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
assertThrows(UnsupportedVersionException.class, builder::build);
}
@ -60,7 +63,7 @@ public class UpdateMetadataRequestTest {
public void testGetErrorResponse() {
for (short version = UPDATE_METADATA.oldestVersion(); version < UPDATE_METADATA.latestVersion(); version++) {
UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder(
version, 0, 0, 0, Collections.emptyList(), Collections.emptyList());
version, 0, 0, 0, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
UpdateMetadataRequest request = builder.build();
UpdateMetadataResponse response = request.getErrorResponse(0,
new ClusterAuthorizationException("Not authorized"));
@ -76,10 +79,12 @@ public class UpdateMetadataRequestTest {
*/
@Test
public void testVersionLogic() {
String topic0 = "topic0";
String topic1 = "topic1";
for (short version = UPDATE_METADATA.oldestVersion(); version <= UPDATE_METADATA.latestVersion(); version++) {
List<UpdateMetadataPartitionState> partitionStates = asList(
new UpdateMetadataPartitionState()
.setTopicName("topic0")
.setTopicName(topic0)
.setPartitionIndex(0)
.setControllerEpoch(2)
.setLeader(0)
@ -89,7 +94,7 @@ public class UpdateMetadataRequestTest {
.setReplicas(asList(0, 1, 2))
.setOfflineReplicas(asList(2)),
new UpdateMetadataPartitionState()
.setTopicName("topic0")
.setTopicName(topic0)
.setPartitionIndex(1)
.setControllerEpoch(2)
.setLeader(1)
@ -99,7 +104,7 @@ public class UpdateMetadataRequestTest {
.setReplicas(asList(1, 2, 3))
.setOfflineReplicas(emptyList()),
new UpdateMetadataPartitionState()
.setTopicName("topic1")
.setTopicName(topic1)
.setPartitionIndex(0)
.setControllerEpoch(2)
.setLeader(2)
@ -147,8 +152,12 @@ public class UpdateMetadataRequestTest {
))
);
Map<String, Uuid> topicIds = new HashMap<>();
topicIds.put(topic0, Uuid.randomUuid());
topicIds.put(topic1, Uuid.randomUuid());
UpdateMetadataRequest request = new UpdateMetadataRequest.Builder(version, 1, 2, 3,
partitionStates, liveBrokers).build();
partitionStates, liveBrokers, topicIds).build();
assertEquals(new HashSet<>(partitionStates), iterableToSet(request.partitionStates()));
assertEquals(liveBrokers, request.liveBrokers());
@ -191,6 +200,14 @@ public class UpdateMetadataRequestTest {
assertEquals(3, deserializedRequest.brokerEpoch());
else
assertEquals(-1, deserializedRequest.brokerEpoch());
long topicIdCount = deserializedRequest.data().topicStates().stream()
.map(UpdateMetadataRequestData.UpdateMetadataTopicState::topicId)
.filter(topicId -> topicId != Uuid.ZERO_UUID).count();
if (version >= 7)
assertEquals(2, topicIdCount);
else
assertEquals(0, topicIdCount);
}
}
@ -204,7 +221,7 @@ public class UpdateMetadataRequestTest {
.setPartitionIndex(tp.partition()));
}
UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder((short) 5, 0, 0, 0,
partitionStates, Collections.emptyList());
partitionStates, Collections.emptyList(), Collections.emptyMap());
assertTrue(builder.build((short) 5).sizeInBytes() < builder.build((short) 4).sizeInBytes());
}

View File

@ -32,7 +32,7 @@ import org.apache.kafka.clients.admin.CreatePartitionsOptions
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.DeleteTopicsOptions
import org.apache.kafka.clients.admin.{Admin, ConfigEntry, ListTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, Config => JConfig}
import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo, Uuid}
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, TopicExistsException, UnsupportedVersionException}
@ -105,6 +105,7 @@ object TopicCommand extends Logging {
}
case class TopicDescription(topic: String,
topicId: Uuid,
numPartitions: Int,
replicationFactor: Int,
config: JConfig,
@ -113,6 +114,7 @@ object TopicCommand extends Logging {
def printDescription(): Unit = {
val configsAsString = config.entries.asScala.filter(!_.isDefault).map { ce => s"${ce.name}=${ce.value}" }.mkString(",")
print(s"Topic: $topic")
if (topicId != Uuid.ZERO_UUID) print(s"\tTopicId: $topicId")
print(s"\tPartitionCount: $numPartitions")
print(s"\tReplicationFactor: $replicationFactor")
print(s"\tConfigs: $configsAsString")
@ -326,6 +328,7 @@ object TopicCommand extends Logging {
for (td <- topicDescriptions) {
val topicName = td.name
val topicId = td.topicId()
val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
@ -335,7 +338,7 @@ object TopicCommand extends Logging {
val numPartitions = td.partitions().size
val firstPartition = td.partitions.iterator.next()
val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
val topicDesc = TopicDescription(topicName, topicId, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
topicDesc.printDescription()
}
}
@ -448,23 +451,24 @@ object TopicCommand extends Logging {
val adminZkClient = new AdminZkClient(zkClient)
for (topic <- topics) {
zkClient.getPartitionAssignmentForTopics(immutable.Set(topic)).get(topic) match {
case Some(topicPartitionAssignment) =>
zkClient.getReplicaAssignmentAndTopicIdForTopics(immutable.Set(topic)).headOption match {
case Some(replicaAssignmentAndTopicId) =>
val markedForDeletion = zkClient.isTopicMarkedForDeletion(topic)
if (describeOptions.describeConfigs) {
val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic).asScala
if (!opts.reportOverriddenConfigs || configs.nonEmpty) {
val numPartitions = topicPartitionAssignment.size
val replicationFactor = topicPartitionAssignment.head._2.replicas.size
val numPartitions = replicaAssignmentAndTopicId.assignment.size
val replicationFactor = replicaAssignmentAndTopicId.assignment.head._2.replicas.size
val config = new JConfig(configs.map{ case (k, v) => new ConfigEntry(k, v) }.asJavaCollection)
val topicDesc = TopicDescription(topic, numPartitions, replicationFactor, config, markedForDeletion)
val topicDesc = TopicDescription(topic,
replicaAssignmentAndTopicId.topicId.getOrElse(Uuid.ZERO_UUID), numPartitions, replicationFactor, config, markedForDeletion)
topicDesc.printDescription()
}
}
if (describeOptions.describePartitions) {
for ((partitionId, replicaAssignment) <- topicPartitionAssignment.toSeq.sortBy(_._1)) {
for ((tp, replicaAssignment) <- replicaAssignmentAndTopicId.assignment.toSeq.sortBy(_._1.partition())) {
val assignedReplicas = replicaAssignment.replicas
val tp = new TopicPartition(topic, partitionId)
val (leaderOpt, isr) = zkClient.getTopicPartitionState(tp).map(_.leaderAndIsr) match {
case Some(leaderAndIsr) => (leaderAndIsr.leaderOpt, leaderAndIsr.isr)
case None => (None, Seq.empty[Int])
@ -477,7 +481,7 @@ object TopicCommand extends Logging {
}
}
val info = new TopicPartitionInfo(partitionId, leaderOpt.map(asNode).orNull,
val info = new TopicPartitionInfo(tp.partition(), leaderOpt.map(asNode).orNull,
assignedReplicas.map(asNode).toList.asJava,
isr.map(asNode).toList.asJava)

View File

@ -110,7 +110,9 @@ object ApiVersion {
// Introduced AlterIsr (KIP-497)
KAFKA_2_7_IV2,
// Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch.
KAFKA_2_8_IV0
KAFKA_2_8_IV0,
// Adds topicId to UpdateMetadataRequest
KAFKA_2_8_IV1
)
// Map keys are the union of the short and full versions
@ -437,6 +439,13 @@ case object KAFKA_2_8_IV0 extends DefaultApiVersion {
val id: Int = 31
}
case object KAFKA_2_8_IV1 extends DefaultApiVersion {
val shortVersion: String = "2.8"
val subVersion = "IV1"
val recordVersion = RecordVersion.V2
val id: Int = 32
}
object ApiVersionValidator extends Validator {
override def ensureValid(name: String, value: Any): Unit = {

View File

@ -499,7 +499,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
val partitionStates = updateMetadataRequestPartitionInfoMap.values.toBuffer
val updateMetadataRequestVersion: Short =
if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 6
if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV1) 7
else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 6
else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 5
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 4
else if (config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
@ -535,8 +536,12 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
updateMetadataRequestBrokerSet.intersect(controllerContext.liveOrShuttingDownBrokerIds).foreach { broker =>
val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
val topicIds = partitionStates.map(_.topicName())
.distinct
.filter(controllerContext.topicIds.contains)
.map(topic => (topic, controllerContext.topicIds(topic))).toMap
val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava)
controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, topicIds.asJava)
sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse) => {
val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse, broker))

View File

@ -21,7 +21,7 @@ import java.util
import java.util.Collections
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.{mutable, Seq, Set}
import scala.collection.{Seq, Set, mutable}
import scala.jdk.CollectionConverters._
import kafka.cluster.{Broker, EndPoint}
import kafka.api._
@ -31,7 +31,7 @@ import kafka.utils.Logging
import kafka.utils.Implicits._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition}
import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
import org.apache.kafka.common.network.ListenerName
@ -51,7 +51,7 @@ class MetadataCache(brokerId: Int) extends Logging {
//the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
//multiple reads of this value risk getting different snapshots.
@volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty,
controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)
topicIds = Map.empty, controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)
this.logIdent = s"[MetadataCache brokerId=$brokerId] "
private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
@ -116,7 +116,7 @@ class MetadataCache(brokerId: Int) extends Logging {
.setIsrNodes(filteredIsr)
.setOfflineReplicas(offlineReplicas)
case Some(leader) =>
case Some(_) =>
val error = if (filteredReplicas.size < replicas.size) {
debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
s"following brokers ${replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
@ -172,6 +172,7 @@ class MetadataCache(brokerId: Int) extends Logging {
new MetadataResponseTopic()
.setErrorCode(Errors.NONE.code)
.setName(topic)
.setTopicId(snapshot.topicIds.getOrElse(topic, Uuid.ZERO_UUID))
.setIsInternal(Topic.isInternal(topic))
.setPartitions(partitionMetadata.toBuffer.asJava)
}
@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
error(s"Listeners are not identical across brokers: $aliveNodes")
}
val newTopicIds = updateMetadataRequest.topicStates().asScala
.map(topicState => (topicState.topicName(), topicState.topicId()))
.filter(_._2 != Uuid.ZERO_UUID).toMap
val topicIds = mutable.Map.empty[String, Uuid]
topicIds ++= metadataSnapshot.topicIds
topicIds ++= newTopicIds
val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
if (!updateMetadataRequest.partitionStates.iterator.hasNext) {
metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerIdOpt, aliveBrokers, aliveNodes)
metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, topicIds.toMap, controllerIdOpt, aliveBrokers, aliveNodes)
} else {
//since kafka may do partial metadata updates, we start by copying the previous state
val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
@ -334,7 +342,7 @@ class MetadataCache(brokerId: Int) extends Logging {
// per-partition logging here can be very expensive due to going through all partitions in the cluster
val tp = new TopicPartition(state.topicName, state.partitionIndex)
if (state.leader == LeaderAndIsr.LeaderDuringDelete) {
removePartitionInfo(partitionStates, tp.topic, tp.partition)
removePartitionInfo(partitionStates, topicIds, tp.topic, tp.partition)
if (traceEnabled)
stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
@ -350,7 +358,7 @@ class MetadataCache(brokerId: Int) extends Logging {
stateChangeLogger.info(s"Add $cachedPartitionsCount partitions and deleted ${deletedPartitions.size} partitions from metadata cache " +
s"in response to UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
metadataSnapshot = MetadataSnapshot(partitionStates, controllerIdOpt, aliveBrokers, aliveNodes)
metadataSnapshot = MetadataSnapshot(partitionStates, topicIds.toMap, controllerIdOpt, aliveBrokers, aliveNodes)
}
deletedPartitions
}
@ -363,15 +371,19 @@ class MetadataCache(brokerId: Int) extends Logging {
def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
private def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
topic: String, partitionId: Int): Boolean = {
topicIds: mutable.Map[String, Uuid], topic: String, partitionId: Int): Boolean = {
partitionStates.get(topic).exists { infos =>
infos.remove(partitionId)
if (infos.isEmpty) partitionStates.remove(topic)
if (infos.isEmpty) {
partitionStates.remove(topic)
topicIds.remove(topic)
}
true
}
}
case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
topicIds: Map[String, Uuid],
controllerId: Option[Int],
aliveBrokers: mutable.LongMap[Broker],
aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]])

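Restating the Scala merge above as a small Java helper for clarity (illustrative only; TopicIdMerge is a hypothetical name): the new snapshot starts from the previously cached IDs, overlays the IDs from the request, and ignores zero IDs sent by pre-v7 controllers.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Uuid;

public final class TopicIdMerge {
    // cached: ids from the previous MetadataSnapshot; fromRequest: ids from the
    // incoming UpdateMetadataRequest. Newer ids win; zero ids are ignored.
    static Map<String, Uuid> merge(Map<String, Uuid> cached, Map<String, Uuid> fromRequest) {
        Map<String, Uuid> merged = new HashMap<>(cached);
        fromRequest.forEach((topic, id) -> {
            if (!Uuid.ZERO_UUID.equals(id))
                merged.put(topic, id);
        });
        return merged;
    }
}
```
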
View File

@ -373,7 +373,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
.setSecurityProtocol(securityProtocol.id)
.setListener(ListenerName.forSecurityProtocol(securityProtocol).value)).asJava)).asJava
val version = ApiKeys.UPDATE_METADATA.latestVersion
new requests.UpdateMetadataRequest.Builder(version, brokerId, Int.MaxValue, Long.MaxValue, partitionStates, brokers).build()
new requests.UpdateMetadataRequest.Builder(version, brokerId, Int.MaxValue, Long.MaxValue, partitionStates,
brokers, Collections.emptyMap()).build()
}
private def createJoinGroupRequest = {

View File

@ -111,13 +111,13 @@ class ApiVersionTest {
assertEquals(KAFKA_2_6_IV0, ApiVersion("2.6"))
assertEquals(KAFKA_2_6_IV0, ApiVersion("2.6-IV0"))
assertEquals(KAFKA_2_7_IV2, ApiVersion("2.7"))
assertEquals(KAFKA_2_7_IV0, ApiVersion("2.7-IV0"))
assertEquals(KAFKA_2_7_IV1, ApiVersion("2.7-IV1"))
assertEquals(KAFKA_2_7_IV2, ApiVersion("2.7-IV2"))
assertEquals(KAFKA_2_8_IV0, ApiVersion("2.8"))
assertEquals(KAFKA_2_8_IV1, ApiVersion("2.8"))
assertEquals(KAFKA_2_8_IV0, ApiVersion("2.8-IV0"))
assertEquals(KAFKA_2_8_IV1, ApiVersion("2.8-IV1"))
}
@Test

View File

@ -18,11 +18,11 @@ package kafka.controller
import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_0_10_2_IV0, KAFKA_0_9_0, KAFKA_1_0_IV0, KAFKA_2_2_IV0, KAFKA_2_4_IV0, KAFKA_2_4_IV1, KAFKA_2_6_IV0, LeaderAndIsr}
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_0_10_2_IV0, KAFKA_0_9_0, KAFKA_1_0_IV0, KAFKA_2_2_IV0, KAFKA_2_4_IV0, KAFKA_2_4_IV1, KAFKA_2_6_IV0, KAFKA_2_8_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.message.{LeaderAndIsrResponseData, StopReplicaResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
@ -49,7 +49,7 @@ class ControllerChannelManagerTest {
@Test
def testLeaderAndIsrRequestSent(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val batch = new MockControllerBrokerRequestBatch(context)
val partitions = Map(
@ -92,7 +92,7 @@ class ControllerChannelManagerTest {
@Test
def testLeaderAndIsrRequestIsNew(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val batch = new MockControllerBrokerRequestBatch(context)
val partition = new TopicPartition("foo", 0)
@ -120,7 +120,7 @@ class ControllerChannelManagerTest {
@Test
def testLeaderAndIsrRequestSentToLiveOrShuttingDownBrokers(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val batch = new MockControllerBrokerRequestBatch(context)
// 2 is shutting down, 3 is dead
@ -169,7 +169,7 @@ class ControllerChannelManagerTest {
private def testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion,
expectedLeaderAndIsrVersion: Short): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)
@ -191,7 +191,9 @@ class ControllerChannelManagerTest {
@Test
def testUpdateMetadataRequestSent(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid())
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, topicIds)
val batch = new MockControllerBrokerRequestBatch(context)
val partitions = Map(
@ -219,6 +221,12 @@ class ControllerChannelManagerTest {
assertEquals(partitions.map { case (k, v) => (k, v.isr) },
partitionStates.map(ps => (new TopicPartition(ps.topicName, ps.partitionIndex), ps.isr.asScala)).toMap)
val topicStates = updateMetadataRequest.topicStates()
assertEquals(2, topicStates.size)
for (topicState <- topicStates.asScala) {
assertEquals(topicState.topicId(), topicIds(topicState.topicName()))
}
assertEquals(controllerId, updateMetadataRequest.controllerId)
assertEquals(controllerEpoch, updateMetadataRequest.controllerEpoch)
assertEquals(3, updateMetadataRequest.liveBrokers.size)
@ -234,7 +242,7 @@ class ControllerChannelManagerTest {
@Test
def testUpdateMetadataDoesNotIncludePartitionsWithoutLeaderAndIsr(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val batch = new MockControllerBrokerRequestBatch(context)
val partitions = Set(
@ -262,7 +270,7 @@ class ControllerChannelManagerTest {
@Test
def testUpdateMetadataRequestDuringTopicDeletion(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val batch = new MockControllerBrokerRequestBatch(context)
val partitions = Map(
@ -304,7 +312,7 @@ class ControllerChannelManagerTest {
@Test
def testUpdateMetadataIncludesLiveOrShuttingDownBrokers(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val batch = new MockControllerBrokerRequestBatch(context)
// 2 is shutting down, 3 is dead
@ -334,7 +342,8 @@ class ControllerChannelManagerTest {
for (apiVersion <- ApiVersion.allVersions) {
val updateMetadataRequestVersion: Short =
if (apiVersion >= KAFKA_2_4_IV1) 6
if (apiVersion >= KAFKA_2_8_IV1) 7
else if (apiVersion >= KAFKA_2_4_IV1) 6
else if (apiVersion >= KAFKA_2_2_IV0) 5
else if (apiVersion >= KAFKA_1_0_IV0) 4
else if (apiVersion >= KAFKA_0_10_2_IV0) 3
@ -348,7 +357,7 @@ class ControllerChannelManagerTest {
private def testUpdateMetadataFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion,
expectedUpdateMetadataVersion: Short): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)
@ -369,7 +378,7 @@ class ControllerChannelManagerTest {
@Test
def testStopReplicaRequestSent(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val batch = new MockControllerBrokerRequestBatch(context)
val partitions = Map(
@ -404,7 +413,7 @@ class ControllerChannelManagerTest {
@Test
def testStopReplicaRequestWithAlreadyDefinedDeletedPartition(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val batch = new MockControllerBrokerRequestBatch(context)
val partition = new TopicPartition("foo", 0)
@ -432,7 +441,7 @@ class ControllerChannelManagerTest {
}
private def testStopReplicaRequestsWhileTopicQueuedForDeletion(interBrokerProtocolVersion: ApiVersion): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)
@ -479,7 +488,7 @@ class ControllerChannelManagerTest {
}
private def testStopReplicaRequestsWhileTopicDeletionStarted(interBrokerProtocolVersion: ApiVersion): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)
@ -534,7 +543,7 @@ class ControllerChannelManagerTest {
}
private def testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(interBrokerProtocolVersion: ApiVersion): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)
@ -592,7 +601,7 @@ class ControllerChannelManagerTest {
private def testMixedDeleteAndNotDeleteStopReplicaRequests(interBrokerProtocolVersion: ApiVersion,
expectedStopReplicaRequestVersion: Short): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)
@ -651,7 +660,7 @@ class ControllerChannelManagerTest {
@Test
def testStopReplicaGroupsByBroker(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val batch = new MockControllerBrokerRequestBatch(context)
val partitions = Map(
@ -689,7 +698,7 @@ class ControllerChannelManagerTest {
@Test
def testStopReplicaSentOnlyToLiveAndShuttingDownBrokers(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val batch = new MockControllerBrokerRequestBatch(context)
// 2 is shutting down, 3 is dead
@ -741,7 +750,7 @@ class ControllerChannelManagerTest {
private def testStopReplicaFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion,
expectedStopReplicaRequestVersion: Short): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo"), 2, 3)
val context = initContext(Seq(1, 2, 3), Set("foo"), 2, 3, Map.empty)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)
@ -852,7 +861,8 @@ class ControllerChannelManagerTest {
private def initContext(brokers: Seq[Int],
topics: Set[String],
numPartitions: Int,
replicationFactor: Int): ControllerContext = {
replicationFactor: Int,
topicIds: Map[String, Uuid]): ControllerContext = {
val context = new ControllerContext
val brokerEpochs = brokers.map { brokerId =>
val endpoint = new EndPoint("localhost", 9900 + brokerId, new ListenerName("PLAINTEXT"),
@ -873,6 +883,9 @@ class ControllerChannelManagerTest {
context.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(replicas))
leaderIndex += 1
}
context.allTopics ++= topics
topicIds.foreach { case (name, id) => context.addTopicId(name, id) }
context
}

View File

@ -17,6 +17,8 @@
package kafka.server
import java.util.Collections
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
@ -195,7 +197,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
val requestBuilder = new UpdateMetadataRequest.Builder(
ApiKeys.UPDATE_METADATA.latestVersion, controllerId, controllerEpoch,
epochInRequest,
partitionStates.asJava, liveBrokers.asJava)
partitionStates.asJava, liveBrokers.asJava, Collections.emptyMap())
if (epochInRequestDiffFromCurrentEpoch < 0) {
// stale broker epoch in UPDATE_METADATA

View File

@ -2848,7 +2848,7 @@ class KafkaApisTest {
.setListener(plaintextListener.value)).asJava)
)
val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, 0, Seq.empty[UpdateMetadataPartitionState].asJava, brokers.asJava).build()
0, 0, Seq.empty[UpdateMetadataPartitionState].asJava, brokers.asJava, Collections.emptyMap()).build()
metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
(plaintextListener, anotherListener)
}
@ -3017,7 +3017,7 @@ class KafkaApisTest {
.setListener(plaintextListener.value)).asJava)
val partitionStates = (0 until numPartitions).map(createPartitionState)
new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, brokerEpoch, partitionStates.asJava, Seq(broker).asJava).build()
0, brokerEpoch, partitionStates.asJava, Seq(broker).asJava, Collections.emptyMap()).build()
}
private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {

View File

@ -17,8 +17,10 @@
package kafka.server
import java.util
import java.util.Collections
import util.Arrays.asList
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -105,9 +107,13 @@ class MetadataCacheTest {
.setZkVersion(zkVersion)
.setReplicas(asList(2, 1, 3)))
val topicIds = new util.HashMap[String, Uuid]()
topicIds.put(topic0, Uuid.randomUuid())
topicIds.put(topic1, Uuid.randomUuid())
val version = ApiKeys.UPDATE_METADATA.latestVersion
val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch,
partitionStates.asJava, brokers.asJava).build()
partitionStates.asJava, brokers.asJava, topicIds).build()
cache.updateMetadata(15, updateMetadataRequest)
for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) {
@ -120,6 +126,7 @@ class MetadataCacheTest {
val topicMetadata = topicMetadatas.head
assertEquals(Errors.NONE.code, topicMetadata.errorCode)
assertEquals(topic, topicMetadata.name)
assertEquals(topicIds.get(topic), topicMetadata.topicId())
val topicPartitionStates = partitionStates.filter { ps => ps.topicName == topic }
val partitionMetadatas = topicMetadata.partitions.asScala.sortBy(_.partitionIndex)
@ -227,7 +234,7 @@ class MetadataCacheTest {
val version = ApiKeys.UPDATE_METADATA.latestVersion
val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch,
partitionStates.asJava, brokers.asJava).build()
partitionStates.asJava, brokers.asJava, Collections.emptyMap()).build()
cache.updateMetadata(15, updateMetadataRequest)
val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableListeners = errorUnavailableListeners)
@ -284,7 +291,7 @@ class MetadataCacheTest {
val version = ApiKeys.UPDATE_METADATA.latestVersion
val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch,
partitionStates.asJava, brokers.asJava).build()
partitionStates.asJava, brokers.asJava, Collections.emptyMap()).build()
cache.updateMetadata(15, updateMetadataRequest)
// Validate errorUnavailableEndpoints = false
@ -358,7 +365,7 @@ class MetadataCacheTest {
val version = ApiKeys.UPDATE_METADATA.latestVersion
val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch,
partitionStates.asJava, brokers.asJava).build()
partitionStates.asJava, brokers.asJava, Collections.emptyMap()).build()
cache.updateMetadata(15, updateMetadataRequest)
// Validate errorUnavailableEndpoints = false
@ -423,7 +430,7 @@ class MetadataCacheTest {
.setReplicas(replicas))
val version = ApiKeys.UPDATE_METADATA.latestVersion
val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
brokers.asJava).build()
brokers.asJava, Collections.emptyMap()).build()
cache.updateMetadata(15, updateMetadataRequest)
val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
@ -465,7 +472,7 @@ class MetadataCacheTest {
.setReplicas(replicas))
val version = ApiKeys.UPDATE_METADATA.latestVersion
val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
brokers.asJava).build()
brokers.asJava, Collections.emptyMap()).build()
cache.updateMetadata(15, updateMetadataRequest)
}

View File

@ -21,6 +21,7 @@ import java.util.{Optional, Properties}
import kafka.network.SocketServer
import kafka.utils.TestUtils
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataRequestData
@ -223,6 +224,32 @@ class MetadataRequestTest extends BaseRequestTest {
assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size())
}
@Test
def testTopicIdsInResponse(): Unit = {
val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
val topic1 = "topic1"
val topic2 = "topic2"
createTopic(topic1, replicaAssignment)
createTopic(topic2, replicaAssignment)
// for versions below 10, the broker returns ZERO_UUID in the MetadataResponse
val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 0, 9).build(), Some(controllerSocketServer))
assertEquals(2, resp1.topicMetadata.size)
resp1.topicMetadata.forEach { topicMetadata =>
assertEquals(Errors.NONE, topicMetadata.error)
assertEquals(Uuid.ZERO_UUID, topicMetadata.topicId())
}
// from version 10 onward, the topic ID is included in the MetadataResponse
val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 10, 10).build(), Some(notControllerSocketServer))
assertEquals(2, resp2.topicMetadata.size)
resp2.topicMetadata.forEach { topicMetadata =>
assertEquals(Errors.NONE, topicMetadata.error)
assertNotEquals(Uuid.ZERO_UUID, topicMetadata.topicId())
assertNotNull(topicMetadata.topicId())
}
}
/**
* Preferred replica should be the first item in the replicas list
*/

View File

@ -286,7 +286,8 @@ class RequestQuotaTest extends BaseRequestTest {
.setPort(0)
.setSecurityProtocol(securityProtocol.id)
.setListener(ListenerName.forSecurityProtocol(securityProtocol).value)).asJava)).asJava
new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, brokerId, Int.MaxValue, Long.MaxValue, partitionState, brokers)
new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, brokerId, Int.MaxValue, Long.MaxValue,
partitionState, brokers, Collections.emptyMap())
case ApiKeys.CONTROLLED_SHUTDOWN =>
new ControlledShutdownRequest.Builder(

View File

@ -152,7 +152,7 @@ public class MetadataRequestBenchmark {
UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(
ApiKeys.UPDATE_METADATA.latestVersion(),
1, 1, 1,
partitionStates, liveBrokers).build();
partitionStates, liveBrokers, Collections.emptyMap()).build();
metadataCache.updateMetadata(100, updateMetadataRequest);
}