MINOR: Revert "KAFKA-18681: Created GetReplicaLogInfo RPCs (#19664)" (#20371)

This reverts commit d86ba7f54a.

Reverting since we are planning to change how KIP-966 is implemented. The
RPC should stay reverted until we have more clarity on how the KIP will
be executed.

Reviewers: José Armando García Sancio <jsancio@apache.org>
Jonah Hooper, 2025-09-05 11:31:50 -04:00, committed by GitHub
commit 29ce96151c (parent f922ff6d1f)
14 changed files with 6 additions and 760 deletions


@@ -134,8 +134,8 @@ public enum ApiKeys {
     STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
     DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
     ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
-    DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS),
-    GET_REPLICA_LOG_INFO(ApiMessageType.GET_REPLICA_LOG_INFO);
+    DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
         new EnumMap<>(ApiMessageType.ListenerType.class);


@@ -354,8 +354,6 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
             return AlterShareGroupOffsetsRequest.parse(readable, apiVersion);
         case DELETE_SHARE_GROUP_OFFSETS:
             return DeleteShareGroupOffsetsRequest.parse(readable, apiVersion);
-        case GET_REPLICA_LOG_INFO:
-            return GetReplicaLogInfoRequest.parse(readable, apiVersion);
         default:
             throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                 "code should be updated to do so.", apiKey));


@@ -291,8 +291,6 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
             return AlterShareGroupOffsetsResponse.parse(readable, version);
         case DELETE_SHARE_GROUP_OFFSETS:
             return DeleteShareGroupOffsetsResponse.parse(readable, version);
-        case GET_REPLICA_LOG_INFO:
-            return GetReplicaLogInfoResponse.parse(readable, version);
         default:
             throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                 "code should be updated to do so.", apiKey));


@@ -1,98 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.GetReplicaLogInfoRequestData;
import org.apache.kafka.common.message.GetReplicaLogInfoResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
import java.util.List;
public class GetReplicaLogInfoRequest extends AbstractRequest {
public static final int MAX_PARTITIONS_PER_REQUEST = 1000;
public static class Builder extends AbstractRequest.Builder<GetReplicaLogInfoRequest> {
private final GetReplicaLogInfoRequestData data;
public Builder(GetReplicaLogInfoRequestData data) {
super(ApiKeys.GET_REPLICA_LOG_INFO);
this.data = data;
}
public Builder(List<GetReplicaLogInfoRequestData.TopicPartitions> topicPartitions) {
super(ApiKeys.GET_REPLICA_LOG_INFO, ApiKeys.GET_REPLICA_LOG_INFO.oldestVersion(),
ApiKeys.GET_REPLICA_LOG_INFO.latestVersion());
GetReplicaLogInfoRequestData data = new GetReplicaLogInfoRequestData();
data.setTopicPartitions(topicPartitions);
this.data = data;
}
@Override
public GetReplicaLogInfoRequest build(short version) {
return new GetReplicaLogInfoRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final GetReplicaLogInfoRequestData data;
public GetReplicaLogInfoRequest(GetReplicaLogInfoRequestData data, short version) {
super(ApiKeys.GET_REPLICA_LOG_INFO, version);
this.data = data;
}
@Override
public GetReplicaLogInfoRequestData data() {
return data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
GetReplicaLogInfoResponseData responseData = new GetReplicaLogInfoResponseData();
for (GetReplicaLogInfoRequestData.TopicPartitions topicPartition : data().topicPartitions()) {
ArrayList<GetReplicaLogInfoResponseData.PartitionLogInfo> partitionLogInfos = new ArrayList<>(topicPartition.partitions().size());
for (Integer partition: topicPartition.partitions()) {
partitionLogInfos.add(new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setPartition(partition)
.setErrorCode(error.code())
);
}
responseData.topicPartitionLogInfoList().add(new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(topicPartition.topicId())
.setPartitionLogInfo(partitionLogInfos));
}
return new GetReplicaLogInfoResponse(responseData);
}
public static GetReplicaLogInfoRequest parse(Readable readable, short version) {
return new GetReplicaLogInfoRequest(
new GetReplicaLogInfoRequestData(readable, version),
version
);
}
}
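For orientation while reviewing the revert, here is a minimal sketch of how a caller could have assembled this request through the Builder above. The topic ID is a placeholder, and the classes are exactly the ones this commit deletes, so the snippet only compiles against the pre-revert tree.

    import java.util.List;

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.message.GetReplicaLogInfoRequestData;
    import org.apache.kafka.common.requests.GetReplicaLogInfoRequest;

    public class GetReplicaLogInfoRequestSketch {
        public static void main(String[] args) {
            // Placeholder topic ID; a real caller would resolve this from the metadata cache.
            Uuid topicId = Uuid.randomUuid();
            GetReplicaLogInfoRequestData.TopicPartitions topicPartitions =
                new GetReplicaLogInfoRequestData.TopicPartitions()
                    .setTopicId(topicId)
                    .setPartitions(List.of(0, 1, 2));
            // The list-based Builder constructor targets the full supported version range (only v0 here).
            GetReplicaLogInfoRequest request =
                new GetReplicaLogInfoRequest.Builder(List.of(topicPartitions)).build((short) 0);
            System.out.println(request.data());
        }
    }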


@@ -1,75 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.GetReplicaLogInfoResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GetReplicaLogInfoResponse extends AbstractResponse {
private final GetReplicaLogInfoResponseData data;
public GetReplicaLogInfoResponse(GetReplicaLogInfoResponseData data) {
super(ApiKeys.GET_REPLICA_LOG_INFO);
this.data = data;
}
@Override
public GetReplicaLogInfoResponseData data() {
return data;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
data.topicPartitionLogInfoList().forEach(topicPartitionLogInfo -> {
topicPartitionLogInfo.partitionLogInfo().forEach(partitionLogInfo -> {
updateErrorCounts(errorCounts, Errors.forCode(partitionLogInfo.errorCode()));
});
});
return errorCounts;
}
public static GetReplicaLogInfoResponse prepareResponse(List<GetReplicaLogInfoResponseData.TopicPartitionLogInfo> topicPartitionLogInfoList) {
GetReplicaLogInfoResponseData responseData = new GetReplicaLogInfoResponseData();
topicPartitionLogInfoList.forEach(topicPartitionLogInfo -> {
responseData.topicPartitionLogInfoList().add(topicPartitionLogInfo);
});
return new GetReplicaLogInfoResponse(responseData);
}
public static GetReplicaLogInfoResponse parse(Readable readable, short version) {
return new GetReplicaLogInfoResponse(new GetReplicaLogInfoResponseData(readable, version));
}
}


@@ -1,32 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey":93,
"type": "request",
"listeners": ["broker"],
"name": "GetReplicaLogInfoRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
"about": "The topic partitions to query the log info for.",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID"},
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of this topic whose leader should be elected." }
]}
]
}


@@ -1,43 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey":93,
"type": "response",
"name": "GetReplicaLogInfoResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+",
"about": "The epoch of the broker." },
{ "name": "HasMoreData", "type": "bool", "versions": "0+",
"about": "True if response does not include all the topic partitions requested. Only the first 1000 topic partitions are returned."},
{ "name": "TopicPartitionLogInfoList", "type": "[]TopicPartitionLogInfo", "versions": "0+",
"about": "The list of the partition log info.",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "PartitionLogInfo", "type": "[]PartitionLogInfo", "versions": "0+", "about": "The log info of a partition.",
"fields": [
{ "name": "Partition", "type": "int32", "versions": "0+", "about": "The id for the partition." },
{ "name": "LastWrittenLeaderEpoch", "type": "int32", "versions": "0+", "about": "The last written leader epoch in the log." },
{ "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "about": "The current leader epoch for the partition from the broker point of view." },
{ "name": "LogEndOffset", "type": "int64", "versions": "0+", "about": "The log end offset for the partition." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The result error, or zero if there was no error."}
]}
]}
]
}
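Similarly, a sketch (an assumption, not part of this patch) of how a client might have walked the generated response type for this schema; the accessor names match the generated-message usage visible elsewhere in this diff.

    import org.apache.kafka.common.message.GetReplicaLogInfoResponseData;
    import org.apache.kafka.common.protocol.Errors;

    public class GetReplicaLogInfoResponseSketch {
        // Prints every partition that returned cleanly and flags a truncated response.
        static void printLogInfo(GetReplicaLogInfoResponseData data) {
            for (GetReplicaLogInfoResponseData.TopicPartitionLogInfo topicInfo : data.topicPartitionLogInfoList()) {
                for (GetReplicaLogInfoResponseData.PartitionLogInfo p : topicInfo.partitionLogInfo()) {
                    if (p.errorCode() == Errors.NONE.code()) {
                        System.out.printf("topic %s partition %d: LEO=%d lastWrittenEpoch=%d currentEpoch=%d%n",
                            topicInfo.topicId(), p.partition(), p.logEndOffset(),
                            p.lastWrittenLeaderEpoch(), p.currentLeaderEpoch());
                    }
                }
            }
            if (data.hasMoreData()) {
                // Only the first 1000 topic-partitions were returned; issue a follow-up request for the rest.
                System.out.println("response truncated; re-request the remaining partitions");
            }
        }
    }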


@@ -157,8 +157,6 @@ import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.FetchSnapshotRequestData;
 import org.apache.kafka.common.message.FetchSnapshotResponseData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
-import org.apache.kafka.common.message.GetReplicaLogInfoRequestData;
-import org.apache.kafka.common.message.GetReplicaLogInfoResponseData;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -1077,7 +1075,6 @@ public class RequestResponseTest {
             case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsRequest(version);
             case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsRequest(version);
             case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsRequest(version);
-            case GET_REPLICA_LOG_INFO: return createGetReplicaLogInfoRequest(version);
             default: throw new IllegalArgumentException("Unknown API key " + apikey);
         }
     }
@@ -1173,25 +1170,10 @@ public class RequestResponseTest {
             case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsResponse();
             case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsResponse();
             case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsResponse();
-            case GET_REPLICA_LOG_INFO: return createGetReplicaLogInfoResponse();
             default: throw new IllegalArgumentException("Unknown API key " + apikey);
         }
     }
-    private GetReplicaLogInfoRequest createGetReplicaLogInfoRequest(short version) {
-        GetReplicaLogInfoRequestData data = new GetReplicaLogInfoRequestData()
-            .setTopicPartitions(singletonList(new GetReplicaLogInfoRequestData.TopicPartitions()
-                .setPartitions(singletonList(0))));
-        return new GetReplicaLogInfoRequest.Builder(data).build(version);
-    }
-    private GetReplicaLogInfoResponse createGetReplicaLogInfoResponse() {
-        GetReplicaLogInfoResponseData data = new GetReplicaLogInfoResponseData();
-        data.setBrokerEpoch(0);
-        data.setTopicPartitionLogInfoList(singletonList(new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()));
-        return new GetReplicaLogInfoResponse(data);
-    }
     private ConsumerGroupDescribeRequest createConsumerGroupDescribeRequest(short version) {
         ConsumerGroupDescribeRequestData data = new ConsumerGroupDescribeRequestData()
             .setGroupIds(Collections.singletonList("group"))


@@ -44,7 +44,6 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Supplier;
 import scala.jdk.javaapi.OptionConverters;
@@ -72,7 +71,6 @@ public class KafkaApisBuilder {
     private ClientMetricsManager clientMetricsManager = null;
     private ShareCoordinator shareCoordinator = null;
     private GroupConfigManager groupConfigManager = null;
-    private Supplier<Long> brokerEpochSupplier = () -> -1L;
     public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) {
         this.requestChannel = requestChannel;
@@ -189,11 +187,6 @@ public class KafkaApisBuilder {
         return this;
     }
-    public KafkaApisBuilder setBrokerEpochSupplier(Supplier<Long> brokerEpochSupplier) {
-        this.brokerEpochSupplier = brokerEpochSupplier;
-        return this;
-    }
     @SuppressWarnings({"CyclomaticComplexity"})
     public KafkaApis build() {
         if (requestChannel == null) throw new RuntimeException("you must set requestChannel");
@@ -237,7 +230,6 @@ public class KafkaApisBuilder {
                               tokenManager,
                               apiVersionManager,
                               clientMetricsManager,
-                              groupConfigManager,
-                              brokerEpochSupplier);
+                              groupConfigManager);
     }
 }


@@ -463,8 +463,7 @@ class BrokerServer(
       tokenManager = tokenManager,
       apiVersionManager = apiVersionManager,
       clientMetricsManager = clientMetricsManager,
-      groupConfigManager = groupConfigManager,
-      brokerEpochSupplier = () => lifecycleManager.brokerEpoch)
+      groupConfigManager = groupConfigManager)
     dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
       socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,


@@ -77,7 +77,6 @@ import java.util
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import java.util.stream.Collectors
-import java.util.function.Supplier
 import java.util.{Collections, Optional}
 import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
@@ -110,8 +109,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val tokenManager: DelegationTokenManager,
                 val apiVersionManager: ApiVersionManager,
                 val clientMetricsManager: ClientMetricsManager,
-                val groupConfigManager: GroupConfigManager,
-                val brokerEpochSupplier: Supplier[java.lang.Long]
+                val groupConfigManager: GroupConfigManager
 ) extends ApiRequestHandler with Logging {
   type ProduceResponseStats = Map[TopicIdPartition, RecordValidationStats]
@@ -247,7 +245,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_SHARE_GROUP_OFFSETS => handleDeleteShareGroupOffsetsRequest(request).exceptionally(handleError)
         case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError)
         case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsGroupHeartbeat(request).exceptionally(handleError)
-        case ApiKeys.GET_REPLICA_LOG_INFO => handleGetReplicaLogInfo(request)
         case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
       }
     } catch {
@@ -265,79 +262,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
-  def handleGetReplicaLogInfo(request: RequestChannel.Request): Unit = {
-    var partitionCount = 0
-    def processPartitions(topicLogInfo: GetReplicaLogInfoResponseData.TopicPartitionLogInfo,
-                          partitionIter: util.Iterator[Integer],
-                          action: Integer => GetReplicaLogInfoResponseData.PartitionLogInfo): Unit = {
-      while (partitionIter.hasNext && partitionCount < GetReplicaLogInfoRequest.MAX_PARTITIONS_PER_REQUEST) {
-        topicLogInfo.partitionLogInfo().add(action(partitionIter.next()))
-        partitionCount += 1
-      }
-    }
-    val isAuthorizedClusterAction = authorizeClusterOperation(request, CLUSTER_ACTION)
-    def isAuthorized(topicName: String): Boolean =
-      isAuthorizedClusterAction || authHelper.authorize(request.context, DESCRIBE, TOPIC, topicName)
-    val getReplicaLogInfoRequest = request.body[GetReplicaLogInfoRequest]
-    val data = getReplicaLogInfoRequest.data()
-    val topicIter = data.topicPartitions().iterator()
-    var previousPartitionIter: Option[util.Iterator[Integer]] = None
-    val responseData = new GetReplicaLogInfoResponseData()
-      .setBrokerEpoch(brokerEpochSupplier.get())
-    while (topicIter.hasNext && partitionCount < GetReplicaLogInfoRequest.MAX_PARTITIONS_PER_REQUEST) {
-      val topic = topicIter.next()
-      val partitionIter = topic.partitions().iterator()
-      previousPartitionIter = Some(partitionIter)
-      val topicPartitionLogInfo = new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
-        .setTopicId(topic.topicId())
-      val maybeTopicName = metadataCache.getTopicName(topic.topicId())
-      if (maybeTopicName.isEmpty) {
-        processPartitions(topicPartitionLogInfo, partitionIter,
-          new GetReplicaLogInfoResponseData.PartitionLogInfo()
-            .setPartition(_)
-            .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()))
-      } else if (!isAuthorized(maybeTopicName.get())) {
-        processPartitions(topicPartitionLogInfo, partitionIter,
-          new GetReplicaLogInfoResponseData.PartitionLogInfo()
-            .setPartition(_)
-            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()))
-      } else {
-        val topicName = maybeTopicName.get()
-        processPartitions(topicPartitionLogInfo, partitionIter, { partitionId: Integer =>
-          val topicPartition = new TopicPartition(topicName, partitionId)
-          replicaManager.getPartitionOrError(topicPartition) match {
-            case Left(err) => new GetReplicaLogInfoResponseData.PartitionLogInfo()
-              .setPartition(topicPartition.partition())
-              .setErrorCode(err.code())
-            case Right(partition) => partition.log match {
-              case None => new GetReplicaLogInfoResponseData.PartitionLogInfo()
-                .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
-              case Some(log) => {
-                val logEndOffset = log.logEndOffset
-                val lastLeaderEpoch = log.latestEpoch.orElse(-1)
-                val leaderEpoch = partition.getLeaderEpoch
-                new GetReplicaLogInfoResponseData.PartitionLogInfo()
-                  .setPartition(partitionId)
-                  .setLogEndOffset(logEndOffset)
-                  .setCurrentLeaderEpoch(leaderEpoch)
-                  .setLastWrittenLeaderEpoch(lastLeaderEpoch)
-              }
-            }
-          }
-        })
-      }
-      responseData.topicPartitionLogInfoList().add(topicPartitionLogInfo)
-    }
-    responseData.setHasMoreData(topicIter.hasNext || previousPartitionIter.map(_.hasNext).getOrElse(false))
-    requestHelper.sendMaybeThrottle(request, new GetReplicaLogInfoResponse(responseData))
-  }
   override def tryCompleteActions(): Unit = {
     replicaManager.tryCompleteActions()
   }
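The truncation rule in the removed handler is easy to lose in the nesting: partitions are counted across topics, emission stops at MAX_PARTITIONS_PER_REQUEST, and HasMoreData is set when either the topic iterator or the last partition iterator still has elements. A compact Java re-statement of just that rule, with simplified placeholder types (an illustration of the logic, not the reverted implementation):

    import java.util.Iterator;
    import java.util.List;

    final class TruncationRuleSketch {
        static final int MAX_PARTITIONS_PER_REQUEST = 1000;

        // Computes the HasMoreData flag for a request shaped as per-topic partition lists.
        static boolean hasMoreData(List<List<Integer>> partitionsByTopic) {
            int emitted = 0;
            Iterator<List<Integer>> topics = partitionsByTopic.iterator();
            Iterator<Integer> partitions = null;
            while (topics.hasNext() && emitted < MAX_PARTITIONS_PER_REQUEST) {
                partitions = topics.next().iterator();
                while (partitions.hasNext() && emitted < MAX_PARTITIONS_PER_REQUEST) {
                    partitions.next(); // the real handler emits a PartitionLogInfo here
                    emitted++;
                }
            }
            return topics.hasNext() || (partitions != null && partitions.hasNext());
        }
    }

Note the edge case the tests in KafkaApisTest below pin down: a request with exactly 1000 partitions fills the response completely and still reports HasMoreData = false.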


@@ -154,7 +154,6 @@ class KafkaApisTest extends Logging {
   private val time = new MockTime
   private val clientId = ""
   private var kafkaApis: KafkaApis = _
-  private val brokerEpoch = 123L
   @AfterEach
   def tearDown(): Unit = {
@@ -211,8 +210,7 @@ class KafkaApisTest extends Logging {
       tokenManager = null,
       apiVersionManager = apiVersionManager,
       clientMetricsManager = clientMetricsManager,
-      groupConfigManager = groupConfigManager,
-      brokerEpochSupplier = () => brokerEpoch)
+      groupConfigManager = groupConfigManager)
   }
   private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = {
@@ -13707,392 +13705,6 @@ class KafkaApisTest extends Logging {
assertEquals(alterShareGroupOffsetsResponseData, response.data)
}
def verifyGetReplicaLogInfoRequest(builder: GetReplicaLogInfoRequest.Builder, validate: GetReplicaLogInfoResponseData => Unit): Unit = {
val authorizer: Authorizer = mock(classOf[Authorizer])
authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
val request = buildRequest(builder.build())
kafkaApis.handleGetReplicaLogInfo(request)
validate(verifyNoThrottling[GetReplicaLogInfoResponse](request).data())
}
def verifyGetReplicaLogInfoRequestWithResponse(builder: GetReplicaLogInfoRequest.Builder, expectedResponseData: GetReplicaLogInfoResponseData): Unit = {
verifyGetReplicaLogInfoRequest(builder, { response =>
assertEquals(expectedResponseData, response)
})
}
@Test
def testUnauthorizedGetReplicaLogInfo(): Unit = {
val authorizedUuid = Uuid.randomUuid()
val unauthorizedUuid = Uuid.randomUuid()
val requestData = new GetReplicaLogInfoRequestData()
requestData.topicPartitions().add(
new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(authorizedUuid)
.setPartitions(util.List.of(0)))
requestData.topicPartitions().add(
new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(unauthorizedUuid)
.setPartitions(util.List.of(0)))
val builder = new GetReplicaLogInfoRequest.Builder(requestData)
metadataCache = mock(classOf[MetadataCache])
when(metadataCache.getTopicName(authorizedUuid)).thenReturn(Optional.of("authorized"))
when(metadataCache.getTopicName(unauthorizedUuid)).thenReturn(Optional.of("unauthorized"))
val log = mock(classOf[UnifiedLog])
when(log.logEndOffset).thenReturn(100L)
when(log.latestEpoch).thenReturn(Optional.of(10))
val partition = mock(classOf[Partition])
when(partition.log).thenReturn(Some(log))
when(partition.getLeaderEpoch).thenReturn(1)
when(partition.partitionId).thenReturn(0)
val valid = new TopicPartition("authorized", 0)
when(replicaManager.getPartitionOrError(valid)).thenReturn(Right(partition))
val expected = new GetReplicaLogInfoResponseData().setBrokerEpoch(brokerEpoch)
expected.topicPartitionLogInfoList().add(new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(authorizedUuid)
.setPartitionLogInfo(util.List.of(new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setPartition(0)
.setLogEndOffset(100L)
.setLastWrittenLeaderEpoch(10)
.setCurrentLeaderEpoch(1))))
expected.topicPartitionLogInfoList().add(new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(unauthorizedUuid)
.setPartitionLogInfo(util.List.of(
new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setPartition(0)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()))))
val authorizer: Authorizer = mock(classOf[Authorizer])
authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TOPIC, "authorized", AuthorizationResult.ALLOWED)
authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TOPIC, "unauthorized", AuthorizationResult.DENIED)
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
val request = buildRequest(builder.build())
kafkaApis.handleGetReplicaLogInfo(request)
val responseData = verifyNoThrottling[GetReplicaLogInfoResponse](request).data()
assertEquals(expected, responseData)
}
@ParameterizedTest
@ValueSource(ints=Array(GetReplicaLogInfoRequest.MAX_PARTITIONS_PER_REQUEST, GetReplicaLogInfoRequest.MAX_PARTITIONS_PER_REQUEST+1))
def testGetReplicaLogInfoSingleTopicManyPartitionsHasMoreData(partitionCount: Int): Unit = {
val expectedMoreData = GetReplicaLogInfoRequest.MAX_PARTITIONS_PER_REQUEST < partitionCount
val uuid = Uuid.randomUuid()
val expected = new GetReplicaLogInfoResponseData().setBrokerEpoch(brokerEpoch).setHasMoreData(expectedMoreData)
val topicPartitionResponses = new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(uuid)
expected.topicPartitionLogInfoList().add(topicPartitionResponses)
val topicPartitionRequests = new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(uuid)
val topicName = "topic1"
(1 to partitionCount).foreach { pid =>
topicPartitionRequests.partitions().add(pid)
// Now set up the request itself
when(replicaManager.getPartitionOrError(new TopicPartition(topicName, pid))).thenReturn(Left(Errors.UNKNOWN_TOPIC_ID))
topicPartitionResponses.partitionLogInfo().add(new GetReplicaLogInfoResponseData.PartitionLogInfo().setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()))
}
val builder = new GetReplicaLogInfoRequest.Builder(new GetReplicaLogInfoRequestData()
.setTopicPartitions(util.List.of(topicPartitionRequests)))
verifyGetReplicaLogInfoRequest(builder, { response =>
assertEquals(expectedMoreData, response.hasMoreData)
})
}
@ParameterizedTest
@ValueSource(ints=Array(GetReplicaLogInfoRequest.MAX_PARTITIONS_PER_REQUEST, GetReplicaLogInfoRequest.MAX_PARTITIONS_PER_REQUEST+1))
def testGetReplicaLogInfoSingleTopicPartitionHasMoreData(topicCount: Int): Unit = {
val expectedMoreData = GetReplicaLogInfoRequest.MAX_PARTITIONS_PER_REQUEST < topicCount
val expected = new GetReplicaLogInfoResponseData().setBrokerEpoch(brokerEpoch).setHasMoreData(expectedMoreData)
val uuids = (1 to topicCount).map(_ => Uuid.randomUuid())
val topicName = "topic1"
val requestData = new GetReplicaLogInfoRequestData()
uuids.foreach { uuid =>
// setup response
val topicResponse = new GetReplicaLogInfoResponseData.TopicPartitionLogInfo().setTopicId(uuid)
topicResponse.partitionLogInfo().add(new GetReplicaLogInfoResponseData.PartitionLogInfo().setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()))
expected.topicPartitionLogInfoList().add(topicResponse)
when(replicaManager.getPartitionOrError(new TopicPartition(topicName, 0))).thenReturn(Left(Errors.UNKNOWN_TOPIC_ID))
// setup request
requestData.topicPartitions().add(
new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(uuid)
.setPartitions(util.List.of(0)))
}
val builder = new GetReplicaLogInfoRequest.Builder(requestData)
// Tests a specific edge case: with exactly 1000 results we should return hasMoreData=false
verifyGetReplicaLogInfoRequest(builder, { response =>
assertEquals(expectedMoreData, response.hasMoreData)
})
}
@Test
def testGetReplicaLogInfoMixOfSuccessAndFailure(): Unit = {
// 1 successful case and 3 failing topic partitions with different error codes, returned in the same request.
val uuids = (1 to 4).map(_ => Uuid.randomUuid()).toList
val tps = uuids.map(new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(_)
.setPartitions(util.List.of(1)))
def mockTopicName(uuid: Uuid, idx: Int): String = s"topic-idx-$idx-with-uuid-$uuid"
metadataCache = mock(classOf[MetadataCache])
uuids.zipWithIndex.foreach { case (uuid, idx) =>
val name = mockTopicName(uuid, idx)
when(metadataCache.getTopicName(uuid)).thenReturn(Optional.of(name))
}
val log = mock(classOf[UnifiedLog])
when(log.logEndOffset).thenReturn(100L)
when(log.latestEpoch).thenReturn(Optional.of(10))
val partition = mock(classOf[Partition])
when(partition.log).thenReturn(Some(log))
when(partition.getLeaderEpoch).thenReturn(1)
when(partition.partitionId).thenReturn(1)
val valid = new TopicPartition(mockTopicName(uuids.head, 0), 1)
when(replicaManager.getPartitionOrError(valid)).thenReturn(Right(partition))
val expected = new GetReplicaLogInfoResponseData().setBrokerEpoch(brokerEpoch)
expected.topicPartitionLogInfoList().add(new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(uuids.head)
.setPartitionLogInfo(util.List.of(new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setPartition(1)
.setLogEndOffset(100L)
.setLastWrittenLeaderEpoch(10)
.setCurrentLeaderEpoch(1))))
var idx = 1
List(Errors.KAFKA_STORAGE_ERROR, Errors.NOT_LEADER_OR_FOLLOWER, Errors.UNKNOWN_TOPIC_OR_PARTITION).foreach { err =>
val uuid = uuids(idx)
val name = mockTopicName(uuid, idx)
val invalid = new TopicPartition(name, 1)
when(replicaManager.getPartitionOrError(invalid)).thenReturn(Left(err))
expected.topicPartitionLogInfoList().add(new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(uuid)
.setPartitionLogInfo(util.List.of(new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setErrorCode(err.code())
.setPartition(1))))
idx += 1
}
val builder = new GetReplicaLogInfoRequest.Builder(
new GetReplicaLogInfoRequestData().setTopicPartitions(tps.asJava)
)
verifyGetReplicaLogInfoRequestWithResponse(builder, expected)
}
@Test
def testGetReplicaLogInfoLogDirNotFoundError(): Unit = {
// Handles case where an online LogDir is not found.
val topic = new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(util.List.of(2))
val builder = new GetReplicaLogInfoRequest.Builder(List(topic).asJava)
metadataCache = mock(classOf[MetadataCache])
when(metadataCache.getTopicName(topic.topicId())).thenReturn(Optional.of("topic2"))
val partition = mock(classOf[Partition])
when(partition.log).thenReturn(None)
when(partition.getLeaderEpoch).thenReturn(2)
when(partition.partitionId).thenReturn(2)
val tp = new TopicPartition("topic2", 2)
when(replicaManager.getPartitionOrError(tp)).thenReturn(Right(partition))
val expected = new GetReplicaLogInfoResponseData()
.setTopicPartitionLogInfoList(List(
new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(topic.topicId())
.setPartitionLogInfo(util.List.of(
new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
))).asJava)
.setBrokerEpoch(brokerEpoch)
verifyGetReplicaLogInfoRequestWithResponse(builder, expected)
}
@Test
def testGetReplicaLogInfoUnknownTopic(): Unit = {
val unknownTopicUuid = Uuid.randomUuid()
val knownTopicUuid = Uuid.randomUuid()
val builder = new GetReplicaLogInfoRequest.Builder(new GetReplicaLogInfoRequestData()
.setTopicPartitions(
util.List.of(
new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(unknownTopicUuid)
.setPartitions(util.List.of(1)),
new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(knownTopicUuid)
.setPartitions(util.List.of(1)))))
metadataCache = mock(classOf[MetadataCache])
when(metadataCache.getTopicName(unknownTopicUuid)).thenReturn(Optional.empty())
when(metadataCache.getTopicName(knownTopicUuid)).thenReturn(Optional.of("topic1"))
val log = mock(classOf[UnifiedLog])
when(log.logEndOffset).thenReturn(100L)
when(log.latestEpoch).thenReturn(Optional.of(10))
val partition = mock(classOf[Partition])
when(partition.log).thenReturn(Some(log))
when(partition.getLeaderEpoch).thenReturn(1)
when(partition.partitionId).thenReturn(1)
val valid = new TopicPartition("topic1", 1)
when(replicaManager.getPartitionOrError(valid)).thenReturn(Right(partition))
val expected = new GetReplicaLogInfoResponseData()
.setBrokerEpoch(brokerEpoch)
.setTopicPartitionLogInfoList(util.List.of(
new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(unknownTopicUuid)
.setPartitionLogInfo(util.List.of(
new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setPartition(1)
.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code())
)),
new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(knownTopicUuid)
.setPartitionLogInfo(util.List.of(
new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setPartition(1)
.setLogEndOffset(100L)
.setCurrentLeaderEpoch(1)
.setLastWrittenLeaderEpoch(10)))))
verifyGetReplicaLogInfoRequestWithResponse(builder, expected)
}
@Test
def testGetReplicaLogInfoRequestTooManyTopics(): Unit = {
// 100 topics, 20 partitions per topic = 2k topic-partitions
// only the first 1000 should be sent back, with HasMoreData = true
val numberUuids = 100
val numberPartitions = 20
val uuids: List[Uuid] = (1 to numberUuids).map(_ => Uuid.randomUuid()).toList
val tps = uuids.map(new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(_)
.setPartitions((1 to numberPartitions).map(Int.box).asJava))
val builder = new GetReplicaLogInfoRequest.Builder(
new GetReplicaLogInfoRequestData().setTopicPartitions(tps.asJava))
val expectedLogEndOffset = 10L
val expectedLeaderEpoch = 2
val expectedLatestEpoch = 3
def mockTopicName(uuid: Uuid, idx: Int): String = s"topic-idx-$idx-with-uuid-$uuid"
// We instrument every topic-partition so that the failure happens at the verifyGetReplicaLogInfoRequest assertion.
metadataCache = mock(classOf[MetadataCache])
uuids.zipWithIndex.foreach { case (uuid, idx) =>
when(metadataCache.getTopicName(uuid)).thenReturn(Optional.of(mockTopicName(uuid, idx)))
}
val expected =
new GetReplicaLogInfoResponseData().setHasMoreData(true).setBrokerEpoch(brokerEpoch)
// Since each topic has 20 partitions, only the first 50 topics are returned before hitting the topic-partition limit of 1000.
uuids.take(50).zipWithIndex.foreach { case (uuid, idx) =>
val tpli = new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(uuid)
val topicName = mockTopicName(uuid, idx)
val log = mock(classOf[UnifiedLog])
when(log.logEndOffset).thenReturn(expectedLogEndOffset)
when(log.latestEpoch).thenReturn(Optional.of(expectedLatestEpoch))
(1 to numberPartitions).foreach { pid =>
val partition = mock(classOf[Partition])
when(partition.log).thenReturn(Some(log))
when(partition.getLeaderEpoch).thenReturn(expectedLeaderEpoch)
when(partition.partitionId).thenReturn(pid)
when(replicaManager.getPartitionOrError(new TopicPartition(topicName, pid))).thenReturn(Right(partition))
tpli.partitionLogInfo().add(new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setPartition(pid)
.setLogEndOffset(expectedLogEndOffset)
.setLastWrittenLeaderEpoch(expectedLatestEpoch)
.setCurrentLeaderEpoch(expectedLeaderEpoch))
}
expected.topicPartitionLogInfoList().add(tpli)
}
verifyGetReplicaLogInfoRequestWithResponse(builder, expected)
}
@Test
def testGetReplicaInfoRequestHappyTrail(): Unit = {
// Return multiple successful topic-partitions. The first topic has 2 partitions, the second has 1.
val topic1 = new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(util.List.of(1, 2))
val topic2 = new GetReplicaLogInfoRequestData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(util.List.of(3))
val builder = new GetReplicaLogInfoRequest.Builder(List(topic1, topic2).asJava)
metadataCache = mock(classOf[MetadataCache])
when(metadataCache.getTopicName(topic1.topicId())).thenReturn(Optional.of("topic1"))
when(metadataCache.getTopicName(topic2.topicId())).thenReturn(Optional.of("topic2"))
val log1 = mock(classOf[UnifiedLog])
when(log1.logEndOffset).thenReturn(100L)
when(log1.latestEpoch).thenReturn(Optional.of(10))
val partition1 = mock(classOf[Partition])
when(partition1.log).thenReturn(Some(log1))
when(partition1.getLeaderEpoch).thenReturn(1)
when(partition1.partitionId).thenReturn(1)
val log2 = mock(classOf[UnifiedLog])
when(log2.logEndOffset).thenReturn(200L)
when(log2.latestEpoch).thenReturn(Optional.of(20))
val partition2 = mock(classOf[Partition])
when(partition2.log).thenReturn(Some(log2))
when(partition2.getLeaderEpoch).thenReturn(2)
when(partition2.partitionId).thenReturn(2)
val log3 = mock(classOf[UnifiedLog])
when(log3.logEndOffset).thenReturn(300L)
when(log3.latestEpoch).thenReturn(Optional.of(30))
val partition3 = mock(classOf[Partition])
when(partition3.log).thenReturn(Some(log3))
when(partition3.getLeaderEpoch).thenReturn(3)
when(partition3.partitionId).thenReturn(3)
val tp1 = new TopicPartition("topic1", 1)
when(replicaManager.getPartitionOrError(tp1)).thenReturn(Right(partition1))
val tp2 = new TopicPartition("topic1", 2)
when(replicaManager.getPartitionOrError(tp2)).thenReturn(Right(partition2))
val tp3 = new TopicPartition("topic2", 3)
when(replicaManager.getPartitionOrError(tp3)).thenReturn(Right(partition3))
val expected = new GetReplicaLogInfoResponseData()
.setTopicPartitionLogInfoList(List(
new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(topic1.topicId())
.setPartitionLogInfo(util.List.of(
new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setPartition(1)
.setLogEndOffset(100L)
.setLastWrittenLeaderEpoch(10)
.setCurrentLeaderEpoch(1),
new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setPartition(2)
.setLogEndOffset(200L)
.setLastWrittenLeaderEpoch(20)
.setCurrentLeaderEpoch(2))),
new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()
.setTopicId(topic2.topicId())
.setPartitionLogInfo(util.List.of(
new GetReplicaLogInfoResponseData.PartitionLogInfo()
.setPartition(3)
.setLogEndOffset(300L)
.setLastWrittenLeaderEpoch(30)
.setCurrentLeaderEpoch(3)
))).asJava)
.setBrokerEpoch(brokerEpoch)
verifyGetReplicaLogInfoRequestWithResponse(builder, expected)
}
def getShareGroupDescribeResponse(groupIds: util.List[String], enableShareGroups: Boolean = true,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = {


@@ -760,9 +760,6 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.DELETE_SHARE_GROUP_OFFSETS =>
           new DeleteShareGroupOffsetsRequest.Builder(new DeleteShareGroupOffsetsRequestData())
-        case ApiKeys.GET_REPLICA_LOG_INFO =>
-          new GetReplicaLogInfoRequest.Builder(new GetReplicaLogInfoRequestData())
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
       }


@@ -116,8 +116,6 @@ import org.apache.kafka.common.message.FetchSnapshotRequestDataJsonConverter;
 import org.apache.kafka.common.message.FetchSnapshotResponseDataJsonConverter;
 import org.apache.kafka.common.message.FindCoordinatorRequestDataJsonConverter;
 import org.apache.kafka.common.message.FindCoordinatorResponseDataJsonConverter;
-import org.apache.kafka.common.message.GetReplicaLogInfoRequestDataJsonConverter;
-import org.apache.kafka.common.message.GetReplicaLogInfoResponseDataJsonConverter;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestDataJsonConverter;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseDataJsonConverter;
 import org.apache.kafka.common.message.HeartbeatRequestDataJsonConverter;
@@ -300,8 +298,6 @@ import org.apache.kafka.common.requests.FetchSnapshotRequest;
 import org.apache.kafka.common.requests.FetchSnapshotResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.GetReplicaLogInfoRequest;
-import org.apache.kafka.common.requests.GetReplicaLogInfoResponse;
 import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
 import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
@@ -575,8 +571,6 @@ public class RequestConvertToJson {
                 return WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), request.version());
             case WRITE_TXN_MARKERS:
                 return WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) request).data(), request.version());
-            case GET_REPLICA_LOG_INFO:
-                return GetReplicaLogInfoRequestDataJsonConverter.write(((GetReplicaLogInfoRequest) request).data(), request.version());
             default:
                 throw new IllegalStateException("ApiKey " + request.apiKey() + " is not currently handled in `request`, the " +
                     "code should be updated to do so.");
@@ -763,8 +757,6 @@ public class RequestConvertToJson {
                 return WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse) response).data(), version);
             case WRITE_TXN_MARKERS:
                 return WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) response).data(), version);
-            case GET_REPLICA_LOG_INFO:
-                return GetReplicaLogInfoResponseDataJsonConverter.write(((GetReplicaLogInfoResponse) response).data(), version);
             default:
                 throw new IllegalStateException("ApiKey " + response.apiKey() + " is not currently handled in `response`, the " +
                     "code should be updated to do so.");