KAFKA-10545: Create topic IDs and propagate to brokers (#9626)

This change propagates topic IDs to brokers in the LeaderAndIsrRequest. It also removes the topic name from the LeaderAndIsrResponse, reorganizes the response so that partition errors are grouped by topic, and includes the topic ID in each topic entry.
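
Below is a minimal sketch of the new builder contract, assuming the Kafka core classpath; the object name, controller/broker IDs, and topic name are illustrative, and the partition state is truncated to the fields relevant here.

import java.util.Collections

import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.LeaderAndIsrRequest

object LeaderAndIsrTopicIdExample extends App {
  val partitionStates = Collections.singletonList(
    new LeaderAndIsrPartitionState()
      .setTopicName("topic")
      .setPartitionIndex(0))
  // New in this change: a topic name -> topic ID map is passed to the builder
  // and serialized into each TopicState for request versions >= 5.
  val topicIds = Collections.singletonMap("topic", Uuid.randomUuid())
  val liveLeaders = Collections.singleton(new Node(0, "localhost", 9092))

  val request = new LeaderAndIsrRequest.Builder(
    ApiKeys.LEADER_AND_ISR.latestVersion(), 0, 0, 0,
    partitionStates, topicIds, liveLeaders).build()

  // Deserialized v2-v4 requests report Uuid.ZERO_UUID instead, and v0-v1
  // requests report an empty map, since those versions carry no topic IDs.
  assert(request.topicIds().get("topic") != Uuid.ZERO_UUID)
}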

In addition, the topic ID is persisted on each replica, both in memory on the Log and in a partition.metadata file on disk. This file is read on startup, and if it contains a topic ID, the ID is reloaded into the Log.
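
On disk, the ID lives in a small two-line text file named partition.metadata inside the partition's log directory (see the new PartitionMetadataFile class below). A sample file, with an illustrative base64-encoded UUID:

version: 0
topic_id: 1xg8wsVTQLqrLIFmX06aZw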

Reviewers: David Jacot <djacot@confluent.io>, dengziming <dengziming1993@gmail.com>, Nikhil Bhatia <rite2nikhil@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
Justine Olshan authored on 2020-12-18 17:19:50 -05:00 (committed by GitHub)
parent 5c921afa4a
commit 1dd1e7f945
26 changed files with 761 additions and 125 deletions

View File

@ -100,7 +100,7 @@
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest|TestSslUtils|AclAuthorizerBenchmark"/>
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>

View File

@ -17,11 +17,13 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrLiveLeader;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrTopicState;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
@ -43,12 +45,15 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {
private final List<LeaderAndIsrPartitionState> partitionStates;
private final Map<String, Uuid> topicIds;
private final Collection<Node> liveLeaders;
public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<LeaderAndIsrPartitionState> partitionStates, Collection<Node> liveLeaders) {
List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
Collection<Node> liveLeaders) {
super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch);
this.partitionStates = partitionStates;
this.topicIds = topicIds;
this.liveLeaders = liveLeaders;
}
@ -67,7 +72,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
.setLiveLeaders(leaders);
if (version >= 2) {
Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates);
Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds);
data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
} else {
data.setUngroupedPartitionStates(partitionStates);
@ -76,13 +81,14 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
return new LeaderAndIsrRequest(data, version);
}
private static Map<String, LeaderAndIsrTopicState> groupByTopic(List<LeaderAndIsrPartitionState> partitionStates) {
private static Map<String, LeaderAndIsrTopicState> groupByTopic(List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds) {
Map<String, LeaderAndIsrTopicState> topicStates = new HashMap<>();
// We don't null out the topic name in LeaderAndIsrRequestPartition since it's ignored by
// the generated code if version >= 2
for (LeaderAndIsrPartitionState partition : partitionStates) {
LeaderAndIsrTopicState topicState = topicStates.computeIfAbsent(partition.topicName(),
t -> new LeaderAndIsrTopicState().setTopicName(partition.topicName()));
LeaderAndIsrTopicState topicState = topicStates.computeIfAbsent(partition.topicName(), t -> new LeaderAndIsrTopicState()
.setTopicName(partition.topicName())
.setTopicId(topicIds.getOrDefault(partition.topicName(), Uuid.ZERO_UUID)));
topicState.partitionStates().add(partition);
}
return topicStates;
@ -96,6 +102,7 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
.append(", controllerEpoch=").append(controllerEpoch)
.append(", brokerEpoch=").append(brokerEpoch)
.append(", partitionStates=").append(partitionStates)
.append(", topicIds=").append(topicIds)
.append(", liveLeaders=(").append(Utils.join(liveLeaders, ", ")).append(")")
.append(")");
return bld.toString();
@ -129,15 +136,34 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
Errors error = Errors.forException(e);
responseData.setErrorCode(error.code());
List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
for (LeaderAndIsrPartitionState partition : partitionStates()) {
partitions.add(new LeaderAndIsrPartitionError()
.setTopicName(partition.topicName())
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(error.code()));
if (version() < 5) {
List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
for (LeaderAndIsrPartitionState partition : partitionStates()) {
partitions.add(new LeaderAndIsrPartitionError()
.setTopicName(partition.topicName())
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(error.code()));
}
responseData.setPartitionErrors(partitions);
return new LeaderAndIsrResponse(responseData, version());
}
responseData.setPartitionErrors(partitions);
return new LeaderAndIsrResponse(responseData);
List<LeaderAndIsrTopicError> topics = new ArrayList<>(data.topicStates().size());
Map<String, Uuid> topicIds = topicIds();
for (LeaderAndIsrTopicState topicState : data.topicStates()) {
LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
topicError.setTopicId(topicIds.get(topicState.topicName()));
List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(topicState.partitionStates().size());
for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
partitions.add(new LeaderAndIsrPartitionError()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(error.code()));
}
topicError.setPartitionErrors(partitions);
topics.add(topicError);
}
responseData.setTopics(topics);
return new LeaderAndIsrResponse(responseData, version());
}
@Override
@ -162,6 +188,11 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
return data.ungroupedPartitionStates();
}
public Map<String, Uuid> topicIds() {
return data.topicStates().stream()
.collect(Collectors.toMap(LeaderAndIsrTopicState::topicName, LeaderAndIsrTopicState::topicId));
}
public List<LeaderAndIsrLiveLeader> liveLeaders() {
return Collections.unmodifiableList(data.liveLeaders());
}

View File

@ -16,15 +16,20 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.FlattenedIterator;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
public class LeaderAndIsrResponse extends AbstractResponse {
@ -36,14 +41,24 @@ public class LeaderAndIsrResponse extends AbstractResponse {
* STALE_BROKER_EPOCH (77)
*/
private final LeaderAndIsrResponseData data;
private short version;
public LeaderAndIsrResponse(LeaderAndIsrResponseData data) {
public LeaderAndIsrResponse(LeaderAndIsrResponseData data, short version) {
super(ApiKeys.LEADER_AND_ISR);
this.data = data;
this.version = version;
}
public List<LeaderAndIsrPartitionError> partitions() {
return data.partitionErrors();
public List<LeaderAndIsrTopicError> topics() {
return this.data.topics();
}
public Iterable<LeaderAndIsrPartitionError> partitions() {
if (version < 5) {
return data.partitionErrors();
}
return () -> new FlattenedIterator<>(data.topics().iterator(),
topic -> topic.partitionErrors().iterator());
}
public Errors error() {
@ -53,22 +68,49 @@ public class LeaderAndIsrResponse extends AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
Errors error = error();
if (error != Errors.NONE)
if (error != Errors.NONE) {
// Minor optimization since the top-level error applies to all partitions
return Collections.singletonMap(error, data.partitionErrors().size() + 1);
Map<Errors, Integer> errors = errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
// Top level error
if (version < 5)
return Collections.singletonMap(error, data.partitionErrors().size() + 1);
return Collections.singletonMap(error,
data.topics().stream().mapToInt(t -> t.partitionErrors().size()).sum() + 1);
}
Map<Errors, Integer> errors;
if (version < 5)
errors = errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
else
errors = errorCounts(data.topics().stream().flatMap(t -> t.partitionErrors().stream()).map(l ->
Errors.forCode(l.errorCode())));
updateErrorCounts(errors, Errors.NONE);
return errors;
}
public Map<TopicPartition, Errors> partitionErrors(Map<Uuid, String> topicNames) {
Map<TopicPartition, Errors> errors = new HashMap<>();
if (version < 5) {
data.partitionErrors().forEach(partition ->
errors.put(new TopicPartition(partition.topicName(), partition.partitionIndex()),
Errors.forCode(partition.errorCode())));
} else {
for (LeaderAndIsrTopicError topic : data.topics()) {
String topicName = topicNames.get(topic.topicId());
if (topicName != null) {
topic.partitionErrors().forEach(partition ->
errors.put(new TopicPartition(topicName, partition.partitionIndex()),
Errors.forCode(partition.errorCode())));
}
}
}
return errors;
}
@Override
public int throttleTimeMs() {
return DEFAULT_THROTTLE_TIME;
}
public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) {
return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new ByteBufferAccessor(buffer), version));
return new LeaderAndIsrResponse(new LeaderAndIsrResponseData(new ByteBufferAccessor(buffer), version), version);
}
@Override

View File

@ -21,8 +21,12 @@
//
// Version 2 adds broker epoch and reorganizes the partitions by topic.
//
// Version 3 adds AddingReplicas and RemovingReplicas
"validVersions": "0-4",
// Version 3 adds AddingReplicas and RemovingReplicas.
//
// Version 4 is the first flexible version.
//
// Version 5 adds Topic ID and Type to the TopicStates, as described in KIP-516.
"validVersions": "0-5",
"flexibleVersions": "4+",
"fields": [
{ "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@ -31,6 +35,8 @@
"about": "The current controller epoch." },
{ "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1",
"about": "The current broker epoch." },
{ "name": "Type", "type": "int8", "versions": "5+",
"about": "The type that indicates whether all topics are included in the request"},
{ "name": "UngroupedPartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "0-1",
"about": "The state of each partition, in a v0 or v1 message." },
// In v0 or v1 requests, each partition is listed alongside its topic name.
@ -40,6 +46,8 @@
"about": "Each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "5+", "ignorable": true,
"about": "The unique topic ID." },
{ "name": "PartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "2+",
"about": "The state of each partition" }
]},

View File

@ -22,15 +22,29 @@
// Version 2 is the same as version 1.
//
// Version 3 is the same as version 2.
"validVersions": "0-4",
//
// Version 4 is the first flexible version.
//
// Version 5 removes TopicName and replaces it with TopicId and reorganizes
// the partitions by topic, as described by KIP-516.
"validVersions": "0-5",
"flexibleVersions": "4+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",
"about": "Each partition.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0-4",
"about": "Each partition in v0 to v4 message."},
{ "name": "Topics", "type": "[]LeaderAndIsrTopicError", "versions": "5+",
"about": "Each topic", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "5+", "about": "The unique topic ID" },
{ "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "5+",
"about": "Each partition."}
]}
],
"commonStructs": [
{ "name": "LeaderAndIsrPartitionError", "versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0-4", "entityType": "topicName", "ignorable": true,
"about": "The topic name."},
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",

View File

@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
@ -31,8 +32,10 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -50,19 +53,25 @@ public class LeaderAndIsrRequestTest {
public void testUnsupportedVersion() {
LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(
(short) (LEADER_AND_ISR.latestVersion() + 1), 0, 0, 0,
Collections.emptyList(), Collections.emptySet());
Collections.emptyList(), Collections.emptyMap(), Collections.emptySet());
assertThrows(UnsupportedVersionException.class, builder::build);
}
@Test
public void testGetErrorResponse() {
Uuid id = Uuid.randomUuid();
for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) {
LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(version, 0, 0, 0,
Collections.emptyList(), Collections.emptySet());
Collections.emptyList(), Collections.singletonMap("topic", id), Collections.emptySet());
LeaderAndIsrRequest request = builder.build();
LeaderAndIsrResponse response = request.getErrorResponse(0,
new ClusterAuthorizationException("Not authorized"));
assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, response.error());
if (version < 5) {
assertEquals(0, response.topics().size());
} else {
assertEquals(id, response.topics().get(0).topicId());
}
}
}
@ -115,8 +124,13 @@ public class LeaderAndIsrRequestTest {
new Node(0, "host0", 9090),
new Node(1, "host1", 9091)
);
Map<String, Uuid> topicIds = new HashMap<>();
topicIds.put("topic0", Uuid.randomUuid());
topicIds.put("topic1", Uuid.randomUuid());
LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(version, 1, 2, 3, partitionStates,
liveNodes).build();
topicIds, liveNodes).build();
List<LeaderAndIsrLiveLeader> liveLeaders = liveNodes.stream().map(n -> new LeaderAndIsrLiveLeader()
.setBrokerId(n.id())
@ -140,7 +154,21 @@ public class LeaderAndIsrRequestTest {
.setRemovingReplicas(emptyList());
}
// Prior to version 2, there were no TopicStates, so a map of Topic Ids from a list of
// TopicStates is an empty map.
if (version < 2) {
topicIds = new HashMap<>();
}
// In versions 2-4 there are TopicStates, but no topicIds, so deserialized requests will have
// Zero Uuids in place.
if (version > 1 && version < 5) {
topicIds.put("topic0", Uuid.ZERO_UUID);
topicIds.put("topic1", Uuid.ZERO_UUID);
}
assertEquals(new HashSet<>(partitionStates), iterableToSet(deserializedRequest.partitionStates()));
assertEquals(topicIds, deserializedRequest.topicIds());
assertEquals(liveLeaders, deserializedRequest.liveLeaders());
assertEquals(1, request.controllerId());
assertEquals(2, request.controllerEpoch());
@ -152,13 +180,15 @@ public class LeaderAndIsrRequestTest {
public void testTopicPartitionGroupingSizeReduction() {
Set<TopicPartition> tps = TestUtils.generateRandomTopicPartitions(10, 10);
List<LeaderAndIsrPartitionState> partitionStates = new ArrayList<>();
Map<String, Uuid> topicIds = new HashMap<>();
for (TopicPartition tp : tps) {
partitionStates.add(new LeaderAndIsrPartitionState()
.setTopicName(tp.topic())
.setPartitionIndex(tp.partition()));
topicIds.put(tp.topic(), Uuid.randomUuid());
}
LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder((short) 2, 0, 0, 0,
partitionStates, Collections.emptySet());
partitionStates, topicIds, Collections.emptySet());
LeaderAndIsrRequest v2 = builder.build((short) 2);
LeaderAndIsrRequest v1 = builder.build((short) 1);

View File

@ -16,8 +16,10 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@ -29,6 +31,7 @@ import java.util.List;
import java.util.Map;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -57,46 +60,87 @@ public class LeaderAndIsrResponseTest {
.setZkVersion(20)
.setReplicas(Collections.singletonList(10))
.setIsNew(false));
Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid());
LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(),
15, 20, 0, partitionStates, Collections.emptySet()).build();
15, 20, 0, partitionStates, topicIds, Collections.emptySet()).build();
LeaderAndIsrResponse response = request.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 3), response.errorCounts());
}
@Test
public void testErrorCountsWithTopLevelError() {
List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setPartitionErrors(partitions));
assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 3), response.errorCounts());
for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) {
LeaderAndIsrResponse response;
if (version < 5) {
List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setPartitionErrors(partitions), version);
} else {
Uuid id = Uuid.randomUuid();
List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setTopics(topics), version);
}
assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 3), response.errorCounts());
}
}
@Test
public void testErrorCountsNoTopLevelError() {
List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code())
.setPartitionErrors(partitions));
Map<Errors, Integer> errorCounts = response.errorCounts();
assertEquals(2, errorCounts.size());
assertEquals(2, errorCounts.get(Errors.NONE).intValue());
assertEquals(1, errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue());
for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) {
LeaderAndIsrResponse response;
if (version < 5) {
List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code())
.setPartitionErrors(partitions), version);
} else {
Uuid id = Uuid.randomUuid();
List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code())
.setTopics(topics), version);
}
Map<Errors, Integer> errorCounts = response.errorCounts();
assertEquals(2, errorCounts.size());
assertEquals(2, errorCounts.get(Errors.NONE).intValue());
assertEquals(1, errorCounts.get(Errors.CLUSTER_AUTHORIZATION_FAILED).intValue());
}
}
@Test
public void testToString() {
List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code())
.setPartitionErrors(partitions));
String responseStr = response.toString();
assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName()));
assertTrue(responseStr.contains(partitions.toString()));
assertTrue(responseStr.contains("errorCode=" + Errors.NONE.code()));
for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) {
LeaderAndIsrResponse response;
if (version < 5) {
List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code())
.setPartitionErrors(partitions), version);
String responseStr = response.toString();
assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName()));
assertTrue(responseStr.contains(partitions.toString()));
assertTrue(responseStr.contains("errorCode=" + Errors.NONE.code()));
} else {
Uuid id = Uuid.randomUuid();
List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code())
.setTopics(topics), version);
String responseStr = response.toString();
assertTrue(responseStr.contains(LeaderAndIsrResponse.class.getSimpleName()));
assertTrue(responseStr.contains(topics.toString()));
assertTrue(responseStr.contains(id.toString()));
assertTrue(responseStr.contains("errorCode=" + Errors.NONE.code()));
}
}
}
private List<LeaderAndIsrPartitionError> createPartitions(String topicName, List<Errors> errors) {
@ -104,11 +148,27 @@ public class LeaderAndIsrResponseTest {
int partitionIndex = 0;
for (Errors error : errors) {
partitions.add(new LeaderAndIsrPartitionError()
.setTopicName(topicName)
.setPartitionIndex(partitionIndex++)
.setErrorCode(error.code()));
.setTopicName(topicName)
.setPartitionIndex(partitionIndex++)
.setErrorCode(error.code()));
}
return partitions;
}
private List<LeaderAndIsrTopicError> createTopic(Uuid id, List<Errors> errors) {
List<LeaderAndIsrTopicError> topics = new ArrayList<>();
LeaderAndIsrTopicError topic = new LeaderAndIsrTopicError();
topic.setTopicId(id);
List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
int partitionIndex = 0;
for (Errors error : errors) {
partitions.add(new LeaderAndIsrPartitionError()
.setPartitionIndex(partitionIndex++)
.setErrorCode(error.code()));
}
topic.setPartitionErrors(partitions);
topics.add(topic);
return topics;
}
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
@ -320,13 +321,12 @@ public class RequestResponseTest {
checkResponse(createStopReplicaResponse(), v, true);
}
checkRequest(createLeaderAndIsrRequest(0), true);
checkErrorResponse(createLeaderAndIsrRequest(0), unknownServerException, false);
checkRequest(createLeaderAndIsrRequest(1), true);
checkErrorResponse(createLeaderAndIsrRequest(1), unknownServerException, false);
checkRequest(createLeaderAndIsrRequest(2), true);
checkErrorResponse(createLeaderAndIsrRequest(2), unknownServerException, false);
checkResponse(createLeaderAndIsrResponse(), 0, true);
for (int v = ApiKeys.LEADER_AND_ISR.oldestVersion(); v <= ApiKeys.LEADER_AND_ISR.latestVersion(); v++) {
checkRequest(createLeaderAndIsrRequest(v), true);
checkErrorResponse(createLeaderAndIsrRequest(v), unknownServerException, false);
checkResponse(createLeaderAndIsrResponse(v), v, true);
}
checkRequest(createSaslHandshakeRequest(), true);
checkErrorResponse(createSaslHandshakeRequest(), unknownServerException, true);
checkResponse(createSaslHandshakeResponse(), 0, true);
@ -1550,18 +1550,37 @@ public class RequestResponseTest {
new Node(0, "test0", 1223),
new Node(1, "test1", 1223)
);
return new LeaderAndIsrRequest.Builder((short) version, 1, 10, 0, partitionStates, leaders).build();
Map<String, Uuid> topicIds = new HashMap<>();
topicIds.put("topic5", Uuid.randomUuid());
topicIds.put("topic20", Uuid.randomUuid());
return new LeaderAndIsrRequest.Builder((short) version, 1, 10, 0,
partitionStates, topicIds, leaders).build();
}
private LeaderAndIsrResponse createLeaderAndIsrResponse() {
List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError> partitions = new ArrayList<>();
partitions.add(new LeaderAndIsrResponseData.LeaderAndIsrPartitionError()
.setTopicName("test")
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code()));
return new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code())
.setPartitionErrors(partitions));
private LeaderAndIsrResponse createLeaderAndIsrResponse(int version) {
if (version < 5) {
List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError> partitions = new ArrayList<>();
partitions.add(new LeaderAndIsrResponseData.LeaderAndIsrPartitionError()
.setTopicName("test")
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code()));
return new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code())
.setPartitionErrors(partitions), (short) version);
} else {
List<LeaderAndIsrResponseData.LeaderAndIsrPartitionError> partition = Collections.singletonList(
new LeaderAndIsrResponseData.LeaderAndIsrPartitionError()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code()));
List<LeaderAndIsrResponseData.LeaderAndIsrTopicError> topics = new ArrayList<>();
topics.add(new LeaderAndIsrResponseData.LeaderAndIsrTopicError()
.setTopicId(Uuid.randomUuid())
.setPartitionErrors(partition));
return new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setTopics(topics), (short) version);
}
}
private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
@ -1600,6 +1619,10 @@ public class RequestResponseTest {
.setReplicas(replicas)
.setOfflineReplicas(offlineReplicas));
Map<String, Uuid> topicIds = new HashMap<>();
topicIds.put("topic5", Uuid.randomUuid());
topicIds.put("topic20", Uuid.randomUuid());
SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT;
List<UpdateMetadataEndpoint> endpoints1 = new ArrayList<>();
endpoints1.add(new UpdateMetadataEndpoint()
@ -2541,7 +2564,8 @@ public class RequestResponseTest {
assertEquals(Integer.valueOf(1), createHeartBeatResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1), createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1), createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(2), createLeaderAndIsrResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(2), createLeaderAndIsrResponse(4).errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(2), createLeaderAndIsrResponse(5).errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(3), createLeaderEpochResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1), createLeaveGroupResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1), createListGroupsResponse(LIST_GROUPS.latestVersion()).errorCounts().get(Errors.NONE));

View File

@ -111,7 +111,7 @@ object ApiVersion {
KAFKA_2_7_IV2,
// Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch.
KAFKA_2_8_IV0,
// Add topicId to MetadataUpdateRequest
// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
KAFKA_2_8_IV1
)

View File

@ -455,7 +455,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
private def sendLeaderAndIsrRequest(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = {
val leaderAndIsrRequestVersion: Short =
if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4
if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV1) 5
else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4
else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV0) 3
else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
@ -482,8 +483,13 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
_.node(config.interBrokerListenerName)
}
val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
val topicIds = leaderAndIsrPartitionStates.keys
.map(_.topic)
.toSet[String]
.map(topic => (topic, controllerContext.topicIds(topic)))
.toMap
val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, leaders.asJava)
controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava)
sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => {
val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
sendEvent(LeaderAndIsrResponseReceived(leaderAndIsrResponse, broker))

View File

@ -1378,11 +1378,10 @@ class KafkaController(val config: KafkaConfig,
val offlineReplicas = new ArrayBuffer[TopicPartition]()
val onlineReplicas = new ArrayBuffer[TopicPartition]()
leaderAndIsrResponse.partitions.forEach { partition =>
val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
leaderAndIsrResponse.partitionErrors(controllerContext.topicNames.asJava).forEach{ case (tp, error) =>
if (error.code() == Errors.KAFKA_STORAGE_ERROR.code)
offlineReplicas += tp
else if (partition.errorCode == Errors.NONE.code)
else if (error.code() == Errors.NONE.code)
onlineReplicas += tp
}

View File

@ -32,7 +32,7 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCod
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch}
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.FetchResponseData
@ -43,7 +43,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET
import org.apache.kafka.common.requests.ProduceResponse.RecordError
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@ -296,11 +296,16 @@ class Log(@volatile private var _dir: File,
// Visible for testing
@volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
@volatile var partitionMetadataFile : Option[PartitionMetadataFile] = None
@volatile var topicId : Uuid = Uuid.ZERO_UUID
locally {
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
initializeLeaderEpochCache()
initializePartitionMetadata()
val nextOffset = loadSegments()
@ -324,6 +329,12 @@ class Log(@volatile private var _dir: File,
// deletion.
producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq)
loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown)
// Recover topic ID if present
partitionMetadataFile.foreach { file =>
if (!file.isEmpty())
topicId = file.read().topicId
}
}
def dir: File = _dir
@ -536,6 +547,11 @@ class Log(@volatile private var _dir: File,
private def recordVersion: RecordVersion = config.messageFormatVersion.recordVersion
private def initializePartitionMetadata(): Unit = lock synchronized {
val partitionMetadata = PartitionMetadataFile.newFile(dir)
partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, logDirFailureChannel))
}
private def initializeLeaderEpochCache(): Unit = lock synchronized {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
@ -1003,6 +1019,7 @@ class Log(@volatile private var _dir: File,
// re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
// the checkpoint file in renamed log directory
initializeLeaderEpochCache()
initializePartitionMetadata()
}
}
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Paths}
import java.util.regex.Pattern
import kafka.utils.Logging
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.utils.Utils
object PartitionMetadataFile {
private val PartitionMetadataFilename = "partition.metadata"
private val WhiteSpacesPattern = Pattern.compile(":\\s+")
private val CurrentVersion = 0
def newFile(dir: File): File = new File(dir, PartitionMetadataFilename)
object PartitionMetadataFileFormatter {
def toFile(data: PartitionMetadata): String = {
s"version: ${data.version}\ntopic_id: ${data.topicId}"
}
}
class PartitionMetadataReadBuffer[T](location: String,
reader: BufferedReader,
version: Int) extends Logging {
def read(): PartitionMetadata = {
def malformedLineException(line: String) =
new IOException(s"Malformed line in checkpoint file ($location): '$line'")
var line: String = null
var metadataTopicId: Uuid = null
try {
line = reader.readLine()
WhiteSpacesPattern.split(line) match {
case Array(_, version) =>
if (version.toInt == CurrentVersion) {
line = reader.readLine()
WhiteSpacesPattern.split(line) match {
case Array(_, topicId) => metadataTopicId = Uuid.fromString(topicId)
case _ => throw malformedLineException(line)
}
if (metadataTopicId.equals(Uuid.ZERO_UUID)) {
throw new IOException(s"Invalid topic ID in partition metadata file ($location)")
}
new PartitionMetadata(CurrentVersion, metadataTopicId)
} else {
throw new IOException(s"Unrecognized version of partition metadata file ($location): " + version)
}
case _ => throw malformedLineException(line)
}
} catch {
case _: NumberFormatException => throw malformedLineException(line)
}
}
}
}
class PartitionMetadata(val version: Int, val topicId: Uuid)
class PartitionMetadataFile(val file: File,
logDirFailureChannel: LogDirFailureChannel) extends Logging {
import kafka.server.PartitionMetadataFile.{CurrentVersion, PartitionMetadataFileFormatter, PartitionMetadataReadBuffer}
private val path = file.toPath.toAbsolutePath
private val tempPath = Paths.get(path.toString + ".tmp")
private val lock = new Object()
private val logDir = file.getParentFile.getParent
try Files.createFile(file.toPath) // create the file if it doesn't exist
catch { case _: FileAlreadyExistsException => }
def write(topicId: Uuid): Unit = {
lock synchronized {
try {
// write to temp file and then swap with the existing file
val fileOutputStream = new FileOutputStream(tempPath.toFile)
val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
try {
writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
writer.flush()
fileOutputStream.getFD().sync()
} finally {
writer.close()
}
Utils.atomicMoveWithFallback(tempPath, path)
} catch {
case e: IOException =>
val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
}
def read(): PartitionMetadata = {
lock synchronized {
try {
val reader = Files.newBufferedReader(path)
try {
val partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath, reader, CurrentVersion)
partitionBuffer.read()
} finally {
reader.close()
}
} catch {
case e: IOException =>
val msg = s"Error while reading partition metadata file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
}
def isEmpty(): Boolean = {
file.length() == 0
}
}
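
A brief usage sketch of the new class, assuming the Kafka core classpath; the temporary directory stands in for a real partition log directory, and the object name is illustrative. It mirrors the flow wired up in Log and ReplicaManager: write the ID when the first LeaderAndIsr request carrying one arrives, and read it back on startup.

import java.nio.file.Files
import kafka.server.{LogDirFailureChannel, PartitionMetadataFile}
import org.apache.kafka.common.Uuid

object PartitionMetadataFileExample extends App {
  val logDir = Files.createTempDirectory("topic-0").toFile
  val metadataFile = new PartitionMetadataFile(
    PartitionMetadataFile.newFile(logDir), new LogDirFailureChannel(1))

  // Persist the topic ID atomically: write to a .tmp file, fsync, then rename.
  val id = Uuid.randomUuid()
  metadataFile.write(id)

  // On startup, Log reloads the ID only if the file is non-empty.
  if (!metadataFile.isEmpty())
    assert(metadataFile.read().topicId == id)
}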

View File

@ -36,12 +36,13 @@ import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, Of
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.message.{DescribeLogDirsResponseData, FetchResponseData, LeaderAndIsrResponseData}
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{OffsetForLeaderTopicResult, EpochEndOffset}
@ -1331,6 +1332,7 @@ class ReplicaManager(val config: KafkaConfig,
s"correlation id $correlationId from controller $controllerId " +
s"epoch ${leaderAndIsrRequest.controllerEpoch}")
}
val topicIds = leaderAndIsrRequest.topicIds()
val response = {
if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
@ -1437,6 +1439,24 @@ class ReplicaManager(val config: KafkaConfig,
*/
if (localLog(topicPartition).isEmpty)
markPartitionOffline(topicPartition)
else {
val id = topicIds.get(topicPartition.topic())
// Ensure we have not received a request from an older protocol
if (id != null && !id.equals(Uuid.ZERO_UUID)) {
val log = localLog(topicPartition).get
// Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
// This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
if (log.topicId.equals(Uuid.ZERO_UUID)) {
log.partitionMetadataFile.get.write(id)
log.topicId = id
// Warn if the topic ID in the request does not match the log.
} else if (!log.topicId.equals(id)) {
stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
s" match the topic Id provided in the request: " +
s"${id.toString}.")
}
}
}
}
// we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
@ -1448,15 +1468,38 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
val responsePartitions = responseMap.iterator.map { case (tp, error) =>
new LeaderAndIsrPartitionError()
.setTopicName(tp.topic)
.setPartitionIndex(tp.partition)
.setErrorCode(error.code)
}.toBuffer
new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code)
.setPartitionErrors(responsePartitions.asJava))
if (leaderAndIsrRequest.version() < 5) {
val responsePartitions = responseMap.iterator.map { case (tp, error) =>
new LeaderAndIsrPartitionError()
.setTopicName(tp.topic)
.setPartitionIndex(tp.partition)
.setErrorCode(error.code)
}.toBuffer
new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code)
.setPartitionErrors(responsePartitions.asJava), leaderAndIsrRequest.version())
} else {
val topics = new mutable.HashMap[String, List[LeaderAndIsrPartitionError]]
responseMap.asJava.forEach { case (tp, error) =>
if (!topics.contains(tp.topic)) {
topics.put(tp.topic, List(new LeaderAndIsrPartitionError()
.setPartitionIndex(tp.partition)
.setErrorCode(error.code)))
} else {
topics.put(tp.topic, new LeaderAndIsrPartitionError()
.setPartitionIndex(tp.partition)
.setErrorCode(error.code)::topics(tp.topic))
}
}
val topicErrors = topics.iterator.map { case (topic, partitionError) =>
new LeaderAndIsrTopicError()
.setTopicId(topicIds.get(topic))
.setPartitionErrors(partitionError.asJava)
}.toBuffer
new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code)
.setTopics(topicErrors.asJava), leaderAndIsrRequest.version())
}
}
}
val endMs = time.milliseconds()

View File

@ -507,7 +507,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
/**
* Sets the topic znode with the given assignment.
* @param topic the topic whose assignment is being set.
* @param topicId optional topic ID if the topic has one
* @param topicId unique topic ID for the topic
* @param assignment the partition to replica mapping to set for the given topic
* @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
* @return SetDataResponse

View File

@ -56,7 +56,7 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, requests}
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, requests, Uuid}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@ -99,6 +99,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val brokerId: Integer = 0
val topic = "topic"
val topicId = Uuid.randomUuid()
val topicPattern = "topic.*"
val transactionalId = "transactional.id"
val producerId = 83392L
@ -106,6 +107,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val correlationId = 0
val clientId = "client-Id"
val tp = new TopicPartition(topic, part)
val topicIds = Collections.singletonMap(topic, topicId)
val topicNames = Collections.singletonMap(topicId, topic)
val logDir = "logDir"
val group = "my-group"
val protocolType = "consumer"
@ -181,7 +184,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)),
ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => Errors.forCode(
resp.partitions.asScala.find(p => p.topicName == tp.topic && p.partitionIndex == tp.partition).get.errorCode)),
resp.topics.asScala.find(t => topicNames.get(t.topicId) == tp.topic).get.partitionErrors.asScala.find(
p => p.partitionIndex == tp.partition).get.errorCode)),
ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => Errors.forCode(
resp.partitionErrors.asScala.find(pe => pe.topicName == tp.topic && pe.partitionIndex == tp.partition).get.errorCode)),
ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error),
@ -474,6 +478,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
.setZkVersion(2)
.setReplicas(Seq(brokerId).asJava)
.setIsNew(false)).asJava,
topicIds,
Set(new Node(brokerId, "localhost", 0)).asJava).build()
}

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.message.StopReplicaResponseData.StopReplicaPartit
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, StopReplicaRequest, StopReplicaResponse, UpdateMetadataRequest, UpdateMetadataResponse}
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert._
import org.junit.Test
@ -72,6 +73,8 @@ class ControllerChannelManagerTest {
assertEquals(1, updateMetadataRequests.size)
val leaderAndIsrRequest = leaderAndIsrRequests.head
val topicIds = leaderAndIsrRequest.topicIds();
val topicNames = topicIds.asScala.map { case (k, v) => (v, k) }
assertEquals(controllerId, leaderAndIsrRequest.controllerId)
assertEquals(controllerEpoch, leaderAndIsrRequest.controllerEpoch)
assertEquals(partitions.keySet,
@ -87,7 +90,10 @@ class ControllerChannelManagerTest {
val LeaderAndIsrResponseReceived(leaderAndIsrResponse, brokerId) = batch.sentEvents.head
assertEquals(2, brokerId)
assertEquals(partitions.keySet,
leaderAndIsrResponse.partitions.asScala.map(p => new TopicPartition(p.topicName, p.partitionIndex)).toSet)
leaderAndIsrResponse.topics.asScala.flatMap(t => t.partitionErrors.asScala.map(p =>
new TopicPartition(topicNames(t.topicId), p.partitionIndex))).toSet)
leaderAndIsrResponse.topics.forEach(topic =>
assertEquals(topicIds.get(topicNames.get(topic.topicId).get), topic.topicId))
}
@Test
@ -157,7 +163,8 @@ class ControllerChannelManagerTest {
for (apiVersion <- ApiVersion.allVersions) {
val leaderAndIsrRequestVersion: Short =
if (apiVersion >= KAFKA_2_4_IV1) 4
if (apiVersion >= KAFKA_2_8_IV1) 5
else if (apiVersion >= KAFKA_2_4_IV1) 4
else if (apiVersion >= KAFKA_2_4_IV0) 3
else if (apiVersion >= KAFKA_2_2_IV0) 2
else if (apiVersion >= KAFKA_1_0_IV0) 1
@ -187,6 +194,21 @@ class ControllerChannelManagerTest {
assertEquals(1, leaderAndIsrRequests.size)
assertEquals(s"IBP $interBrokerProtocolVersion should use version $expectedLeaderAndIsrVersion",
expectedLeaderAndIsrVersion, leaderAndIsrRequests.head.version)
val request = leaderAndIsrRequests.head
val byteBuffer = request.serialize
val deserializedRequest = LeaderAndIsrRequest.parse(byteBuffer, expectedLeaderAndIsrVersion)
if (interBrokerProtocolVersion >= KAFKA_2_8_IV1) {
assertTrue(!request.topicIds().get("foo").equals(Uuid.ZERO_UUID))
assertTrue(!deserializedRequest.topicIds().get("foo").equals(Uuid.ZERO_UUID))
} else if (interBrokerProtocolVersion >= KAFKA_2_2_IV0) {
assertTrue(!request.topicIds().get("foo").equals(Uuid.ZERO_UUID))
assertTrue(deserializedRequest.topicIds().get("foo").equals(Uuid.ZERO_UUID))
} else {
assertTrue(request.topicIds().get("foo") == null)
assertTrue(deserializedRequest.topicIds().get("foo") == null)
}
}
@Test
@ -827,15 +849,18 @@ class ControllerChannelManagerTest {
private def applyLeaderAndIsrResponseCallbacks(error: Errors, sentRequests: List[SentRequest]): Unit = {
sentRequests.filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR).filter(_.responseCallback != null).foreach { sentRequest =>
val leaderAndIsrRequest = sentRequest.request.build().asInstanceOf[LeaderAndIsrRequest]
val partitionErrors = leaderAndIsrRequest.partitionStates.asScala.map(p =>
new LeaderAndIsrPartitionError()
.setTopicName(p.topicName)
.setPartitionIndex(p.partitionIndex)
.setErrorCode(error.code))
val topicIds = leaderAndIsrRequest.topicIds
val topicErrors = leaderAndIsrRequest.data.topicStates.asScala.map(t =>
new LeaderAndIsrTopicError()
.setTopicId(topicIds.get(t.topicName))
.setPartitionErrors(t.partitionStates.asScala.map(p =>
new LeaderAndIsrPartitionError()
.setPartitionIndex(p.partitionIndex)
.setErrorCode(error.code)).asJava))
val leaderAndIsrResponse = new LeaderAndIsrResponse(
new LeaderAndIsrResponseData()
.setErrorCode(error.code)
.setPartitionErrors(partitionErrors.toBuffer.asJava))
.setTopics(topicErrors.toBuffer.asJava), leaderAndIsrRequest.version())
sentRequest.responseCallback(leaderAndIsrResponse)
}
}
@ -871,6 +896,11 @@ class ControllerChannelManagerTest {
}.toMap
context.setLiveBrokers(brokerEpochs)
context.setAllTopics(topics)
for (topic <- topics) {
context.addTopicId(topic, Uuid.randomUuid())
}
// Simple round-robin replica assignment
var leaderIndex = 0

View File

@ -228,8 +228,8 @@ class LogManagerTest {
s.lazyTimeIndex.get
})
// there should be a log file, two indexes, one producer snapshot, and the leader epoch checkpoint
assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 1, log.dir.list.length)
// there should be a log file, two indexes, one producer snapshot, partition metadata, and the leader epoch checkpoint
assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, readLog(log, offset + 1).records.sizeInBytes)
try {
@ -278,8 +278,8 @@ class LogManagerTest {
time.sleep(log.config.fileDeleteDelayMs + 1)
// there should be a log file, two indexes (the txn index is created lazily),
// and a producer snapshot file per segment, and the leader epoch checkpoint.
assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 1, log.dir.list.length)
// and a producer snapshot file per segment, and the leader epoch checkpoint and partition metadata file.
assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, readLog(log, offset + 1).records.sizeInBytes)
try {
readLog(log, 0)

View File

@ -30,9 +30,9 @@ import kafka.log.Log.DeleteDirSuffix
import kafka.metrics.KafkaYammerMetrics
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
import kafka.server.{BrokerState, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile}
import kafka.utils._
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@ -2372,6 +2372,21 @@ class LogTest {
log.close()
}
@Test
def testLogRecoversTopicId(): Unit = {
val logConfig = LogTest.createLogConfig()
var log = createLog(logDir, logConfig)
val topicId = Uuid.randomUuid()
log.partitionMetadataFile.get.write(topicId)
log.close()
// test recovery case
log = createLog(logDir, logConfig)
assertTrue(log.topicId == topicId)
log.close()
}
/**
* Test building the time index on the follower by setting assignOffsets to false.
*/
@ -2906,6 +2921,33 @@ class LogTest {
assertFalse(LeaderEpochCheckpointFile.newFile(this.logDir).exists())
}
@Test
def testTopicIdTransfersAfterDirectoryRename(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
// Write a topic ID to the partition metadata file to ensure it is transferred correctly.
val id = Uuid.randomUuid()
log.topicId = id
log.partitionMetadataFile.get.write(id)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.latestEpoch)
// Ensure that after a directory rename, the partition metadata file is written to the right location.
val tp = Log.parseTopicPartitionName(log.dir)
log.renameDir(Log.logDeleteDirName(tp))
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
assertEquals(Some(10), log.latestEpoch)
assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
// Check the topic ID remains in memory and was copied correctly.
assertEquals(id, log.topicId)
assertTrue(!log.partitionMetadataFile.isEmpty)
assertEquals(id, log.partitionMetadataFile.get.read().topicId)
}
@Test
def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)

View File

@ -25,7 +25,7 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, StateChang
import kafka.utils.TestUtils
import kafka.utils.TestUtils.createTopic
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
@ -112,6 +112,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
private def testControlRequestWithBrokerEpoch(epochInRequestDiffFromCurrentEpoch: Long): Unit = {
val tp = new TopicPartition("new-topic", 0)
val topicIds = Collections.singletonMap("new-topic", Uuid.randomUuid)
// create topic with 1 partition, 2 replicas, one on each broker
createTopic(zkClient, tp.topic(), partitionReplicaAssignment = Map(0 -> Seq(brokerId1, brokerId2)), servers = servers)
@ -155,7 +156,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
val requestBuilder = new LeaderAndIsrRequest.Builder(
ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, controllerEpoch,
epochInRequest,
partitionStates.asJava, nodes.toSet.asJava)
partitionStates.asJava, topicIds, nodes.toSet.asJava)
if (epochInRequestDiffFromCurrentEpoch < 0) {
// stale broker epoch in LEADER_AND_ISR

View File

@ -65,7 +65,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.easymock.EasyMock._
import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher}
@ -2681,12 +2681,13 @@ class KafkaApisTest {
controllerEpoch,
brokerEpochInRequest,
partitionStates,
Collections.singletonMap("topicW", Uuid.randomUuid()),
asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
).build()
val request = buildRequest(leaderAndIsrRequest)
val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code)
.setPartitionErrors(asList()))
.setPartitionErrors(asList()), leaderAndIsrRequest.version())
EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
EasyMock.expect(replicaManager.becomeLeaderOrFollower(

View File

@ -17,7 +17,9 @@
package kafka.server
import org.apache.kafka.common.TopicPartition
import java.util.Collections
import org.apache.kafka.common.{TopicPartition, Uuid}
import scala.jdk.CollectionConverters._
import kafka.api.LeaderAndIsr
@ -155,7 +157,8 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
)
val requestBuilder = new LeaderAndIsrRequest.Builder(
ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, staleControllerEpoch,
servers(brokerId2).kafkaController.brokerEpoch, partitionStates.asJava, nodes.toSet.asJava)
servers(brokerId2).kafkaController.brokerEpoch, partitionStates.asJava,
Collections.singletonMap(topic, Uuid.randomUuid()), nodes.toSet.asJava)
controllerChannelManager.sendRequest(brokerId2, requestBuilder, staleControllerEpochCallback)
TestUtils.waitUntilTrue(() => staleControllerEpochDetected, "Controller epoch should be stale")

View File

@ -21,7 +21,7 @@ import java.io.File
import java.net.InetAddress
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.{Optional, Properties}
import java.util.{Collections, Optional, Properties}
import kafka.api._
import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, ProducerStateManager}
@ -50,7 +50,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{After, Before, Test}
@ -174,6 +174,7 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
val partition = rm.createPartition(new TopicPartition(topic, 0))
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
@ -190,6 +191,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
rm.getPartitionOrException(new TopicPartition(topic, 0))
@ -212,6 +214,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
@ -236,6 +239,7 @@ class ReplicaManagerTest {
replicaManager.createPartition(topicPartition)
.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@ -248,6 +252,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
@ -307,6 +312,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
@ -367,6 +373,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
@ -473,6 +480,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
@ -549,6 +557,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
rm.getPartitionOrException(new TopicPartition(topic, 0))
@ -605,6 +614,7 @@ class ReplicaManagerTest {
.setIsNew(true)
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(leaderAndIsrPartitionState).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
assertEquals(Errors.NONE, leaderAndIsrResponse.error)
@ -696,6 +706,7 @@ class ReplicaManagerTest {
replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val partition1Replicas = Seq[Integer](0, 2).asJava
val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(
new LeaderAndIsrPartitionState()
@ -719,6 +730,7 @@ class ReplicaManagerTest {
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
@ -832,6 +844,7 @@ class ReplicaManagerTest {
val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId, controllerEpoch, brokerEpoch,
Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(followerBrokerId, "host1", 0),
new Node(leaderBrokerId, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
@ -908,6 +921,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
@ -957,6 +971,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
@ -1007,6 +1022,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
@ -1088,6 +1104,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
@ -1128,6 +1145,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@ -1161,6 +1179,7 @@ class ReplicaManagerTest {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Collections.singletonMap(tp0.topic, Uuid.randomUuid())
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@ -1173,6 +1192,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@ -1193,6 +1213,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
@ -1209,6 +1230,7 @@ class ReplicaManagerTest {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Collections.singletonMap(tp0.topic, Uuid.randomUuid())
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@ -1221,6 +1243,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@ -1242,6 +1265,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
@ -1269,6 +1293,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@ -1311,6 +1336,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@ -1354,6 +1380,7 @@ class ReplicaManagerTest {
.setZkVersion(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
@ -1732,6 +1759,7 @@ class ReplicaManagerTest {
val tp1 = new TopicPartition(topic, 1)
val partition0Replicas = Seq[Integer](0, 1).asJava
val partition1Replicas = Seq[Integer](1, 0).asJava
val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId, 0, brokerEpoch,
@ -1757,6 +1785,7 @@ class ReplicaManagerTest {
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
@ -1787,6 +1816,7 @@ class ReplicaManagerTest {
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
@ -1821,6 +1851,7 @@ class ReplicaManagerTest {
val tp1 = new TopicPartition(topic, 1)
val partition0Replicas = Seq[Integer](1, 0).asJava
val partition1Replicas = Seq[Integer](1, 0).asJava
val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId, 0, brokerEpoch,
@ -1846,6 +1877,7 @@ class ReplicaManagerTest {
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
@ -1876,6 +1908,7 @@ class ReplicaManagerTest {
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
@ -1935,6 +1968,7 @@ class ReplicaManagerTest {
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 10, brokerEpoch,
Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
).build()
@ -1961,6 +1995,7 @@ class ReplicaManagerTest {
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
).build()
@ -2110,6 +2145,7 @@ class ReplicaManagerTest {
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava,
Collections.singletonMap(tp0.topic(), Uuid.randomUuid()),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
).build()
@ -2176,4 +2212,100 @@ class ReplicaManagerTest {
replicaManager.shutdown(false)
}
}
@Test
def testPartitionMetadataFile(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
try {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
replicaManager.createPartition(topicPartition)
.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(epoch)
.setIsr(brokerList)
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val id = topicIds.get(topicPartition.topic())
val log = replicaManager.localLog(topicPartition).get
assertFalse(log.partitionMetadataFile.isEmpty)
assertFalse(log.partitionMetadataFile.get.isEmpty())
val partitionMetadata = log.partitionMetadataFile.get.read()
// Current version of PartitionMetadataFile is 0.
assertEquals(0, partitionMetadata.version)
assertEquals(id, partitionMetadata.topicId)
} finally replicaManager.shutdown(checkpointHW = false)
}
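For reference, under the two-line layout assumed in the earlier sketch, the version-0 file this test reads back would look like the following (the UUID shown is an arbitrary example, not a value from the commit):

version: 0
topic_id: 6B6Z9CmURfWx7nPyHcQlQA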
@Test
def testPartitionMetadataFileNotCreated(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
try {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
val topicPartitionFoo = new TopicPartition("foo", 0)
replicaManager.createPartition(topicPartition)
.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> Uuid.randomUuid()).asJava
def leaderAndIsrRequest(epoch: Int, name: String, version: Short): LeaderAndIsrRequest = LeaderAndIsrRequest.parse(
new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(name)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(epoch)
.setIsr(brokerList)
.setZkVersion(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build().serialize(), version)
// The file has no contents if the topic does not have an associated topic ID.
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val log = replicaManager.localLog(topicPartition).get
assertFalse(log.partitionMetadataFile.isEmpty)
assertTrue(log.partitionMetadataFile.get.isEmpty())
// The file has no contents if the topic ID is the reserved default (Uuid.ZERO_UUID).
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val log2 = replicaManager.localLog(topicPartition).get
assertFalse(log2.partitionMetadataFile.isEmpty)
assertTrue(log2.partitionMetadataFile.get.isEmpty())
// The file has no contents if the request is an older version (version 0 predates topic IDs).
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
val log3 = replicaManager.localLog(topicPartitionFoo).get
assertFalse(log3.partitionMetadataFile.isEmpty)
assertTrue(log3.partitionMetadataFile.get.isEmpty())
// Likewise for version 4, the last LeaderAndIsr version without topic ID support.
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 4), (_, _) => ())
assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
val log4 = replicaManager.localLog(topicPartitionFoo).get
assertFalse(log4.partitionMetadataFile.isEmpty)
assertTrue(log4.partitionMetadataFile.get.isEmpty())
} finally replicaManager.shutdown(checkpointHW = false)
}
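Taken together, the three negative cases above imply a guard on the write path roughly like the following. This is a hedged sketch, assuming topic IDs first appear in LeaderAndIsr version 5 and that `Uuid.ZERO_UUID` means "no ID assigned":

// Sketch of the write-side guard exercised by the test: persist the topic ID
// only when the request version can carry one, the controller assigned one,
// and the file has not been written yet.
def maybeWriteTopicId(log: Log, requestVersion: Short, topicId: Uuid): Unit = {
  val supportsTopicIds = requestVersion >= 5 // assumption: IDs added in v5
  if (supportsTopicIds && topicId != Uuid.ZERO_UUID)
    log.partitionMetadataFile.foreach { file =>
      if (file.isEmpty()) file.write(topicId)
    }
}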
}

View File

@ -62,6 +62,7 @@ class RequestQuotaTest extends BaseRequestTest {
private val topic = "topic-1"
private val numPartitions = 1
private val tp = new TopicPartition(topic, 0)
private val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
private val logDir = "logDir"
private val unthrottledClientId = "unthrottled-client"
private val smallQuotaProducerClientId = "small-quota-producer-client"
@ -254,6 +255,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setZkVersion(2)
.setReplicas(Seq(brokerId).asJava)
.setIsNew(true)).asJava,
topicIds,
Set(new Node(brokerId, "localhost", 0)).asJava)
case ApiKeys.STOP_REPLICA =>

View File

@ -21,6 +21,7 @@ import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils._
import java.io.{DataInputStream, File}
import java.net.ServerSocket
import java.util.Collections
import java.util.concurrent.{Executors, TimeUnit}
import kafka.cluster.Broker
@ -29,6 +30,7 @@ import kafka.log.LogManager
import kafka.zookeeper.ZooKeeperClientTimeoutException
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
@ -233,7 +235,8 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
// Initiate a sendRequest and wait until connection is established and one byte is received by the peer
val requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId, 1, 0L, Seq.empty.asJava, brokerAndEpochs.keys.map(_.node(listenerName)).toSet.asJava)
controllerId, 1, 0L, Seq.empty.asJava, Collections.singletonMap(topic, Uuid.randomUuid()),
brokerAndEpochs.keys.map(_.node(listenerName)).toSet.asJava)
controllerChannelManager.sendRequest(1, requestBuilder)
receiveFuture.get(10, TimeUnit.SECONDS)

View File

@ -1891,7 +1891,6 @@ public final class MessageDataGenerator implements MessageClassGenerator {
prefix, field.camelCaseName(), field.camelCaseName());
} else if (field.type().isStruct() ||
field.type() instanceof FieldType.UUIDFieldType) {
} else if (field.type().isStruct()) {
buffer.printf("+ \"%s%s=\" + %s.toString()%n",
prefix, field.camelCaseName(), field.camelCaseName());
} else if (field.type().isArray()) {
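The generator hunk above removes the UUID special case from the struct branch of the generated `toString`, so UUID fields presumably fall through to the generic scalar handling instead. A small sketch of why that is safe; it assumes only that Kafka's `Uuid.toString` yields a stable string form suitable for concatenation:

// Kafka's Uuid already has a string rendering via toString, so generated
// toString methods can splice it in like any other scalar field.
import org.apache.kafka.common.Uuid

object UuidToStringDemo extends App {
  val id = Uuid.randomUuid()
  println(s"LeaderAndIsrTopicState(topicName='foo', topicId=$id)")
}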