MINOR: Clean up LeaderAndIsrResponse construction in `ReplicaManager#becomeLeaderOrFollower` (#10234)

This patch refactors the code that constructs the `LeaderAndIsrResponse` in `ReplicaManager#becomeLeaderOrFollower` to improve readability and remove unnecessary operations.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Author: David Jacot (committed by GitHub)
Date:   2021-03-04 10:31:35 +01:00
Parent: f06a47a7bb
Commit: 3ef39e1365
8 changed files with 82 additions and 66 deletions
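
To make the direction of the refactoring concrete, here is a minimal sketch of the per-topic grouping used for request versions 5 and above: partition errors are added directly to the response's topic-error collection, keyed by topic ID, instead of first being accumulated in an intermediate `HashMap` of lists. The helper method below is hypothetical and simplified; it assumes the collection-based `LeaderAndIsrResponseData` introduced by this patch and is not the actual `ReplicaManager` code, which is written in Scala.

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.LeaderAndIsrResponse;

public class LeaderAndIsrResponseSketch {

    // Hypothetical helper: builds a v5+ LeaderAndIsrResponse from a per-partition
    // error map and a topic-name-to-ID map, using the find-or-create pattern that
    // the patch introduces in ReplicaManager#becomeLeaderOrFollower.
    static LeaderAndIsrResponse buildResponse(Map<TopicPartition, Errors> errors,
                                              Map<String, Uuid> topicIds,
                                              short version) {
        LeaderAndIsrResponseData data = new LeaderAndIsrResponseData()
            .setErrorCode(Errors.NONE.code());
        errors.forEach((tp, error) -> {
            Uuid topicId = topicIds.get(tp.topic());
            // Look up the topic entry by ID; create and register it on first use.
            LeaderAndIsrTopicError topic = data.topics().find(topicId);
            if (topic == null) {
                topic = new LeaderAndIsrTopicError().setTopicId(topicId);
                data.topics().add(topic);
            }
            topic.partitionErrors().add(new LeaderAndIsrPartitionError()
                .setPartitionIndex(tp.partition())
                .setErrorCode(error.code()));
        });
        return new LeaderAndIsrResponse(data, version);
    }
}
```

Because the generated collection is keyed by topic ID, the intermediate `mutable.HashMap[String, List[LeaderAndIsrPartitionError]]` used by the previous Scala code becomes unnecessary.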


@@ -145,24 +145,21 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
.setErrorCode(error.code()));
}
responseData.setPartitionErrors(partitions);
return new LeaderAndIsrResponse(responseData, version());
}
List<LeaderAndIsrTopicError> topics = new ArrayList<>(data.topicStates().size());
Map<String, Uuid> topicIds = topicIds();
for (LeaderAndIsrTopicState topicState : data.topicStates()) {
LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
topicError.setTopicId(topicIds.get(topicState.topicName()));
List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(topicState.partitionStates().size());
for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
partitions.add(new LeaderAndIsrPartitionError()
} else {
for (LeaderAndIsrTopicState topicState : data.topicStates()) {
List<LeaderAndIsrPartitionError> partitions = new ArrayList<>(
topicState.partitionStates().size());
for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) {
partitions.add(new LeaderAndIsrPartitionError()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(error.code()));
}
responseData.topics().add(new LeaderAndIsrTopicError()
.setTopicId(topicState.topicId())
.setPartitionErrors(partitions));
}
topicError.setPartitionErrors(partitions);
topics.add(topicError);
}
responseData.setTopics(topics);
return new LeaderAndIsrResponse(responseData, version());
}


@@ -20,13 +20,13 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicErrorCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
@@ -39,7 +39,7 @@ public class LeaderAndIsrResponse extends AbstractResponse {
* STALE_BROKER_EPOCH (77)
*/
private final LeaderAndIsrResponseData data;
private short version;
private final short version;
public LeaderAndIsrResponse(LeaderAndIsrResponseData data, short version) {
super(ApiKeys.LEADER_AND_ISR);
@@ -47,7 +47,7 @@ public class LeaderAndIsrResponse extends AbstractResponse {
this.version = version;
}
public List<LeaderAndIsrTopicError> topics() {
public LeaderAndIsrTopicErrorCollection topics() {
return this.data.topics();
}


@@ -36,7 +36,8 @@
"about": "Each partition in v0 to v4 message."},
{ "name": "Topics", "type": "[]LeaderAndIsrTopicError", "versions": "5+",
"about": "Each topic", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "5+", "about": "The unique topic ID" },
{ "name": "TopicId", "type": "uuid", "versions": "5+", "mapKey": true,
"about": "The unique topic ID" },
{ "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "5+",
"about": "Each partition."}
]}
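
The `mapKey: true` annotation on `TopicId` makes Kafka's message generator emit a keyed collection type, `LeaderAndIsrTopicErrorCollection`, for the `Topics` array, so callers can look up an entry by topic ID with `find(...)` instead of scanning a `List`. A minimal usage sketch (the class and method names are taken from this diff; the surrounding `main` is only for illustration):

```java
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicErrorCollection;
import org.apache.kafka.common.protocol.Errors;

public class TopicErrorCollectionSketch {

    public static void main(String[] args) {
        Uuid topicId = Uuid.randomUuid();

        // Generated keyed collection: elements are indexed by the mapKey field (TopicId).
        LeaderAndIsrTopicErrorCollection topics = new LeaderAndIsrTopicErrorCollection();
        LeaderAndIsrTopicError topic = new LeaderAndIsrTopicError().setTopicId(topicId);
        topic.partitionErrors().add(new LeaderAndIsrPartitionError()
            .setPartitionIndex(0)
            .setErrorCode(Errors.NONE.code()));
        topics.add(topic);

        // Lookup by topic ID, as done in the updated tests and in ReplicaManager below.
        LeaderAndIsrTopicError found = topics.find(topicId);
        System.out.println(found.partitionErrors().size()); // prints 1
    }
}
```

This is why the response accessor `topics()` and the tests in this patch switch from `List<LeaderAndIsrTopicError>` to `LeaderAndIsrTopicErrorCollection`.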


@@ -24,6 +24,8 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrLiveLeader;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.test.TestUtils;
@@ -59,18 +61,41 @@ public class LeaderAndIsrRequestTest {
@Test
public void testGetErrorResponse() {
Uuid id = Uuid.randomUuid();
for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) {
LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(version, 0, 0, 0,
Collections.emptyList(), Collections.singletonMap("topic", id), Collections.emptySet());
LeaderAndIsrRequest request = builder.build();
Uuid topicId = Uuid.randomUuid();
String topicName = "topic";
int partition = 0;
for (short version = LEADER_AND_ISR.oldestVersion(); version <= LEADER_AND_ISR.latestVersion(); version++) {
LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(version, 0, 0, 0,
Collections.singletonList(new LeaderAndIsrPartitionState()
.setTopicName(topicName)
.setPartitionIndex(partition)),
Collections.singletonMap(topicName, topicId),
Collections.emptySet()
).build(version);
LeaderAndIsrResponse response = request.getErrorResponse(0,
new ClusterAuthorizationException("Not authorized"));
new ClusterAuthorizationException("Not authorized"));
assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, response.error());
if (version < 5) {
assertEquals(0, response.topics().size());
assertEquals(
Collections.singletonList(new LeaderAndIsrPartitionError()
.setTopicName(topicName)
.setPartitionIndex(partition)
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())),
response.data().partitionErrors());
assertEquals(0, response.data().topics().size());
} else {
assertEquals(id, response.topics().get(0).topicId());
LeaderAndIsrTopicError topicState = response.topics().find(topicId);
assertEquals(topicId, topicState.topicId());
assertEquals(
Collections.singletonList(new LeaderAndIsrPartitionError()
.setPartitionIndex(partition)
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())),
topicState.partitionErrors());
assertEquals(0, response.data().partitionErrors().size());
}
}
}


@@ -21,6 +21,7 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrParti
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicErrorCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.Test;
@@ -80,7 +81,7 @@ public class LeaderAndIsrResponseTest {
.setPartitionErrors(partitions), version);
} else {
Uuid id = Uuid.randomUuid();
List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
LeaderAndIsrTopicErrorCollection topics = createTopic(id, asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setTopics(topics), version);
@@ -101,7 +102,7 @@ public class LeaderAndIsrResponseTest {
.setPartitionErrors(partitions), version);
} else {
Uuid id = Uuid.randomUuid();
List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
LeaderAndIsrTopicErrorCollection topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code())
.setTopics(topics), version);
@@ -130,7 +131,7 @@ public class LeaderAndIsrResponseTest {
} else {
Uuid id = Uuid.randomUuid();
List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
LeaderAndIsrTopicErrorCollection topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED));
response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code())
.setTopics(topics), version);
@@ -155,8 +156,8 @@ public class LeaderAndIsrResponseTest {
return partitions;
}
private List<LeaderAndIsrTopicError> createTopic(Uuid id, List<Errors> errors) {
List<LeaderAndIsrTopicError> topics = new ArrayList<>();
private LeaderAndIsrTopicErrorCollection createTopic(Uuid id, List<Errors> errors) {
LeaderAndIsrTopicErrorCollection topics = new LeaderAndIsrTopicErrorCollection();
LeaderAndIsrTopicError topic = new LeaderAndIsrTopicError();
topic.setTopicId(id);
List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();


@@ -126,6 +126,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicErrorCollection;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
@@ -1685,7 +1686,7 @@ public class RequestResponseTest {
new LeaderAndIsrResponseData.LeaderAndIsrPartitionError()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code()));
List<LeaderAndIsrResponseData.LeaderAndIsrTopicError> topics = new ArrayList<>();
LeaderAndIsrTopicErrorCollection topics = new LeaderAndIsrTopicErrorCollection();
topics.add(new LeaderAndIsrResponseData.LeaderAndIsrTopicError()
.setTopicId(Uuid.randomUuid())
.setPartitionErrors(partition));


@@ -1441,38 +1441,29 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
if (leaderAndIsrRequest.version() < 5) {
val responsePartitions = responseMap.iterator.map { case (tp, error) =>
new LeaderAndIsrPartitionError()
val data = new LeaderAndIsrResponseData().setErrorCode(Errors.NONE.code)
if (leaderAndIsrRequest.version < 5) {
responseMap.forKeyValue { (tp, error) =>
data.partitionErrors.add(new LeaderAndIsrPartitionError()
.setTopicName(tp.topic)
.setPartitionIndex(tp.partition)
.setErrorCode(error.code)
}.toBuffer
new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code)
.setPartitionErrors(responsePartitions.asJava), leaderAndIsrRequest.version())
} else {
val topics = new mutable.HashMap[String, List[LeaderAndIsrPartitionError]]
responseMap.asJava.forEach { case (tp, error) =>
if (!topics.contains(tp.topic)) {
topics.put(tp.topic, List(new LeaderAndIsrPartitionError()
.setPartitionIndex(tp.partition)
.setErrorCode(error.code)))
} else {
topics.put(tp.topic, new LeaderAndIsrPartitionError()
.setPartitionIndex(tp.partition)
.setErrorCode(error.code)::topics(tp.topic))
}
.setErrorCode(error.code))
}
} else {
responseMap.forKeyValue { (tp, error) =>
val topicId = topicIds.get(tp.topic)
var topic = data.topics.find(topicId)
if (topic == null) {
topic = new LeaderAndIsrTopicError().setTopicId(topicId)
data.topics.add(topic)
}
topic.partitionErrors.add(new LeaderAndIsrPartitionError()
.setPartitionIndex(tp.partition)
.setErrorCode(error.code))
}
val topicErrors = topics.iterator.map { case (topic, partitionError) =>
new LeaderAndIsrTopicError()
.setTopicId(topicIds.get(topic))
.setPartitionErrors(partitionError.asJava)
}.toBuffer
new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code)
.setTopics(topicErrors.asJava), leaderAndIsrRequest.version())
}
new LeaderAndIsrResponse(data, leaderAndIsrRequest.version)
}
}
val endMs = time.milliseconds()


@@ -847,17 +847,17 @@ class ControllerChannelManagerTest {
sentRequests.filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR).filter(_.responseCallback != null).foreach { sentRequest =>
val leaderAndIsrRequest = sentRequest.request.build().asInstanceOf[LeaderAndIsrRequest]
val topicIds = leaderAndIsrRequest.topicIds
val topicErrors = leaderAndIsrRequest.data.topicStates.asScala.map(t =>
new LeaderAndIsrTopicError()
val data = new LeaderAndIsrResponseData()
.setErrorCode(error.code)
leaderAndIsrRequest.data.topicStates.asScala.foreach { t =>
data.topics.add(new LeaderAndIsrTopicError()
.setTopicId(topicIds.get(t.topicName))
.setPartitionErrors(t.partitionStates.asScala.map(p =>
new LeaderAndIsrPartitionError()
.setPartitionIndex(p.partitionIndex)
.setErrorCode(error.code)).asJava))
val leaderAndIsrResponse = new LeaderAndIsrResponse(
new LeaderAndIsrResponseData()
.setErrorCode(error.code)
.setTopics(topicErrors.toBuffer.asJava), leaderAndIsrRequest.version())
}
val leaderAndIsrResponse = new LeaderAndIsrResponse(data, leaderAndIsrRequest.version)
sentRequest.responseCallback(leaderAndIsrResponse)
}
}