KAFKA-19076 replace `String` by `Supplier<String>` for UnifiedLog#maybeHandleIOException (#19392)

jira: https://issues.apache.org/jira/browse/KAFKA-19076

The message is only needed when the function encounters an error, so it
should be created lazily rather than eagerly on every call.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
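
For context, here is a minimal standalone sketch of why the Supplier matters. This is not the actual UnifiedLog/LocalLog code: the StorageAction interface below is a simplified stand-in for Kafka's, and RuntimeException stands in for KafkaStorageException. The point is that the eager variant pays for the string concatenation on every call, while the lazy variant concatenates only inside the catch block.

    import java.io.IOException;
    import java.util.function.Supplier;

    public class LazyMessageSketch {

        // Simplified stand-in for Kafka's StorageAction: a unit of work that may throw.
        @FunctionalInterface
        interface StorageAction<T, E extends Exception> {
            T execute() throws E;
        }

        // Eager variant (before the patch): the caller has already paid for the
        // message concatenation by the time this method is entered, even when
        // the action succeeds and the message is never used.
        static <T> T eager(String msg, StorageAction<T, IOException> action) {
            try {
                return action.execute();
            } catch (IOException e) {
                throw new RuntimeException(msg, e); // stand-in for KafkaStorageException
            }
        }

        // Lazy variant (after the patch): msg.get() runs only inside the catch
        // block, so the happy path allocates no message string at all.
        static <T> T lazy(Supplier<String> msg, StorageAction<T, IOException> action) {
            try {
                return action.execute();
            } catch (IOException e) {
                throw new RuntimeException(msg.get(), e);
            }
        }

        public static void main(String[] args) {
            String tp = "topic-0"; // hypothetical partition name
            // On the non-error path the lambda is never invoked,
            // so the error message is never built.
            String result = lazy(() -> "Error while appending records to " + tp,
                    () -> "appended");
            System.out.println(result);
        }
    }

The call sites in the diff below follow this pattern mechanically: each eagerly built String argument becomes a () -> ... lambda, and the private maybeHandleIOException overload changes its parameter type accordingly.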
Author: Nick Guo (committed by GitHub), 2025-04-07 00:43:44 +08:00
Commit: e69a311068
Parent: a65626b6a8
1 changed file with 14 additions and 13 deletions

@@ -83,6 +83,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -922,7 +923,7 @@ public class UnifiedLog implements AutoCloseable {
localLog.checkIfMemoryMappedBufferClosed();
producerExpireCheck.cancel(true);
maybeHandleIOException(
"Error while renaming dir for " + topicPartition() + " in dir " + dir().getParent(),
() -> "Error while renaming dir for " + topicPartition() + " in dir " + dir().getParent(),
() -> {
// We take a snapshot at the last written offset to hopefully avoid the need to scan the log
// after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
@@ -945,7 +946,7 @@ public class UnifiedLog implements AutoCloseable {
public void renameDir(String name, boolean shouldReinitialize) {
synchronized (lock) {
maybeHandleIOException(
"Error while renaming dir for " + topicPartition() + " in log dir " + dir().getParent(),
() -> "Error while renaming dir for " + topicPartition() + " in log dir " + dir().getParent(),
() -> {
// Flush partitionMetadata file before initializing again
maybeFlushMetadataFile();
@@ -1087,7 +1088,7 @@ public class UnifiedLog implements AutoCloseable {
// they are valid, insert them in the log
synchronized (lock) {
return maybeHandleIOException(
"Error while appending records to " + topicPartition() + " in dir " + dir().getParent(),
() -> "Error while appending records to " + topicPartition() + " in dir " + dir().getParent(),
() -> {
MemoryRecords validRecords = trimmedRecords;
localLog.checkIfMemoryMappedBufferClosed();
@@ -1300,7 +1301,7 @@ public class UnifiedLog implements AutoCloseable {
// The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
// in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
return maybeHandleIOException(
"Exception while increasing log start offset for " + topicPartition() + " to " + newLogStartOffset + " in dir " + dir().getParent(),
() -> "Exception while increasing log start offset for " + topicPartition() + " to " + newLogStartOffset + " in dir " + dir().getParent(),
() -> {
synchronized (lock) {
if (newLogStartOffset > highWatermark()) {
@@ -1613,7 +1614,7 @@ public class UnifiedLog implements AutoCloseable {
*/
public OffsetResultHolder fetchOffsetByTimestamp(long targetTimestamp, Optional<AsyncOffsetReader> remoteOffsetReader) {
return maybeHandleIOException(
"Error while fetching offset by timestamp for " + topicPartition() + " in dir " + dir().getParent(),
() -> "Error while fetching offset by timestamp for " + topicPartition() + " in dir " + dir().getParent(),
() -> {
logger.debug("Searching offset for timestamp {}.", targetTimestamp);
@@ -1831,7 +1832,7 @@ public class UnifiedLog implements AutoCloseable {
}
private int deleteSegments(List<LogSegment> deletable, SegmentDeletionReason reason) {
return maybeHandleIOException("Error while deleting segments for " + topicPartition() + " in dir " + dir().getParent(),
return maybeHandleIOException(() -> "Error while deleting segments for " + topicPartition() + " in dir " + dir().getParent(),
() -> {
int numToDelete = deletable.size();
if (numToDelete > 0) {
@@ -2138,7 +2139,7 @@ public class UnifiedLog implements AutoCloseable {
long flushOffset = includingOffset ? offset + 1 : offset;
String includingOffsetStr = includingOffset ? "inclusive" : "exclusive";
maybeHandleIOException(
"Error while flushing log for " + topicPartition() + " in dir " + dir().getParent() + " with offset " + offset +
() -> "Error while flushing log for " + topicPartition() + " in dir " + dir().getParent() + " with offset " + offset +
" (" + includingOffsetStr + ") and recovery point " + offset,
() -> {
if (flushOffset > localLog.recoveryPoint()) {
@@ -2158,7 +2159,7 @@ public class UnifiedLog implements AutoCloseable {
*/
public void delete() {
maybeHandleIOException(
"Error while deleting log for " + topicPartition() + " in dir " + dir().getParent(),
() -> "Error while deleting log for " + topicPartition() + " in dir " + dir().getParent(),
() -> {
synchronized (lock) {
localLog.checkIfMemoryMappedBufferClosed();
@@ -2204,7 +2205,7 @@ public class UnifiedLog implements AutoCloseable {
// visible for testing
public void flushProducerStateSnapshot(Path snapshot) {
maybeHandleIOException(
"Error while deleting producer state snapshot " + snapshot + " for " + topicPartition() + " in dir " + dir().getParent(),
() -> "Error while deleting producer state snapshot " + snapshot + " for " + topicPartition() + " in dir " + dir().getParent(),
() -> {
Utils.flushFileIfExists(snapshot);
return null;
@@ -2219,7 +2220,7 @@ public class UnifiedLog implements AutoCloseable {
*/
public boolean truncateTo(long targetOffset) {
return maybeHandleIOException(
"Error while truncating log to offset " + targetOffset + " for " + topicPartition() + " in dir " + dir().getParent(),
() -> "Error while truncating log to offset " + targetOffset + " for " + topicPartition() + " in dir " + dir().getParent(),
() -> {
if (targetOffset < 0) {
throw new IllegalArgumentException("Cannot truncate partition " + topicPartition() + " to a negative offset (" + targetOffset + ").");
@@ -2263,7 +2264,7 @@ public class UnifiedLog implements AutoCloseable {
*/
public void truncateFullyAndStartAt(long newOffset, Optional<Long> logStartOffsetOpt) {
maybeHandleIOException(
"Error while truncating the entire log for " + topicPartition() + " in dir " + dir().getParent(),
() -> "Error while truncating the entire log for " + topicPartition() + " in dir " + dir().getParent(),
() -> {
logger.debug("Truncate and start at offset {}, logStartOffset: {}", newOffset, logStartOffsetOpt.orElse(newOffset));
synchronized (lock) {
@@ -2370,8 +2371,8 @@ public class UnifiedLog implements AutoCloseable {
metricNames.clear();
}
-private <T> T maybeHandleIOException(String msg, StorageAction<T, IOException> fun) throws KafkaStorageException {
-return LocalLog.maybeHandleIOException(logDirFailureChannel(), parentDir(), () -> msg, fun);
+private <T> T maybeHandleIOException(Supplier<String> msg, StorageAction<T, IOException> fun) throws KafkaStorageException {
+return LocalLog.maybeHandleIOException(logDirFailureChannel(), parentDir(), msg, fun);
}
public List<LogSegment> splitOverflowedSegment(LogSegment segment) throws IOException {