diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 3d215ee6a5f..94937333a05 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -83,6 +83,7 @@
+
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
index f2bb8c37d85..a84b78ff25c 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.common.errors.ApiException
 import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ListOffsetsResponse
+import org.apache.kafka.server.ListOffsetsPartitionStatus
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.purgatory.DelayedOperation
 import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError
@@ -41,9 +42,9 @@ class DelayedRemoteListOffsets(delayMs: Long,
   // Mark the status as completed, if there is no async task to track.
   // If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
   statusByPartition.foreachEntry { (topicPartition, status) =>
-    status.completed = status.futureHolderOpt.isEmpty
+    status.completed(status.futureHolderOpt.isEmpty)
     if (status.futureHolderOpt.isPresent) {
-      status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition()))
+      status.responseOpt(Optional.of(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())))
     }
     trace(s"Initial partition status for $topicPartition is $status")
   }
@@ -68,7 +69,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
   override def onComplete(): Unit = {
     val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
       case (topic, status) =>
-        new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => s.responseOpt).toList.asJava)
+        new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
     }.toList
     responseCallback(responseTopics)
   }
@@ -103,13 +104,13 @@ class DelayedRemoteListOffsets(delayMs: Long,
           } else if (!taskFuture.hasTimestampAndOffset) {
             val error = status.maybeOffsetsError
               .map(e => if (version >= 5) Errors.forException(e) else Errors.LEADER_NOT_AVAILABLE)
-              .getOrElse(Errors.NONE)
+              .orElse(Errors.NONE)
             buildErrorResponse(error, partition.partition())
           } else {
             var partitionResponse = buildErrorResponse(Errors.NONE, partition.partition())
             val found = taskFuture.timestampAndOffset().get()
-            if (status.lastFetchableOffset.isDefined && found.offset >= status.lastFetchableOffset.get) {
-              if (status.maybeOffsetsError.isDefined) {
+            if (status.lastFetchableOffset.isPresent && found.offset >= status.lastFetchableOffset.get) {
+              if (status.maybeOffsetsError.isPresent) {
                 val error = if (version >= 5) Errors.forException(status.maybeOffsetsError.get) else Errors.LEADER_NOT_AVAILABLE
                 partitionResponse.setErrorCode(error.code())
               }
@@ -127,8 +128,8 @@ class DelayedRemoteListOffsets(delayMs: Long,
             partitionResponse
           }
         }
-        status.responseOpt = Some(response)
-        status.completed = true
+        status.responseOpt(Optional.of(response))
+        status.completed(true)
       }
       completable = completable && futureHolder.taskFuture.isDone
     }
diff --git a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
deleted file mode 100644
index 51507e12043..00000000000
--- a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
-import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
-import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError
-
-import java.util.Optional
-
-class ListOffsetsPartitionStatus(val futureHolderOpt: Optional[AsyncOffsetReadFutureHolder[FileRecordsOrError]],
-                                 val lastFetchableOffset: Option[Long],
-                                 val maybeOffsetsError: Option[ApiException]) {
-
-  @volatile var responseOpt: Option[ListOffsetsPartitionResponse] = None
-  @volatile var completed = false
-
-  override def toString: String = {
-    s"[responseOpt: $responseOpt, lastFetchableOffset: $lastFetchableOffset, " +
-      s"maybeOffsetsError: $maybeOffsetsError, completed: $completed]"
-  }
-}
-
-object ListOffsetsPartitionStatus {
-  def apply(responseOpt: Option[ListOffsetsPartitionResponse],
-            futureHolderOpt: Optional[AsyncOffsetReadFutureHolder[FileRecordsOrError]] = Optional.empty(),
-            lastFetchableOffset: Option[Long] = None,
-            maybeOffsetsError: Option[ApiException] = None): ListOffsetsPartitionStatus = {
-    val status = new ListOffsetsPartitionStatus(futureHolderOpt, lastFetchableOffset, maybeOffsetsError)
-    status.responseOpt = responseOpt
-    status
-  }
-}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 48960e8d08f..aa4d8ae24cc 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -49,7 +49,7 @@ import org.apache.kafka.common.utils.{Exit, Time}
 import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
@@ -1383,9 +1383,11 @@ class ReplicaManager(val config: KafkaConfig,
         if (duplicatePartitions.contains(topicPartition)) {
           debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
             s"failed because the partition is duplicated in the request.")
-          statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.INVALID_REQUEST, partition)))
+          statusByPartition += topicPartition ->
+            ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.INVALID_REQUEST, partition))).build()
         } else if (isListOffsetsTimestampUnsupported(partition.timestamp(), version)) {
-          statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition)))
+          statusByPartition += topicPartition ->
+            ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition))).build()
         } else {
           try {
             val fetchOnlyFromLeader = replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID
@@ -1418,15 +1420,19 @@ class ReplicaManager(val config: KafkaConfig,
                   if (timestampAndOffsetOpt.leaderEpoch.isPresent && version >= 4)
                     partitionResponse.setLeaderEpoch(timestampAndOffsetOpt.leaderEpoch.get)
                 }
-                ListOffsetsPartitionStatus(Some(partitionResponse))
+                ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(partitionResponse)).build()
               } else if (resultHolder.timestampAndOffsetOpt.isEmpty && resultHolder.futureHolderOpt.isEmpty) {
                 // This is an empty offset response scenario
                 resultHolder.maybeOffsetsError.map(e => throw e)
-                ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.NONE, partition)))
+                ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.NONE, partition))).build()
               } else if (resultHolder.timestampAndOffsetOpt.isEmpty && resultHolder.futureHolderOpt.isPresent) {
                 // This case is for topic enabled with remote storage and we want to search the timestamp in
                 // remote storage using async fashion.
-                ListOffsetsPartitionStatus(None, resultHolder.futureHolderOpt(), resultHolder.lastFetchableOffset.toScala.map(_.longValue()), resultHolder.maybeOffsetsError.toScala)
+                ListOffsetsPartitionStatus.builder()
+                  .futureHolderOpt(resultHolder.futureHolderOpt())
+                  .lastFetchableOffset(resultHolder.lastFetchableOffset)
+                  .maybeOffsetsError(resultHolder.maybeOffsetsError)
+                  .build()
               } else {
                 throw new IllegalStateException(s"Unexpected result holder state $resultHolder")
               }
@@ -1443,19 +1449,22 @@ class ReplicaManager(val config: KafkaConfig,
                     _ : UnsupportedForMessageFormatException) =>
           debug(s"Offset request with correlation id $correlationId from client $clientId on " +
             s"partition $topicPartition failed due to ${e.getMessage}")
-          statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition)))
-
+          statusByPartition += topicPartition ->
+            ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build()
         // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
         case e: OffsetNotAvailableException =>
           if (version >= 5) {
-            statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition)))
+            statusByPartition += topicPartition ->
+              ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build()
           } else {
-            statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition)))
+            statusByPartition += topicPartition ->
+              ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition))).build()
           }
         case e: Throwable =>
           error("Error while responding to offset request", e)
-          statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition)))
+          statusByPartition += topicPartition ->
+            ListOffsetsPartitionStatus.builder().responseOpt(Optional.of(buildErrorResponse(Errors.forException(e), partition))).build()
       }
     }
   }
@@ -1473,7 +1482,7 @@ class ReplicaManager(val config: KafkaConfig,
       // we can respond immediately
      val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
        case (topic, status) =>
-          new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => s.responseOpt).toList.asJava)
+          new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
      }.toList
      responseCallback(responseTopics)
    }
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
index 96664d41a80..f40338d3264 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.requests.ListOffsetsResponse
+import org.apache.kafka.server.ListOffsetsPartitionStatus
 import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
 import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.storage.internals.log.{AsyncOffsetReadFutureHolder, OffsetResultHolder}
@@ -76,9 +77,9 @@ class DelayedRemoteListOffsetsTest {
     })

     val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder))
+      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
     )

     val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
@@ -128,9 +129,9 @@ class DelayedRemoteListOffsetsTest {
     })

     val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder))
+      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
     )

     val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
@@ -184,9 +185,9 @@ class DelayedRemoteListOffsetsTest {
     when(errorFutureHolder.jobFuture).thenReturn(jobFuture)

     val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(errorFutureHolder))
+      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
     )

     val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
@@ -241,10 +242,10 @@ class DelayedRemoteListOffsetsTest {
     when(errorFutureHolder.jobFuture).thenReturn(jobFuture)

     val statusByPartition = mutable.Map(
-      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
-      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
-      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(errorFutureHolder)),
-      new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder))
+      new TopicPartition("test", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+      new TopicPartition("test", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
+      new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build(),
+      new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
     )

     val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
diff --git a/server/src/main/java/org/apache/kafka/server/ListOffsetsPartitionStatus.java b/server/src/main/java/org/apache/kafka/server/ListOffsetsPartitionStatus.java
new file mode 100644
index 00000000000..b489b820ef0
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ListOffsetsPartitionStatus.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError;
+
+import java.util.Optional;
+
+public class ListOffsetsPartitionStatus {
+
+    private final Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt;
+    private final Optional<Long> lastFetchableOffset;
+    private final Optional<ApiException> maybeOffsetsError;
+
+    private volatile Optional<ListOffsetsPartitionResponse> responseOpt;
+    private volatile boolean completed = false;
+
+    private ListOffsetsPartitionStatus(
+        Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt,
+        Optional<Long> lastFetchableOffset,
+        Optional<ApiException> maybeOffsetsError,
+        Optional<ListOffsetsPartitionResponse> responseOpt
+    ) {
+        this.futureHolderOpt = futureHolderOpt;
+        this.lastFetchableOffset = lastFetchableOffset;
+        this.maybeOffsetsError = maybeOffsetsError;
+        this.responseOpt = responseOpt;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt = Optional.empty();
+        private Optional<Long> lastFetchableOffset = Optional.empty();
+        private Optional<ApiException> maybeOffsetsError = Optional.empty();
+        private volatile Optional<ListOffsetsPartitionResponse> responseOpt = Optional.empty();
+
+        public Builder futureHolderOpt(Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolder) {
+            this.futureHolderOpt = futureHolder;
+            return this;
+        }
+
+        public Builder lastFetchableOffset(Optional<Long> lastFetchableOffset) {
+            this.lastFetchableOffset = lastFetchableOffset;
+            return this;
+        }
+
+        public Builder maybeOffsetsError(Optional<ApiException> maybeOffsetsError) {
+            this.maybeOffsetsError = maybeOffsetsError;
+            return this;
+        }
+
+        public Builder responseOpt(Optional<ListOffsetsPartitionResponse> responseOpt) {
+            this.responseOpt = responseOpt;
+            return this;
+        }
+
+        public ListOffsetsPartitionStatus build() {
+            return new ListOffsetsPartitionStatus(
+                futureHolderOpt,
+                lastFetchableOffset,
+                maybeOffsetsError,
+                responseOpt
+            );
+        }
+
+    }
+
+    public Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt() {
+        return futureHolderOpt;
+    }
+
+    public Optional<Long> lastFetchableOffset() {
+        return lastFetchableOffset;
+    }
+
+    public Optional<ApiException> maybeOffsetsError() {
+        return maybeOffsetsError;
+    }
+
+    public void responseOpt(Optional<ListOffsetsPartitionResponse> responseOpt) {
+        this.responseOpt = responseOpt;
+    }
+
+    public Optional<ListOffsetsPartitionResponse> responseOpt() {
+        return responseOpt;
+    }
+
+    public void completed(boolean completed) {
+        this.completed = completed;
+    }
+
+    public boolean completed() {
+        return completed;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("[responseOpt: %s, lastFetchableOffset: %s, " +
+                "maybeOffsetsError: %s, completed: %s]",
+            responseOpt, lastFetchableOffset, maybeOffsetsError, completed);
+    }
+}
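
For reference, below is a minimal sketch of how the new builder-based API can be exercised. The wrapper class ListOffsetsPartitionStatusExample and its main method are illustrative only and are not part of this change; the calls themselves mirror the two paths in the patch: an immediate response built up front (as in ReplicaManager's error handling) and a pending status that an async lookup later fills in via the responseOpt(...) and completed(...) setters (as in DelayedRemoteListOffsets).

import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.ListOffsetsPartitionStatus;

import java.util.Optional;

// Hypothetical example class, not part of the patch.
public class ListOffsetsPartitionStatusExample {
    public static void main(String[] args) {
        // Immediate-response path: the response is known up front, so no future holder is attached.
        ListOffsetsPartitionResponse response = new ListOffsetsPartitionResponse()
            .setPartitionIndex(0)
            .setErrorCode(Errors.NONE.code());
        ListOffsetsPartitionStatus completedStatus = ListOffsetsPartitionStatus.builder()
            .responseOpt(Optional.of(response))
            .build();

        // Remote-storage path: no response yet; the async task later sets the response and marks
        // the status completed through the mutable setters, as DelayedRemoteListOffsets does above.
        ListOffsetsPartitionStatus pendingStatus = ListOffsetsPartitionStatus.builder().build();
        pendingStatus.responseOpt(Optional.of(response));
        pendingStatus.completed(true);

        System.out.println(completedStatus);
        System.out.println(pendingStatus);
    }
}

The builder stands in for the deleted Scala apply method's default arguments: callers set only the fields they need, which keeps the Java class equally usable from the Scala broker code and from Java.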