KAFKA-18728 Move ListOffsetsPartitionStatus to server module (#18807)

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
This commit is contained in:
Ken Huang 2025-02-13 13:06:46 +08:00 committed by GitHub
parent 6a050c6351
commit 9494bebee6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 167 additions and 81 deletions

View File

@ -83,6 +83,7 @@
<subpackage name="server">
<allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.storage.internals.log" />
<subpackage name="metrics">
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
<allow pkg="org.apache.kafka.server.telemetry" />

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.ListOffsetsPartitionStatus
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError
@ -41,9 +42,9 @@ class DelayedRemoteListOffsets(delayMs: Long,
// Mark the status as completed, if there is no async task to track.
// If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
statusByPartition.foreachEntry { (topicPartition, status) =>
status.completed = status.futureHolderOpt.isEmpty
status.completed(status.futureHolderOpt.isEmpty)
if (status.futureHolderOpt.isPresent) {
status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition()))
status.responseOpt(Optional.of(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())))
}
trace(s"Initial partition status for $topicPartition is $status")
}
@ -68,7 +69,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
override def onComplete(): Unit = {
val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
case (topic, status) =>
new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => s.responseOpt).toList.asJava)
new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
}.toList
responseCallback(responseTopics)
}
@ -103,13 +104,13 @@ class DelayedRemoteListOffsets(delayMs: Long,
} else if (!taskFuture.hasTimestampAndOffset) {
val error = status.maybeOffsetsError
.map(e => if (version >= 5) Errors.forException(e) else Errors.LEADER_NOT_AVAILABLE)
.getOrElse(Errors.NONE)
.orElse(Errors.NONE)
buildErrorResponse(error, partition.partition())
} else {
var partitionResponse = buildErrorResponse(Errors.NONE, partition.partition())
val found = taskFuture.timestampAndOffset().get()
if (status.lastFetchableOffset.isDefined && found.offset >= status.lastFetchableOffset.get) {
if (status.maybeOffsetsError.isDefined) {
if (status.lastFetchableOffset.isPresent && found.offset >= status.lastFetchableOffset.get) {
if (status.maybeOffsetsError.isPresent) {
val error = if (version >= 5) Errors.forException(status.maybeOffsetsError.get) else Errors.LEADER_NOT_AVAILABLE
partitionResponse.setErrorCode(error.code())
}
@ -127,8 +128,8 @@ class DelayedRemoteListOffsets(delayMs: Long,
partitionResponse
}
}
status.responseOpt = Some(response)
status.completed = true
status.responseOpt(Optional.of(response))
status.completed(true)
}
completable = completable && futureHolder.taskFuture.isDone
}

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError
import java.util.Optional
/**
 * Mutable per-partition status record for a ListOffsets request.
 *
 * The three constructor arguments are fixed at creation time; `responseOpt` and
 * `completed` are reassignable and marked `@volatile`.
 *
 * @param futureHolderOpt     holder of an asynchronous offset read, if one exists for this partition
 * @param lastFetchableOffset optional offset bound carried alongside the status
 * @param maybeOffsetsError   optional error carried alongside the status
 */
class ListOffsetsPartitionStatus(val futureHolderOpt: Optional[AsyncOffsetReadFutureHolder[FileRecordsOrError]],
                                 val lastFetchableOffset: Option[Long],
                                 val maybeOffsetsError: Option[ApiException]) {

  // Response for this partition; starts out empty.
  @volatile var responseOpt: Option[ListOffsetsPartitionResponse] = None

  // Completion flag for this partition; starts out false.
  @volatile var completed: Boolean = false

  /** Renders every field except `futureHolderOpt`. */
  override def toString: String = {
    val responsePart = s"[responseOpt: $responseOpt, lastFetchableOffset: $lastFetchableOffset, "
    val statePart = s"maybeOffsetsError: $maybeOffsetsError, completed: $completed]"
    responsePart + statePart
  }
}
object ListOffsetsPartitionStatus {

  /**
   * Creates a status and sets its initial response in one call.
   *
   * @param responseOpt         initial value assigned to the new status' `responseOpt`
   * @param futureHolderOpt     forwarded to the constructor; defaults to `Optional.empty()`
   * @param lastFetchableOffset forwarded to the constructor; defaults to `None`
   * @param maybeOffsetsError   forwarded to the constructor; defaults to `None`
   * @return the newly created status
   */
  def apply(responseOpt: Option[ListOffsetsPartitionResponse],
            futureHolderOpt: Optional[AsyncOffsetReadFutureHolder[FileRecordsOrError]] = Optional.empty(),
            lastFetchableOffset: Option[Long] = None,
            maybeOffsetsError: Option[ApiException] = None): ListOffsetsPartitionStatus = {
    val created = new ListOffsetsPartitionStatus(futureHolderOpt, lastFetchableOffset, maybeOffsetsError)
    // The response is not a constructor argument, so it is assigned after construction.
    created.responseOpt = responseOpt
    created
  }
}

View File

@ -49,7 +49,7 @@ import org.apache.kafka.common.utils.{Exit, Time}
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
@ -1383,9 +1383,11 @@ class ReplicaManager(val config: KafkaConfig,
if (duplicatePartitions.contains(topicPartition)) {
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
s"failed because the partition is duplicated in the request.")
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.INVALID_REQUEST, partition)))
statusByPartition += topicPartition ->
ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.INVALID_REQUEST, partition))).build()
} else if (isListOffsetsTimestampUnsupported(partition.timestamp(), version)) {
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition)))
statusByPartition += topicPartition ->
ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition))).build()
} else {
try {
val fetchOnlyFromLeader = replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID
@ -1418,15 +1420,19 @@ class ReplicaManager(val config: KafkaConfig,
if (timestampAndOffsetOpt.leaderEpoch.isPresent && version >= 4)
partitionResponse.setLeaderEpoch(timestampAndOffsetOpt.leaderEpoch.get)
}
ListOffsetsPartitionStatus(Some(partitionResponse))
ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(partitionResponse)).build()
} else if (resultHolder.timestampAndOffsetOpt.isEmpty && resultHolder.futureHolderOpt.isEmpty) {
// This is an empty offset response scenario
resultHolder.maybeOffsetsError.map(e => throw e)
ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.NONE, partition)))
ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.NONE, partition))).build()
} else if (resultHolder.timestampAndOffsetOpt.isEmpty && resultHolder.futureHolderOpt.isPresent) {
// This case is for topic enabled with remote storage and we want to search the timestamp in
// remote storage using async fashion.
ListOffsetsPartitionStatus(None, resultHolder.futureHolderOpt(), resultHolder.lastFetchableOffset.toScala.map(_.longValue()), resultHolder.maybeOffsetsError.toScala)
ListOffsetsPartitionStatus.builder()
.futureHolderOpt(resultHolder.futureHolderOpt())
.lastFetchableOffset(resultHolder.lastFetchableOffset)
.maybeOffsetsError(resultHolder.maybeOffsetsError)
.build()
} else {
throw new IllegalStateException(s"Unexpected result holder state $resultHolder")
}
@ -1443,19 +1449,22 @@ class ReplicaManager(val config: KafkaConfig,
_ : UnsupportedForMessageFormatException) =>
debug(s"Offset request with correlation id $correlationId from client $clientId on " +
s"partition $topicPartition failed due to ${e.getMessage}")
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition)))
statusByPartition += topicPartition ->
ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build()
// Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
case e: OffsetNotAvailableException =>
if (version >= 5) {
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition)))
statusByPartition += topicPartition ->
ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build()
} else {
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition)))
statusByPartition += topicPartition ->
ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition))).build()
}
case e: Throwable =>
error("Error while responding to offset request", e)
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition)))
statusByPartition += topicPartition ->
ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build()
}
}
}
@ -1473,7 +1482,7 @@ class ReplicaManager(val config: KafkaConfig,
// we can respond immediately
val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
case (topic, status) =>
new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => s.responseOpt).toList.asJava)
new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
}.toList
responseCallback(responseTopics)
}

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicR
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.ListOffsetsPartitionStatus
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.storage.internals.log.{AsyncOffsetReadFutureHolder, OffsetResultHolder}
@ -76,9 +77,9 @@ class DelayedRemoteListOffsetsTest {
})
val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder))
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
)
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
@ -128,9 +129,9 @@ class DelayedRemoteListOffsetsTest {
})
val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder))
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
)
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
@ -184,9 +185,9 @@ class DelayedRemoteListOffsetsTest {
when(errorFutureHolder.jobFuture).thenReturn(jobFuture)
val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(errorFutureHolder))
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
)
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
@ -241,10 +242,10 @@ class DelayedRemoteListOffsetsTest {
when(errorFutureHolder.jobFuture).thenReturn(jobFuture)
val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(errorFutureHolder)),
new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder))
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build(),
new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
)
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError;
import java.util.Optional;
/**
 * Per-partition status record for a ListOffsets request.
 *
 * <p>{@code futureHolderOpt}, {@code lastFetchableOffset} and {@code maybeOffsetsError} are fixed at
 * construction. {@code responseOpt} and {@code completed} can be changed afterwards through
 * {@link #responseOpt(Optional)} and {@link #completed(boolean)}; both fields are {@code volatile},
 * so a write by one thread is visible to subsequent reads by other threads.
 *
 * <p>Instances are created through {@link #builder()}.
 */
public class ListOffsetsPartitionStatus {

    private final Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt;
    private final Optional<Long> lastFetchableOffset;
    private final Optional<ApiException> maybeOffsetsError;

    private volatile Optional<ListOffsetsPartitionResponse> responseOpt;
    private volatile boolean completed = false;

    private ListOffsetsPartitionStatus(
        Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt,
        Optional<Long> lastFetchableOffset,
        Optional<ApiException> maybeOffsetsError,
        Optional<ListOffsetsPartitionResponse> responseOpt
    ) {
        this.futureHolderOpt = futureHolderOpt;
        this.lastFetchableOffset = lastFetchableOffset;
        this.maybeOffsetsError = maybeOffsetsError;
        this.responseOpt = responseOpt;
    }

    /**
     * @return a new builder in which every optional value starts out empty
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Fluent builder for {@link ListOffsetsPartitionStatus}. Any value that is not set explicitly
     * stays {@link Optional#empty()}.
     */
    public static class Builder {
        private Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt = Optional.empty();
        private Optional<Long> lastFetchableOffset = Optional.empty();
        private Optional<ApiException> maybeOffsetsError = Optional.empty();
        // Plain field like its siblings: a builder is assumed to be used by a single thread as a
        // builder()...build() chain, so `volatile` added nothing here. The built object's own
        // responseOpt field remains volatile.
        private Optional<ListOffsetsPartitionResponse> responseOpt = Optional.empty();

        /**
         * @param futureHolder holder of the asynchronous offset read, if any
         * @return this builder
         */
        public Builder futureHolderOpt(Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolder) {
            this.futureHolderOpt = futureHolder;
            return this;
        }

        /**
         * @param lastFetchableOffset optional offset bound to carry in the status
         * @return this builder
         */
        public Builder lastFetchableOffset(Optional<Long> lastFetchableOffset) {
            this.lastFetchableOffset = lastFetchableOffset;
            return this;
        }

        /**
         * @param maybeOffsetsError optional error to carry in the status
         * @return this builder
         */
        public Builder maybeOffsetsError(Optional<ApiException> maybeOffsetsError) {
            this.maybeOffsetsError = maybeOffsetsError;
            return this;
        }

        /**
         * @param responseOpt initial partition response, if one is already known
         * @return this builder
         */
        public Builder responseOpt(Optional<ListOffsetsPartitionResponse> responseOpt) {
            this.responseOpt = responseOpt;
            return this;
        }

        /**
         * @return a new status holding the values collected so far; {@code completed} starts as false
         */
        public ListOffsetsPartitionStatus build() {
            return new ListOffsetsPartitionStatus(
                futureHolderOpt,
                lastFetchableOffset,
                maybeOffsetsError,
                responseOpt
            );
        }
    }

    /**
     * @return the asynchronous offset read holder supplied at construction
     */
    public Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt() {
        return futureHolderOpt;
    }

    /**
     * @return the optional offset bound supplied at construction
     */
    public Optional<Long> lastFetchableOffset() {
        return lastFetchableOffset;
    }

    /**
     * @return the optional error supplied at construction
     */
    public Optional<ApiException> maybeOffsetsError() {
        return maybeOffsetsError;
    }

    /**
     * Replaces the partition response.
     *
     * @param responseOpt the new response value
     */
    public void responseOpt(Optional<ListOffsetsPartitionResponse> responseOpt) {
        this.responseOpt = responseOpt;
    }

    /**
     * @return the current partition response
     */
    public Optional<ListOffsetsPartitionResponse> responseOpt() {
        return responseOpt;
    }

    /**
     * Sets the completion flag.
     *
     * @param completed the new flag value
     */
    public void completed(boolean completed) {
        this.completed = completed;
    }

    /**
     * @return the current completion flag
     */
    public boolean completed() {
        return completed;
    }

    /**
     * Renders every field except {@code futureHolderOpt}.
     */
    @Override
    public String toString() {
        return String.format("[responseOpt: %s, lastFetchableOffset: %s, " +
                "maybeOffsetsError: %s, completed: %s]",
            responseOpt, lastFetchableOffset, maybeOffsetsError, completed);
    }
}