mirror of https://github.com/apache/kafka.git
KAFKA-17378 Fixes for performance testing (#16942)
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
parent 1e14b0c964
commit 94f5039350
@@ -259,36 +259,41 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
                                                        long currentTimeMs,
                                                        boolean onCommitAsync,
                                                        AtomicBoolean isAsyncDone) {
-        if (acknowledgeRequestState == null || (!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
-            if (onCommitAsync) {
-                isAsyncDone.set(true);
-            }
-            return Optional.empty();
-        } else if (!acknowledgeRequestState.maybeExpire()) {
-            if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
-                acknowledgeRequestState.onSendAttempt(currentTimeMs);
-                if (onCommitAsync) {
-                    isAsyncDone.set(true);
-                }
-                return Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
-            } else {
-                if (onCommitAsync) {
-                    isAsyncDone.set(false);
-                }
-                return Optional.empty();
-            }
-        } else {
-            // Fill in TimeoutException
-            for (TopicIdPartition tip : acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
-                metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
-                acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
-            }
-            acknowledgeRequestState.incompleteAcknowledgements.clear();
-            if (onCommitAsync) {
-                isAsyncDone.set(true);
-            }
-            return Optional.empty();
-        }
+        boolean asyncDone = true;
+        try {
+            if (acknowledgeRequestState == null || (!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
+                return Optional.empty();
+            }
+
+            if (acknowledgeRequestState.maybeExpire()) {
+                // Fill in TimeoutException
+                for (TopicIdPartition tip : acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+                    metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+                    acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+                }
+                acknowledgeRequestState.incompleteAcknowledgements.clear();
+                return Optional.empty();
+            }
+
+            if (!acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+                // We wait for the backoff before we can send this request.
+                asyncDone = false;
+                return Optional.empty();
+            }
+
+            UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs);
+            if (request == null) {
+                asyncDone = false;
+                return Optional.empty();
+            }
+
+            acknowledgeRequestState.onSendAttempt(currentTimeMs);
+            return Optional.of(request);
+        } finally {
+            if (onCommitAsync) {
+                isAsyncDone.set(asyncDone);
+            }
+        }
     }
 
     /**
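The hunk above flattens the nested if/else in the request-building path of ShareConsumeRequestManager into guard clauses wrapped in a single try/finally, so the async-completion flag is set exactly once on every exit path instead of separately in each branch. A minimal sketch of that pattern, using hypothetical names (RequestSource, buildOrSkip) rather than the real ShareConsumeRequestManager API:

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the "set the completion flag once, in finally" pattern used above.
// Request, RequestSource and buildOrSkip are illustrative, not Kafka classes.
class RequestBuilderSketch {
    record Request(String payload) {}

    interface RequestSource {
        boolean canSend(long nowMs);   // backoff check
        Request build(long nowMs);     // may return null if nothing can be built yet
    }

    static Optional<Request> buildOrSkip(RequestSource source, long nowMs, AtomicBoolean done) {
        boolean finished = true;
        try {
            if (source == null) {
                return Optional.empty();      // nothing pending: still counts as done
            }
            if (!source.canSend(nowMs)) {
                finished = false;             // must retry after the backoff
                return Optional.empty();
            }
            Request request = source.build(nowMs);
            if (request == null) {
                finished = false;             // could not build a request this time
                return Optional.empty();
            }
            return Optional.of(request);
        } finally {
            done.set(finished);               // every return path reports exactly once
        }
    }
}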
@@ -351,7 +351,7 @@ public class SharePartition {
                                         String memberId,
                                         FetchPartitionData fetchPartitionData
     ) {
-        log.trace("Received acquire request for share partition: {}-{}", memberId, fetchPartitionData);
+        log.trace("Received acquire request for share partition: {}-{} memberId: {}", groupId, topicIdPartition, memberId);
         RecordBatch lastBatch = fetchPartitionData.records.lastBatch().orElse(null);
         if (lastBatch == null) {
             // Nothing to acquire.
@@ -386,7 +386,7 @@ public class SharePartition {
         }
 
         log.trace("Overlap exists with in-flight records. Acquire the records if available for"
-            + " the share group: {}-{}", groupId, topicIdPartition);
+            + " the share partition: {}-{}", groupId, topicIdPartition);
         List<AcquiredRecords> result = new ArrayList<>();
         // The fetched records are already part of the in-flight records. The records might
         // be available for re-delivery hence try acquiring same. The request batches could
@@ -399,7 +399,7 @@ public class SharePartition {
             if (!fullMatch || inFlightBatch.offsetState() != null) {
                 log.trace("Subset or offset tracked batch record found for share partition,"
                     + " batch: {} request offsets - first: {}, last: {} for the share"
-                    + " group: {}-{}", inFlightBatch, firstBatch.baseOffset(),
+                    + " partition: {}-{}", inFlightBatch, firstBatch.baseOffset(),
                     lastBatch.lastOffset(), groupId, topicIdPartition);
                 if (inFlightBatch.offsetState() == null) {
                     // Though the request is a subset of in-flight batch but the offset
@@ -408,7 +408,7 @@ public class SharePartition {
                     // complete batch is available yet. Hence, do a pre-check to avoid exploding
                     // the in-flight offset tracking unnecessarily.
                     if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
-                        log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}"
+                        log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {}"
                             + " skipping offset tracking for batch as well.", groupId,
                             topicIdPartition, inFlightBatch);
                         continue;
@@ -423,14 +423,14 @@ public class SharePartition {
 
             // The in-flight batch is a full match hence change the state of the complete batch.
             if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
-                log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}",
+                log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {}",
                     groupId, topicIdPartition, inFlightBatch);
                 continue;
             }
 
             InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, true, maxDeliveryCount, memberId);
             if (updateResult == null) {
-                log.info("Unable to acquire records for the batch: {} in share group: {}-{}",
+                log.info("Unable to acquire records for the batch: {} in share partition: {}-{}",
                     inFlightBatch, groupId, topicIdPartition);
                 continue;
             }
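The SharePartition hunks here only reword trace/info messages ("share group" becomes "share partition", with member id details added), but they trace the acquire loop itself: a batch that is not AVAILABLE is skipped, and a failed transition to ACQUIRED is likewise skipped rather than treated as fatal. A rough sketch of that loop under hypothetical types (Batch, tryAcquire), not the real SharePartition classes:

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the acquire loop visible in the hunks above: only AVAILABLE
// batches move to ACQUIRED, and a failed state transition is skipped, not fatal.
class AcquireLoopSketch {
    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    static class Batch {
        RecordState state = RecordState.AVAILABLE;

        // Returns the new state, or null if the transition was not possible.
        RecordState tryAcquire(String memberId) {
            if (state != RecordState.AVAILABLE) {
                return null;
            }
            state = RecordState.ACQUIRED;
            return state;
        }
    }

    static List<Batch> acquire(List<Batch> inFlight, String memberId) {
        List<Batch> acquired = new ArrayList<>();
        for (Batch batch : inFlight) {
            if (batch.state != RecordState.AVAILABLE) {
                continue;                  // "not available to acquire ... skipping"
            }
            if (batch.tryAcquire(memberId) == null) {
                continue;                  // "unable to acquire records for the batch"
            }
            acquired.add(batch);
        }
        return acquired;
    }
}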
@@ -972,7 +972,7 @@ public class SharePartition {
 
             if (offsetState.getValue().state != RecordState.AVAILABLE) {
                 log.trace("The offset is not available skipping, offset: {} batch: {}"
-                    + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch,
+                    + " for the share partition: {}-{}", offsetState.getKey(), inFlightBatch,
                     groupId, topicIdPartition);
                 continue;
             }
@@ -981,7 +981,7 @@ public class SharePartition {
                 memberId);
             if (updateResult == null) {
                 log.trace("Unable to acquire records for the offset: {} in batch: {}"
-                    + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch,
+                    + " for the share partition: {}-{}", offsetState.getKey(), inFlightBatch,
                     groupId, topicIdPartition);
                 continue;
             }
@@ -1601,7 +1601,7 @@ public class SharePartition {
         if (!stateBatches.isEmpty() && !isWriteShareGroupStateSuccessful(stateBatches)) {
 
             // Even if write share group state RPC call fails, we will still go ahead with the state transition.
-            log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId {}. " +
+            log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}. " +
                 "Proceeding with state transition.", groupId, topicIdPartition, memberId);
         }
 
@@ -1637,7 +1637,7 @@ public class SharePartition {
                 return;
             }
             log.debug("The batch is not in acquired state while release of acquisition lock on timeout, skipping, batch: {}"
-                + " for the share group: {}-{}-{}", inFlightBatch, groupId, memberId, topicIdPartition);
+                + " for the share partition: {}-{} memberId: {}", inFlightBatch, groupId, topicIdPartition, memberId);
         }
 
     private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFlightBatch,
@@ -1658,7 +1658,7 @@ public class SharePartition {
             }
             if (offsetState.getValue().state != RecordState.ACQUIRED) {
                 log.debug("The offset is not in acquired state while release of acquisition lock on timeout, skipping, offset: {} batch: {}"
-                    + " for the share group: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch,
+                    + " for the share partition: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch,
                     groupId, topicIdPartition, memberId);
                 continue;
             }
@@ -1669,7 +1669,7 @@ public class SharePartition {
                 EMPTY_MEMBER_ID);
             if (updateResult == null) {
                 log.error("Unable to release acquisition lock on timeout for the offset: {} in batch: {}"
-                    + " for the share group: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch,
+                    + " for the share partition: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch,
                     groupId, topicIdPartition, memberId);
                 continue;
             }
@@ -461,7 +461,7 @@ public class SharePartitionManager implements AutoCloseable {
             if (shareSession.epoch != reqMetadata.epoch()) {
                 log.debug("Share session error for {}: expected epoch {}, but got {} instead", key,
                     shareSession.epoch, reqMetadata.epoch());
-                throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
+                throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
             } else {
                 cache.touch(shareSession, time.milliseconds());
                 shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);
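The final hunk sits in the share-session epoch validation of SharePartitionManager: a request whose epoch does not match the cached session throws INVALID_SHARE_SESSION_EPOCH, otherwise the session is touched in the cache and its epoch advanced via ShareRequestMetadata.nextEpoch. A simplified sketch of that validate-then-advance step, using hypothetical Session/StaleEpochException stand-ins rather than the Kafka classes:

// Sketch of the epoch check in the last hunk: a request carrying a stale epoch is
// rejected, otherwise the cached session is refreshed and its epoch advanced.
final class EpochCheckSketch {
    static class Session {
        int epoch;
        long lastUsedMs;
        Session(int epoch) { this.epoch = epoch; }
    }

    static class StaleEpochException extends RuntimeException {
        StaleEpochException(String message) { super(message); }
    }

    static void validateAndAdvance(Session session, int requestEpoch, long nowMs) {
        if (session.epoch != requestEpoch) {
            // Mirrors throwing Errors.INVALID_SHARE_SESSION_EPOCH.exception().
            throw new StaleEpochException("expected epoch " + session.epoch + ", got " + requestEpoch);
        }
        session.lastUsedMs = nowMs;         // equivalent of cache.touch(...)
        session.epoch = requestEpoch + 1;   // simplified; the real code uses ShareRequestMetadata.nextEpoch
    }
}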