MINOR Refactoring share fetch code (KIP-932) (#17269)

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur <mumrah@gmail.com>
This commit is contained in:
Apoorv Mittal 2024-09-26 13:09:31 +01:00 committed by GitHub
parent bd94a739ef
commit 6abbd548b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 297 additions and 283 deletions

View File

@ -954,6 +954,7 @@ project(':share') {
dependencies {
implementation project(':server-common')
implementation project(':storage')
implementation libs.slf4jApi

View File

@ -43,6 +43,10 @@
<subpackage name="server">
<subpackage name="share">
<allow pkg="org.apache.kafka.server.share" />
<subpackage name="fetch">
<allow class="org.apache.kafka.storage.internals.log.FetchParams"/>
</subpackage>
</subpackage>
</subpackage>

View File

@ -23,6 +23,8 @@ import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
import org.slf4j.Logger;
@ -46,19 +48,20 @@ import scala.runtime.BoxedUnit;
* A delayed share fetch operation has been introduced in case there is a share fetch request which cannot be completed instantaneously.
*/
public class DelayedShareFetch extends DelayedOperation {
private final SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData;
private final ShareFetchData shareFetchData;
private final ReplicaManager replicaManager;
private final Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap;
private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete = new LinkedHashMap<>();
private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);
DelayedShareFetch(
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData,
ShareFetchData shareFetchData,
ReplicaManager replicaManager,
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap) {
super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchPartitionData = shareFetchPartitionData;
Map<SharePartitionKey, SharePartition> partitionCacheMap) {
super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchData = shareFetchData;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
}
@ -75,10 +78,10 @@ public class DelayedShareFetch extends DelayedOperation {
@Override
public void onComplete() {
log.trace("Completing the delayed share fetch request for group {}, member {}, " +
"topic partitions {}", shareFetchPartitionData.groupId(),
shareFetchPartitionData.memberId(), shareFetchPartitionData.partitionMaxBytes().keySet());
"topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet());
if (shareFetchPartitionData.future().isDone())
if (shareFetchData.future().isDone())
return;
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
@ -91,14 +94,14 @@ public class DelayedShareFetch extends DelayedOperation {
try {
if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete the request with an empty response.
shareFetchPartitionData.future().complete(Collections.emptyMap());
shareFetchData.future().complete(Collections.emptyMap());
return;
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
topicPartitionData, shareFetchPartitionData.groupId(), shareFetchPartitionData.fetchParams());
topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
shareFetchPartitionData.fetchParams(),
shareFetchData.fetchParams(),
CollectionConverters.asScala(
topicPartitionData.entrySet().stream().map(entry ->
new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
@ -116,23 +119,23 @@ public class DelayedShareFetch extends DelayedOperation {
});
log.trace("Data successfully retrieved by replica manager: {}", responseData);
ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData, partitionCacheMap, replicaManager)
ShareFetchUtils.processFetchResponse(shareFetchData, responseData, partitionCacheMap, replicaManager)
.whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("Error processing fetch response for share partitions", throwable);
shareFetchPartitionData.future().completeExceptionally(throwable);
shareFetchData.future().completeExceptionally(throwable);
} else {
shareFetchPartitionData.future().complete(result);
shareFetchData.future().complete(result);
}
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchPartitionData.groupId(), topicPartitionData.keySet());
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
});
} catch (Exception e) {
// Release the locks acquired for the partitions in the share fetch request in case there is an exception
log.error("Error processing delayed share fetch request", e);
shareFetchPartitionData.future().completeExceptionally(e);
releasePartitionLocks(shareFetchPartitionData.groupId(), topicPartitionData.keySet());
shareFetchData.future().completeExceptionally(e);
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
}
}
@ -142,16 +145,16 @@ public class DelayedShareFetch extends DelayedOperation {
@Override
public boolean tryComplete() {
log.trace("Try to complete the delayed share fetch request for group {}, member {}, topic partitions {}",
shareFetchPartitionData.groupId(), shareFetchPartitionData.memberId(),
shareFetchPartitionData.partitionMaxBytes().keySet());
shareFetchData.groupId(), shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
topicPartitionDataFromTryComplete = acquirablePartitions();
if (!topicPartitionDataFromTryComplete.isEmpty())
return forceComplete();
log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
"topic partitions {}", shareFetchPartitionData.groupId(),
shareFetchPartitionData.memberId(), shareFetchPartitionData.partitionMaxBytes().keySet());
"topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet());
return false;
}
@ -163,11 +166,11 @@ public class DelayedShareFetch extends DelayedOperation {
// Initialize the topic partitions for which the fetch should be attempted.
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
shareFetchPartitionData.partitionMaxBytes().keySet().forEach(topicIdPartition -> {
SharePartition sharePartition = partitionCacheMap.get(new SharePartitionManager.SharePartitionKey(
shareFetchPartitionData.groupId(), topicIdPartition));
shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition -> {
SharePartition sharePartition = partitionCacheMap.get(new SharePartitionKey(
shareFetchData.groupId(), topicIdPartition));
int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
int partitionMaxBytes = shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
// Add the share partition to the list of partitions to be fetched only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
@ -195,6 +198,6 @@ public class DelayedShareFetch extends DelayedOperation {
private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
topicIdPartitions.forEach(tp -> partitionCacheMap.get(new
SharePartitionManager.SharePartitionKey(groupId, tp)).releaseFetchLock());
SharePartitionKey(groupId, tp)).releaseFetchLock());
}
}

View File

@ -19,19 +19,17 @@ package kafka.server.share;
import kafka.server.DelayedOperationKey;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.share.SharePartitionKey;
import java.util.Objects;
/**
* A key for delayed operations that fetch data for share consumers.
*/
public class DelayedShareFetchKey implements DelayedOperationKey {
private final String groupId;
private final TopicIdPartition topicIdPartition;
public class DelayedShareFetchKey extends SharePartitionKey implements DelayedOperationKey {
DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) {
this.groupId = groupId;
this.topicIdPartition = topicIdPartition;
super(groupId, topicIdPartition);
}
@Override

View File

@ -23,6 +23,8 @@ import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
import org.slf4j.Logger;
@ -45,20 +47,20 @@ public class ShareFetchUtils {
// Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data
// from share partitions.
static CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse(
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData,
ShareFetchData shareFetchData,
Map<TopicIdPartition, FetchPartitionData> responseData,
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap,
Map<SharePartitionKey, SharePartition> partitionCacheMap,
ReplicaManager replicaManager
) {
Map<TopicIdPartition, CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new HashMap<>();
responseData.forEach((topicIdPartition, fetchPartitionData) -> {
SharePartition sharePartition = partitionCacheMap.get(new SharePartitionManager.SharePartitionKey(
shareFetchPartitionData.groupId(), topicIdPartition));
futures.put(topicIdPartition, sharePartition.acquire(shareFetchPartitionData.memberId(), fetchPartitionData)
SharePartition sharePartition = partitionCacheMap.get(new SharePartitionKey(
shareFetchData.groupId(), topicIdPartition));
futures.put(topicIdPartition, sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData)
.handle((acquiredRecords, throwable) -> {
log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
topicIdPartition, shareFetchPartitionData, acquiredRecords);
topicIdPartition, shareFetchData, acquiredRecords);
ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(topicIdPartition.partition());

View File

@ -41,9 +41,9 @@ import org.apache.kafka.server.share.PartitionStateBatchData;
import org.apache.kafka.server.share.Persister;
import org.apache.kafka.server.share.PersisterStateBatch;
import org.apache.kafka.server.share.ReadShareGroupStateParameters;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.TopicData;
import org.apache.kafka.server.share.WriteShareGroupStateParameters;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchPartitionData;

View File

@ -40,10 +40,12 @@ import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.Persister;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
@ -103,7 +105,7 @@ public class SharePartitionManager implements AutoCloseable {
/**
* The fetch queue stores the share fetch requests that are waiting to be processed.
*/
private final ConcurrentLinkedQueue<ShareFetchPartitionData> fetchQueue;
private final ConcurrentLinkedQueue<ShareFetchData> fetchQueue;
/**
* The process fetch queue lock is used to ensure that only one thread is processing the fetch queue at a time.
@ -203,7 +205,7 @@ public class SharePartitionManager implements AutoCloseable {
Time time,
ShareSessionCache cache,
Map<SharePartitionKey, SharePartition> partitionCacheMap,
ConcurrentLinkedQueue<ShareFetchPartitionData> fetchQueue,
ConcurrentLinkedQueue<ShareFetchData> fetchQueue,
int recordLockDurationMs,
Timer timer,
int maxDeliveryCount,
@ -248,8 +250,8 @@ public class SharePartitionManager implements AutoCloseable {
partitionMaxBytes.keySet(), groupId, fetchParams);
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetchPartitionData shareFetchPartitionData = new ShareFetchPartitionData(fetchParams, groupId, memberId, future, partitionMaxBytes);
fetchQueue.add(shareFetchPartitionData);
ShareFetchData shareFetchData = new ShareFetchData(fetchParams, groupId, memberId, future, partitionMaxBytes);
fetchQueue.add(shareFetchData);
maybeProcessFetchQueue();
return future;
@ -518,8 +520,9 @@ public class SharePartitionManager implements AutoCloseable {
// Add the share fetch request to the delayed share fetch purgatory to process the fetch request if it can be
// completed else watch until it can be completed/timeout.
private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<Object> keys) {
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, CollectionConverters.asScala(keys).toSeq());
private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<DelayedShareFetchKey> keys) {
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch,
CollectionConverters.asScala(keys).toSeq().indices());
}
@Override
@ -529,7 +532,7 @@ public class SharePartitionManager implements AutoCloseable {
this.persister.stop();
if (!fetchQueue.isEmpty()) {
log.warn("Closing SharePartitionManager with pending fetch requests count: {}", fetchQueue.size());
fetchQueue.forEach(shareFetchPartitionData -> shareFetchPartitionData.future.completeExceptionally(
fetchQueue.forEach(shareFetchData -> shareFetchData.future().completeExceptionally(
Errors.BROKER_NOT_AVAILABLE.exception()));
fetchQueue.clear();
}
@ -553,17 +556,17 @@ public class SharePartitionManager implements AutoCloseable {
return;
}
ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();
if (shareFetchPartitionData == null) {
ShareFetchData shareFetchData = fetchQueue.poll();
if (shareFetchData == null) {
// No more requests to process, so release the lock. Though we should not reach here as the lock
// is acquired only when there are requests in the queue. But still, it's safe to release the lock.
releaseProcessFetchQueueLock();
return;
}
if (shareFetchPartitionData.partitionMaxBytes.isEmpty()) {
if (shareFetchData.partitionMaxBytes().isEmpty()) {
// If there are no partitions to fetch then complete the future with an empty map.
shareFetchPartitionData.future.complete(Collections.emptyMap());
shareFetchData.future().complete(Collections.emptyMap());
// Release the lock so that other threads can process the queue.
releaseProcessFetchQueueLock();
if (!fetchQueue.isEmpty())
@ -572,9 +575,9 @@ public class SharePartitionManager implements AutoCloseable {
}
try {
shareFetchPartitionData.partitionMaxBytes.keySet().forEach(topicIdPartition -> {
shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition -> {
SharePartitionKey sharePartitionKey = sharePartitionKey(
shareFetchPartitionData.groupId,
shareFetchData.groupId(),
topicIdPartition
);
SharePartition sharePartition = fetchSharePartition(sharePartitionKey);
@ -585,19 +588,19 @@ public class SharePartitionManager implements AutoCloseable {
// TopicPartitionData list will be populated only if the share partition is already initialized.
sharePartition.maybeInitialize().whenComplete((result, throwable) -> {
if (throwable != null) {
maybeCompleteInitializationWithException(sharePartitionKey, shareFetchPartitionData.future, throwable);
maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable);
return;
}
});
});
Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
shareFetchPartitionData.partitionMaxBytes.keySet().forEach(
Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
shareFetchData.partitionMaxBytes().keySet().forEach(
topicIdPartition -> delayedShareFetchWatchKeys.add(
new DelayedShareFetchKey(shareFetchPartitionData.groupId, topicIdPartition)));
new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition)));
// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
addDelayedShareFetch(new DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap),
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap),
delayedShareFetchWatchKeys);
// Release the lock so that other threads can process the queue.
@ -621,8 +624,8 @@ public class SharePartitionManager implements AutoCloseable {
k -> {
long start = time.hiResClockMs();
SharePartition partition = new SharePartition(
sharePartitionKey.groupId,
sharePartitionKey.topicIdPartition,
sharePartitionKey.groupId(),
sharePartitionKey.topicIdPartition(),
maxInFlightMessages,
maxDeliveryCount,
recordLockDurationMs,
@ -679,89 +682,6 @@ public class SharePartitionManager implements AutoCloseable {
return new SharePartitionKey(groupId, topicIdPartition);
}
/**
* The SharePartitionKey is used to uniquely identify a share partition. The key is made up of the
* share group id, the topic id and the partition id. The key is used to store the SharePartition
* objects in the partition cache map.
*/
// Visible for testing
static class SharePartitionKey {
private final String groupId;
private final TopicIdPartition topicIdPartition;
public SharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
this.groupId = Objects.requireNonNull(groupId);
this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
}
@Override
public int hashCode() {
return Objects.hash(groupId, topicIdPartition);
}
@Override
public boolean equals(final Object obj) {
if (this == obj)
return true;
else if (obj == null || getClass() != obj.getClass())
return false;
else {
SharePartitionKey that = (SharePartitionKey) obj;
return groupId.equals(that.groupId) && Objects.equals(topicIdPartition, that.topicIdPartition);
}
}
@Override
public String toString() {
return "SharePartitionKey{" +
"groupId='" + groupId +
", topicIdPartition=" + topicIdPartition +
'}';
}
}
/**
* The ShareFetchPartitionData class is used to store the fetch parameters for a share fetch request.
*/
// Visible for testing
static class ShareFetchPartitionData {
private final FetchParams fetchParams;
private final String groupId;
private final String memberId;
private final CompletableFuture<Map<TopicIdPartition, PartitionData>> future;
private final Map<TopicIdPartition, Integer> partitionMaxBytes;
public ShareFetchPartitionData(FetchParams fetchParams, String groupId, String memberId,
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Map<TopicIdPartition, Integer> partitionMaxBytes) {
this.fetchParams = fetchParams;
this.groupId = groupId;
this.memberId = memberId;
this.future = future;
this.partitionMaxBytes = partitionMaxBytes;
}
public String groupId() {
return groupId;
}
public String memberId() {
return memberId;
}
public CompletableFuture<Map<TopicIdPartition, PartitionData>> future() {
return future;
}
public Map<TopicIdPartition, Integer> partitionMaxBytes() {
return partitionMaxBytes;
}
public FetchParams fetchParams() {
return fetchParams;
}
}
static class ShareGroupMetrics {
/**
* share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) - The total number of offsets acknowledged for share groups (requests to be ack).

View File

@ -77,7 +77,8 @@ import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, RequestLoc
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, ShareAcknowledgementBatch}
import org.apache.kafka.server.share.ErroneousAndValidPartitionData
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

View File

@ -25,6 +25,8 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchParams;
@ -68,11 +70,11 @@ public class DelayedShareFetchTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes);
@ -80,7 +82,7 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withShareFetchData(shareFetchData)
.withPartitionCacheMap(partitionCacheMap)
.build();
@ -105,11 +107,11 @@ public class DelayedShareFetchTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes);
@ -117,7 +119,7 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withShareFetchData(shareFetchData)
.withPartitionCacheMap(partitionCacheMap)
.build();
assertFalse(delayedShareFetch.isCompleted());
@ -144,11 +146,11 @@ public class DelayedShareFetchTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes);
@ -156,7 +158,7 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@ -164,7 +166,7 @@ public class DelayedShareFetchTest {
delayedShareFetch.forceComplete();
// Since no partition could be acquired, the future should be empty and replicaManager.readFromLog should not be called.
assertEquals(0, shareFetchPartitionData.future().join().size());
assertEquals(0, shareFetchData.future().join().size());
Mockito.verify(replicaManager, times(0)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
assertTrue(delayedShareFetch.isCompleted());
@ -187,11 +189,11 @@ public class DelayedShareFetchTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes);
@ -199,7 +201,7 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@ -225,11 +227,11 @@ public class DelayedShareFetchTest {
SharePartition sp0 = mock(SharePartition.class);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes);
@ -238,7 +240,7 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(false);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build());
@ -249,7 +251,7 @@ public class DelayedShareFetchTest {
assertTrue(delayedShareFetch.isCompleted());
// Verifying that the first forceComplete calls acquirablePartitions method in DelayedShareFetch.
Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
assertEquals(0, shareFetchPartitionData.future().join().size());
assertEquals(0, shareFetchData.future().join().size());
// Force completing the share fetch request for the second time should hit the future completion check and not
// proceed ahead in the function.
@ -260,12 +262,12 @@ public class DelayedShareFetchTest {
}
static class DelayedShareFetchBuilder {
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = mock(SharePartitionManager.ShareFetchPartitionData.class);
ShareFetchData shareFetchData = mock(ShareFetchData.class);
private ReplicaManager replicaManager = mock(ReplicaManager.class);
private Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
DelayedShareFetchBuilder withShareFetchPartitionData(SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData) {
this.shareFetchPartitionData = shareFetchPartitionData;
DelayedShareFetchBuilder withShareFetchData(ShareFetchData shareFetchData) {
this.shareFetchData = shareFetchData;
return this;
}
@ -274,7 +276,7 @@ public class DelayedShareFetchTest {
return this;
}
DelayedShareFetchBuilder withPartitionCacheMap(Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap) {
DelayedShareFetchBuilder withPartitionCacheMap(Map<SharePartitionKey, SharePartition> partitionCacheMap) {
this.partitionCacheMap = partitionCacheMap;
return this;
}
@ -285,7 +287,7 @@ public class DelayedShareFetchTest {
public DelayedShareFetch build() {
return new DelayedShareFetch(
shareFetchPartitionData,
shareFetchData,
replicaManager,
partitionCacheMap);
}

View File

@ -30,6 +30,8 @@ import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
@ -90,11 +92,11 @@ public class ShareFetchUtilsTest {
doNothing().when(sp1).updateCacheAndOffsets(any(Long.class));
doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes);
@ -119,7 +121,7 @@ public class ShareFetchUtilsTest {
records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> result =
ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData, partitionCacheMap, mock(ReplicaManager.class));
ShareFetchUtils.processFetchResponse(shareFetchData, responseData, partitionCacheMap, mock(ReplicaManager.class));
assertTrue(result.isDone());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData = result.join();
@ -161,11 +163,11 @@ public class ShareFetchUtilsTest {
doNothing().when(sp1).updateCacheAndOffsets(any(Long.class));
doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes);
@ -178,7 +180,7 @@ public class ShareFetchUtilsTest {
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> result =
ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData, partitionCacheMap, mock(ReplicaManager.class));
ShareFetchUtils.processFetchResponse(shareFetchData, responseData, partitionCacheMap, mock(ReplicaManager.class));
assertTrue(result.isDone());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData = result.join();
@ -207,11 +209,11 @@ public class ShareFetchUtilsTest {
SharePartition sp0 = Mockito.mock(SharePartition.class);
SharePartition sp1 = Mockito.mock(SharePartition.class);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()),
groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes);
@ -251,7 +253,7 @@ public class ShareFetchUtilsTest {
records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> result1 =
ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData1, partitionCacheMap, replicaManager);
ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, partitionCacheMap, replicaManager);
assertTrue(result1.isDone());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData1 = result1.join();
@ -282,7 +284,7 @@ public class ShareFetchUtilsTest {
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> result2 =
ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData2, partitionCacheMap, replicaManager);
ShareFetchUtils.processFetchResponse(shareFetchData, responseData2, partitionCacheMap, replicaManager);
assertTrue(result2.isDone());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData2 = result2.join();

View File

@ -53,10 +53,12 @@ import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.apache.kafka.server.share.NoOpShareStatePersister;
import org.apache.kafka.server.share.Persister;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
@ -978,17 +980,17 @@ public class SharePartitionManagerTest {
@Test
public void testSharePartitionKey() {
SharePartitionManager.SharePartitionKey sharePartitionKey1 = new SharePartitionManager.SharePartitionKey("mock-group-1",
SharePartitionKey sharePartitionKey1 = new SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
SharePartitionManager.SharePartitionKey sharePartitionKey2 = new SharePartitionManager.SharePartitionKey("mock-group-2",
SharePartitionKey sharePartitionKey2 = new SharePartitionKey("mock-group-2",
new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
SharePartitionManager.SharePartitionKey sharePartitionKey3 = new SharePartitionManager.SharePartitionKey("mock-group-1",
SharePartitionKey sharePartitionKey3 = new SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(1L, 1L), new TopicPartition("test-1", 0)));
SharePartitionManager.SharePartitionKey sharePartitionKey4 = new SharePartitionManager.SharePartitionKey("mock-group-1",
SharePartitionKey sharePartitionKey4 = new SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 1)));
SharePartitionManager.SharePartitionKey sharePartitionKey5 = new SharePartitionManager.SharePartitionKey("mock-group-1",
SharePartitionKey sharePartitionKey5 = new SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 0L), new TopicPartition("test-2", 0)));
SharePartitionManager.SharePartitionKey sharePartitionKey1Copy = new SharePartitionManager.SharePartitionKey("mock-group-1",
SharePartitionKey sharePartitionKey1Copy = new SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
assertEquals(sharePartitionKey1, sharePartitionKey1Copy);
@ -1182,8 +1184,8 @@ public class SharePartitionManagerTest {
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
@ -1305,9 +1307,9 @@ public class SharePartitionManagerTest {
partitionMap.add(new CachedSharePartition(tp3));
when(shareSession.partitionMap()).thenReturn(partitionMap);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
@ -1439,8 +1441,8 @@ public class SharePartitionManagerTest {
when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp), sp);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();
@ -1475,10 +1477,10 @@ public class SharePartitionManagerTest {
when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(null));
when(sp3.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp3), sp3);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
Metrics metrics = new Metrics();
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
@ -1565,8 +1567,8 @@ public class SharePartitionManagerTest {
TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
SharePartition sp = mock(SharePartition.class);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp), sp);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();
@ -1594,8 +1596,8 @@ public class SharePartitionManagerTest {
when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(FutureUtils.failedFuture(
new InvalidRequestException("Member is not the owner of batch record")
));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp), sp);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build();
@ -1649,16 +1651,16 @@ public class SharePartitionManagerTest {
final Time time = new MockTime();
ReplicaManager replicaManager = mock(ReplicaManager.class);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData1 = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData1 = new ShareFetchData(
fetchParams, groupId, memberId, new CompletableFuture<>(), Collections.emptyMap());
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2 = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData2 = new ShareFetchData(
fetchParams, groupId, memberId, new CompletableFuture<>(), partitionMaxBytes);
ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData> fetchQueue = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new ConcurrentLinkedQueue<>();
// First request added to fetch queue is empty i.e. no topic partitions to fetch.
fetchQueue.add(shareFetchPartitionData1);
fetchQueue.add(shareFetchData1);
// Second request added to fetch queue has a topic partition to fetch.
fetchQueue.add(shareFetchPartitionData2);
fetchQueue.add(shareFetchData2);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
@ -1704,11 +1706,11 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp2).acknowledge(ArgumentMatchers.eq(memberId), any());
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()),
groupId,
@ -1731,7 +1733,7 @@ public class SharePartitionManagerTest {
partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition)));
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@ -1799,12 +1801,12 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp3).acknowledge(ArgumentMatchers.eq(memberId), any());
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp3), sp3);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()),
groupId,
@ -1829,7 +1831,7 @@ public class SharePartitionManagerTest {
partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition)));
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@ -1893,11 +1895,11 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()),
groupId,
@ -1920,7 +1922,7 @@ public class SharePartitionManagerTest {
partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition)));
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@ -1992,12 +1994,12 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp3).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp3), sp3);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()),
groupId,
@ -2022,7 +2024,7 @@ public class SharePartitionManagerTest {
partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition)));
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchPartitionData(shareFetchPartitionData)
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
@ -2067,8 +2069,8 @@ public class SharePartitionManagerTest {
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
SharePartition sp0 = mock(SharePartition.class);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
// Keep the initialization future pending, so fetch request is stuck.
CompletableFuture<Void> pendingInitializationFuture = new CompletableFuture<>();
@ -2111,8 +2113,8 @@ public class SharePartitionManagerTest {
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
SharePartition sp0 = mock(SharePartition.class);
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@ -2181,7 +2183,7 @@ public class SharePartitionManagerTest {
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
// Return NotLeaderOrFollowerException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new NotLeaderOrFollowerException("Not leader or follower")));
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes);
@ -2195,7 +2197,7 @@ public class SharePartitionManagerTest {
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
// Return RuntimeException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new RuntimeException("Runtime exception")));
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes);
@ -2289,11 +2291,11 @@ public class SharePartitionManagerTest {
private ReplicaManager replicaManager = mock(ReplicaManager.class);
private Time time = new MockTime();
private ShareSessionCache cache = new ShareSessionCache(10, 1000);
private Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private Persister persister = NoOpShareStatePersister.getInstance();
private Timer timer = new MockTimer();
private Metrics metrics = new Metrics();
private ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData> fetchQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new ConcurrentLinkedQueue<>();
private DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class);
private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
@ -2311,7 +2313,7 @@ public class SharePartitionManagerTest {
return this;
}
private SharePartitionManagerBuilder withPartitionCacheMap(Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap) {
private SharePartitionManagerBuilder withPartitionCacheMap(Map<SharePartitionKey, SharePartition> partitionCacheMap) {
this.partitionCacheMap = partitionCacheMap;
return this;
}
@ -2331,7 +2333,7 @@ public class SharePartitionManagerTest {
return this;
}
private SharePartitionManagerBuilder withFetchQueue(ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData> fetchQueue) {
private SharePartitionManagerBuilder withFetchQueue(ConcurrentLinkedQueue<ShareFetchData> fetchQueue) {
this.fetchQueue = fetchQueue;
return this;
}

View File

@ -42,9 +42,9 @@ import org.apache.kafka.server.share.PartitionFactory;
import org.apache.kafka.server.share.Persister;
import org.apache.kafka.server.share.PersisterStateBatch;
import org.apache.kafka.server.share.ReadShareGroupStateResult;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.TopicData;
import org.apache.kafka.server.share.WriteShareGroupStateResult;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;

View File

@ -88,8 +88,9 @@ import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_I
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal}
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, ShareAcknowledgementBatch}
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData}
import org.apache.kafka.server.quota.ThrottleCallback
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.share.context.{FinalContext, ShareSessionContext}
import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
import org.apache.kafka.server.util.{FutureUtils, MockTime}

View File

@ -17,36 +17,48 @@
package org.apache.kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import java.util.Objects;
/**
* Common immutable share partition key class. This class is
* placed in server-common so that it can be freely used across
* various modules.
* The SharePartitionKey is used to uniquely identify a share partition. The key is made up of the
* share group id, the topic id and the partition id. The key is used to store the SharePartition
* objects in the partition cache map.
*/
public class SharePartitionKey {
private final String groupId;
private final Uuid topicId;
private final int partition;
protected final String groupId;
protected final TopicIdPartition topicIdPartition;
public SharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
this.groupId = Objects.requireNonNull(groupId);
this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
}
private SharePartitionKey(String groupId, Uuid topicId, int partition) {
this.groupId = groupId;
this.topicId = topicId;
this.partition = partition;
this(groupId, topicId, null, partition);
}
private SharePartitionKey(String groupId, Uuid topicId, String topic, int partition) {
this(groupId, new TopicIdPartition(Objects.requireNonNull(topicId), new TopicPartition(topic, partition)));
}
public String groupId() {
return groupId;
}
public TopicIdPartition topicIdPartition() {
return topicIdPartition;
}
public Uuid topicId() {
return topicId;
return topicIdPartition.topicId();
}
public int partition() {
return partition;
return topicIdPartition.partition();
}
public static SharePartitionKey getInstance(String groupId, TopicIdPartition topicIdPartition) {
@ -57,33 +69,28 @@ public class SharePartitionKey {
return new SharePartitionKey(groupId, topicId, partition);
}
public String asCoordinatorKey() {
return asCoordinatorKey(groupId, topicId, partition);
}
public static String asCoordinatorKey(String groupId, Uuid topicId, int partition) {
return String.format("%s:%s:%d", groupId, topicId, partition);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof SharePartitionKey)) return false;
SharePartitionKey that = (SharePartitionKey) o;
return partition == that.partition && Objects.equals(groupId, that.groupId) && Objects.equals(topicId, that.topicId);
public boolean equals(final Object obj) {
if (this == obj)
return true;
else if (obj == null || getClass() != obj.getClass())
return false;
else {
SharePartitionKey that = (SharePartitionKey) obj;
return groupId.equals(that.groupId) && Objects.equals(topicIdPartition, that.topicIdPartition);
}
}
@Override
public int hashCode() {
return Objects.hash(groupId, topicId, partition);
return Objects.hash(groupId, topicIdPartition);
}
@Override
public String toString() {
return "SharePartitionKey{" +
"groupId=" + groupId +
",topicId=" + topicId +
",partition=" + partition +
"}";
"groupId='" + groupId +
", topicIdPartition=" + topicIdPartition +
'}';
}
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kafka.server.share;
package org.apache.kafka.server.share.acknowledge;
import java.util.List;

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.storage.internals.log.FetchParams;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* The ShareFetchData class is used to store the fetch parameters for a share fetch request.
*/
public class ShareFetchData {
private final FetchParams fetchParams;
private final String groupId;
private final String memberId;
private final CompletableFuture<Map<TopicIdPartition, PartitionData>> future;
private final Map<TopicIdPartition, Integer> partitionMaxBytes;
public ShareFetchData(
FetchParams fetchParams,
String groupId,
String memberId,
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Map<TopicIdPartition, Integer> partitionMaxBytes
) {
this.fetchParams = fetchParams;
this.groupId = groupId;
this.memberId = memberId;
this.future = future;
this.partitionMaxBytes = partitionMaxBytes;
}
public String groupId() {
return groupId;
}
public String memberId() {
return memberId;
}
public CompletableFuture<Map<TopicIdPartition, PartitionData>> future() {
return future;
}
public Map<TopicIdPartition, Integer> partitionMaxBytes() {
return partitionMaxBytes;
}
public FetchParams fetchParams() {
return fetchParams;
}
}