diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 939212a16ea..d7382863176 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -145,24 +145,21 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { .setErrorCode(error.code())); } responseData.setPartitionErrors(partitions); - return new LeaderAndIsrResponse(responseData, version()); - } - - List topics = new ArrayList<>(data.topicStates().size()); - Map topicIds = topicIds(); - for (LeaderAndIsrTopicState topicState : data.topicStates()) { - LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError(); - topicError.setTopicId(topicIds.get(topicState.topicName())); - List partitions = new ArrayList<>(topicState.partitionStates().size()); - for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) { - partitions.add(new LeaderAndIsrPartitionError() + } else { + for (LeaderAndIsrTopicState topicState : data.topicStates()) { + List partitions = new ArrayList<>( + topicState.partitionStates().size()); + for (LeaderAndIsrPartitionState partition : topicState.partitionStates()) { + partitions.add(new LeaderAndIsrPartitionError() .setPartitionIndex(partition.partitionIndex()) .setErrorCode(error.code())); + } + responseData.topics().add(new LeaderAndIsrTopicError() + .setTopicId(topicState.topicId()) + .setPartitionErrors(partitions)); } - topicError.setPartitionErrors(partitions); - topics.add(topicError); } - responseData.setTopics(topics); + return new LeaderAndIsrResponse(responseData, version()); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java index 490983f1b5d..c7c04e2d99b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java @@ -20,13 +20,13 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.LeaderAndIsrResponseData; import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError; +import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicErrorCollection; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.List; import java.util.HashMap; import java.util.Map; @@ -39,7 +39,7 @@ public class LeaderAndIsrResponse extends AbstractResponse { * STALE_BROKER_EPOCH (77) */ private final LeaderAndIsrResponseData data; - private short version; + private final short version; public LeaderAndIsrResponse(LeaderAndIsrResponseData data, short version) { super(ApiKeys.LEADER_AND_ISR); @@ -47,7 +47,7 @@ public class LeaderAndIsrResponse extends AbstractResponse { this.version = version; } - public List topics() { + public LeaderAndIsrTopicErrorCollection topics() { return this.data.topics(); } diff --git a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json index dc5879b1436..958448be274 100644 --- a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json +++ b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json @@ -36,7 +36,8 @@ "about": "Each partition in v0 to v4 message."}, { "name": "Topics", "type": "[]LeaderAndIsrTopicError", "versions": "5+", "about": "Each topic", "fields": [ - { "name": "TopicId", "type": "uuid", "versions": "5+", "about": "The unique topic ID" }, + { "name": "TopicId", "type": "uuid", "versions": "5+", "mapKey": true, + "about": "The unique topic ID" }, { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "5+", "about": "Each partition."} ]} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java index 9f636d7ef8b..4fe51a0dccd 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.LeaderAndIsrRequestData; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrLiveLeader; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState; +import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError; +import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.test.TestUtils; @@ -59,18 +61,41 @@ public class LeaderAndIsrRequestTest { @Test public void testGetErrorResponse() { - Uuid id = Uuid.randomUuid(); - for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { - LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(version, 0, 0, 0, - Collections.emptyList(), Collections.singletonMap("topic", id), Collections.emptySet()); - LeaderAndIsrRequest request = builder.build(); + Uuid topicId = Uuid.randomUuid(); + String topicName = "topic"; + int partition = 0; + + for (short version = LEADER_AND_ISR.oldestVersion(); version <= LEADER_AND_ISR.latestVersion(); version++) { + LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(version, 0, 0, 0, + Collections.singletonList(new LeaderAndIsrPartitionState() + .setTopicName(topicName) + .setPartitionIndex(partition)), + Collections.singletonMap(topicName, topicId), + Collections.emptySet() + ).build(version); + LeaderAndIsrResponse response = request.getErrorResponse(0, - new ClusterAuthorizationException("Not authorized")); + new ClusterAuthorizationException("Not authorized")); + assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, response.error()); + if (version < 5) { - assertEquals(0, response.topics().size()); + assertEquals( + Collections.singletonList(new LeaderAndIsrPartitionError() + .setTopicName(topicName) + .setPartitionIndex(partition) + .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())), + response.data().partitionErrors()); + assertEquals(0, response.data().topics().size()); } else { - assertEquals(id, response.topics().get(0).topicId()); + LeaderAndIsrTopicError topicState = response.topics().find(topicId); + assertEquals(topicId, topicState.topicId()); + assertEquals( + Collections.singletonList(new LeaderAndIsrPartitionError() + .setPartitionIndex(partition) + .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())), + topicState.partitionErrors()); + assertEquals(0, response.data().partitionErrors().size()); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java index 3f6a22496ec..9ae2fdb4204 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrParti import org.apache.kafka.common.message.LeaderAndIsrResponseData; import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicError; import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError; +import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicErrorCollection; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.junit.jupiter.api.Test; @@ -80,7 +81,7 @@ public class LeaderAndIsrResponseTest { .setPartitionErrors(partitions), version); } else { Uuid id = Uuid.randomUuid(); - List topics = createTopic(id, asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER)); + LeaderAndIsrTopicErrorCollection topics = createTopic(id, asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER)); response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) .setTopics(topics), version); @@ -101,7 +102,7 @@ public class LeaderAndIsrResponseTest { .setPartitionErrors(partitions), version); } else { Uuid id = Uuid.randomUuid(); - List topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED)); + LeaderAndIsrTopicErrorCollection topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED)); response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() .setErrorCode(Errors.NONE.code()) .setTopics(topics), version); @@ -130,7 +131,7 @@ public class LeaderAndIsrResponseTest { } else { Uuid id = Uuid.randomUuid(); - List topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED)); + LeaderAndIsrTopicErrorCollection topics = createTopic(id, asList(Errors.NONE, Errors.CLUSTER_AUTHORIZATION_FAILED)); response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() .setErrorCode(Errors.NONE.code()) .setTopics(topics), version); @@ -155,8 +156,8 @@ public class LeaderAndIsrResponseTest { return partitions; } - private List createTopic(Uuid id, List errors) { - List topics = new ArrayList<>(); + private LeaderAndIsrTopicErrorCollection createTopic(Uuid id, List errors) { + LeaderAndIsrTopicErrorCollection topics = new LeaderAndIsrTopicErrorCollection(); LeaderAndIsrTopicError topic = new LeaderAndIsrTopicError(); topic.setTopicId(id); List partitions = new ArrayList<>(); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index d873d152073..228f32afcea 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -126,6 +126,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState; import org.apache.kafka.common.message.LeaderAndIsrResponseData; +import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrTopicErrorCollection; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.ListGroupsRequestData; @@ -1685,7 +1686,7 @@ public class RequestResponseTest { new LeaderAndIsrResponseData.LeaderAndIsrPartitionError() .setPartitionIndex(0) .setErrorCode(Errors.NONE.code())); - List topics = new ArrayList<>(); + LeaderAndIsrTopicErrorCollection topics = new LeaderAndIsrTopicErrorCollection(); topics.add(new LeaderAndIsrResponseData.LeaderAndIsrTopicError() .setTopicId(Uuid.randomUuid()) .setPartitionErrors(partition)); diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 820af7b6ece..87b44620853 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1441,38 +1441,29 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.shutdownIdleFetcherThreads() replicaAlterLogDirsManager.shutdownIdleFetcherThreads() onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower) - if (leaderAndIsrRequest.version() < 5) { - val responsePartitions = responseMap.iterator.map { case (tp, error) => - new LeaderAndIsrPartitionError() + + val data = new LeaderAndIsrResponseData().setErrorCode(Errors.NONE.code) + if (leaderAndIsrRequest.version < 5) { + responseMap.forKeyValue { (tp, error) => + data.partitionErrors.add(new LeaderAndIsrPartitionError() .setTopicName(tp.topic) .setPartitionIndex(tp.partition) - .setErrorCode(error.code) - }.toBuffer - new LeaderAndIsrResponse(new LeaderAndIsrResponseData() - .setErrorCode(Errors.NONE.code) - .setPartitionErrors(responsePartitions.asJava), leaderAndIsrRequest.version()) - } else { - val topics = new mutable.HashMap[String, List[LeaderAndIsrPartitionError]] - responseMap.asJava.forEach { case (tp, error) => - if (!topics.contains(tp.topic)) { - topics.put(tp.topic, List(new LeaderAndIsrPartitionError() - .setPartitionIndex(tp.partition) - .setErrorCode(error.code))) - } else { - topics.put(tp.topic, new LeaderAndIsrPartitionError() - .setPartitionIndex(tp.partition) - .setErrorCode(error.code)::topics(tp.topic)) - } + .setErrorCode(error.code)) + } + } else { + responseMap.forKeyValue { (tp, error) => + val topicId = topicIds.get(tp.topic) + var topic = data.topics.find(topicId) + if (topic == null) { + topic = new LeaderAndIsrTopicError().setTopicId(topicId) + data.topics.add(topic) + } + topic.partitionErrors.add(new LeaderAndIsrPartitionError() + .setPartitionIndex(tp.partition) + .setErrorCode(error.code)) } - val topicErrors = topics.iterator.map { case (topic, partitionError) => - new LeaderAndIsrTopicError() - .setTopicId(topicIds.get(topic)) - .setPartitionErrors(partitionError.asJava) - }.toBuffer - new LeaderAndIsrResponse(new LeaderAndIsrResponseData() - .setErrorCode(Errors.NONE.code) - .setTopics(topicErrors.asJava), leaderAndIsrRequest.version()) } + new LeaderAndIsrResponse(data, leaderAndIsrRequest.version) } } val endMs = time.milliseconds() diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala index cb494e62338..8a816865b45 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala @@ -847,17 +847,17 @@ class ControllerChannelManagerTest { sentRequests.filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR).filter(_.responseCallback != null).foreach { sentRequest => val leaderAndIsrRequest = sentRequest.request.build().asInstanceOf[LeaderAndIsrRequest] val topicIds = leaderAndIsrRequest.topicIds - val topicErrors = leaderAndIsrRequest.data.topicStates.asScala.map(t => - new LeaderAndIsrTopicError() + val data = new LeaderAndIsrResponseData() + .setErrorCode(error.code) + leaderAndIsrRequest.data.topicStates.asScala.foreach { t => + data.topics.add(new LeaderAndIsrTopicError() .setTopicId(topicIds.get(t.topicName)) .setPartitionErrors(t.partitionStates.asScala.map(p => new LeaderAndIsrPartitionError() .setPartitionIndex(p.partitionIndex) .setErrorCode(error.code)).asJava)) - val leaderAndIsrResponse = new LeaderAndIsrResponse( - new LeaderAndIsrResponseData() - .setErrorCode(error.code) - .setTopics(topicErrors.toBuffer.asJava), leaderAndIsrRequest.version()) + } + val leaderAndIsrResponse = new LeaderAndIsrResponse(data, leaderAndIsrRequest.version) sentRequest.responseCallback(leaderAndIsrResponse) } }