KAFKA-17970 Moving some share purgatory classes from core to share module (#17722)

As part of PR: https://github.com/apache/kafka/pull/17636 where purgatory has been moved from core to server-common hence move some existing classes used in Share Fetch to Share module.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Apoorv Mittal 2024-11-09 17:04:17 +00:00 committed by GitHub
parent 81019b6e9f
commit 4966aed40d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 21 additions and 9 deletions

View File

@ -46,6 +46,7 @@
<allow pkg="org.apache.kafka.server.share" />
<subpackage name="fetch">
<allow pkg="org.apache.kafka.server.purgatory" />
<allow class="org.apache.kafka.server.storage.log.FetchParams"/>
</subpackage>

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;

View File

@ -36,6 +36,8 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.PartitionAllData;

View File

@ -46,6 +46,9 @@ import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;

View File

@ -25,7 +25,7 @@ import kafka.log._
import kafka.log.remote.RemoteLogManager
import kafka.server._
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import kafka.server.share.{DelayedShareFetch, DelayedShareFetchPartitionKey}
import kafka.server.share.DelayedShareFetch
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zookeeper.ZooKeeperClientException
@ -46,6 +46,7 @@ import org.apache.kafka.server.common.{MetadataVersion, RequestLocal}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.slf4j.event.Level

View File

@ -25,7 +25,7 @@ import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
import kafka.server.metadata.ZkMetadataCache
import kafka.server.share.{DelayedShareFetch, DelayedShareFetchKey, DelayedShareFetchPartitionKey}
import kafka.server.share.DelayedShareFetch
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors._
@ -61,6 +61,7 @@ import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedOperationKey, DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.storage.log.FetchIsolation;
@ -72,7 +73,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
public class DelayedShareFetchTest {
private static final int MAX_WAIT_MS = 5000;
private static final int MAX_FETCH_RECORDS = 100;

View File

@ -60,6 +60,8 @@ import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;

View File

@ -46,7 +46,7 @@ import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore}
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import kafka.server.share.{DelayedShareFetch, DelayedShareFetchPartitionKey}
import kafka.server.share.DelayedShareFetch
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
@ -59,6 +59,7 @@ import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, Metad
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.Uuid;
@ -28,7 +28,7 @@ public class DelayedShareFetchGroupKey implements DelayedShareFetchKey {
private final Uuid topicId;
private final int partition;
DelayedShareFetchGroupKey(String groupId, Uuid topicId, int partition) {
public DelayedShareFetchGroupKey(String groupId, Uuid topicId, int partition) {
this.groupId = groupId;
this.topicId = topicId;
this.partition = partition;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
package org.apache.kafka.server.share.fetch;
import org.apache.kafka.server.purgatory.DelayedOperationKey;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.Uuid;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;