Compare commits

...

1 Commits

Author      SHA1        Message                               Date
1sonofqiu   684f3b55c3  feat(stream): impl log tail truncate  2025-10-28 23:55:06 +08:00
20 changed files with 563 additions and 50 deletions

View File

@ -180,6 +180,11 @@ public class BootstrapWalV1 implements WriteAheadLog {
return wal.trim(offset);
}
@Override
public CompletableFuture<Void> truncateTail(RecordOffset offset) {
return wal.truncateTail(offset);
}
private CompletableFuture<? extends WriteAheadLog> buildRecoverWal(String kraftWalConfigs, long oldNodeEpoch) {
IdURI uri = IdURI.parse(kraftWalConfigs);
CompletableFuture<Void> cf = walHandle

View File

@ -292,6 +292,11 @@ public class ClientWrapper implements Client {
}, generalCallbackExecutors);
}
@Override
public CompletableFuture<Void> truncateTail(long newNextOffset) {
return failureHandle(stream.truncateTail(newNextOffset).thenApplyAsync(nil -> nil, streamManagerCallbackExecutors));
}
@Override
public CompletableFuture<Void> close() {
return failureHandle(stream.close().thenApplyAsync(nil -> nil, streamManagerCallbackExecutors));

View File

@ -144,6 +144,11 @@ public class LazyStream implements Stream {
return inner.trim(newStartOffset);
}
@Override
public CompletableFuture<Void> truncateTail(long newNextOffset) {
return inner.truncateTail(newNextOffset);
}
@Override
public CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) {
return inner.fetch(context, startOffset, endOffset, maxBytesHint);
@ -228,6 +233,11 @@ public class LazyStream implements Stream {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> truncateTail(long newNextOffset) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) {
return CompletableFuture.completedFuture(Collections::emptyList);

View File

@ -165,6 +165,13 @@ public class MemoryClient implements Client {
return CompletableFuture.completedFuture(null);
}
@Override
public synchronized CompletableFuture<Void> truncateTail(long newNextOffset) {
recordMap = new ConcurrentSkipListMap<>(recordMap.headMap(newNextOffset, false));
nextOffsetAlloc.updateAndGet(current -> Math.min(current, newNextOffset));
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> close() {
return CompletableFuture.completedFuture(null);

View File

@ -174,6 +174,12 @@ public class MetaStream implements Stream {
return innerStream.trim(newStartOffset);
}
@Override
public CompletableFuture<Void> truncateTail(long newNextOffset) {
metaCache.entrySet().removeIf(entry -> entry.getValue().offset >= newNextOffset);
return innerStream.truncateTail(newNextOffset);
}
@Override
public CompletableFuture<Void> close() {
if (compactionFuture != null) {

View File

@ -99,6 +99,14 @@ public interface Stream {
*/
CompletableFuture<Void> trim(long newStartOffset);
/**
* Truncate the tail of the stream so that subsequent appends start from {@code newNextOffset}.
*
* @param newNextOffset new next offset after truncation
* @return future completing when truncation finishes
*/
CompletableFuture<Void> truncateTail(long newNextOffset);
/**
* Close the stream.
*/
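
For orientation, a minimal usage sketch of the new contract follows. The helper class and the sample offsets are illustrative and not part of this commit; it assumes it sits next to the Stream interface shown above.

import java.util.concurrent.CompletableFuture;

// Hedged usage sketch of Stream#truncateTail, not code from this commit.
final class TruncateTailUsageSketch {
    // Suppose records [startOffset, 20) were appended and the caller rolls back to 10:
    // records at offset >= 10 are discarded, and the next append is assigned base offset 10.
    static CompletableFuture<Void> rollBackTo(Stream stream, long newNextOffset) {
        return stream.truncateTail(newNextOffset);
    }
}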

View File

@ -236,7 +236,7 @@ public class S3Storage implements Storage {
) {
RecordOffset logEndOffset = null;
Map<Long, Long> streamNextOffsets = new HashMap<>();
Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords = new HashMap<>();
Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords = new HashMap<>();
LogCache.LogCacheBlock cacheBlock = new LogCache.LogCacheBlock(maxCacheSize);
boolean first = true;
@ -249,11 +249,11 @@ public class S3Storage implements Storage {
first = false;
}
StreamRecordBatch streamRecordBatch = recoverResult.record();
processRecoveredRecord(streamRecordBatch, openingStreamEndOffsets, streamDiscontinuousRecords, cacheBlock, streamNextOffsets, logger);
processRecoveredRecord(streamRecordBatch, recoverResult.recordOffset(), openingStreamEndOffsets, streamDiscontinuousRecords, cacheBlock, streamNextOffsets, logger);
}
} catch (Throwable e) {
// {@link RuntimeIOException} may be thrown by {@code it.next()}
releaseAllRecords(streamDiscontinuousRecords.values());
releaseRecoverRecords(streamDiscontinuousRecords.values());
releaseAllRecords(cacheBlock.records().values());
throw e;
}
@ -278,8 +278,9 @@ public class S3Storage implements Storage {
*/
private static void processRecoveredRecord(
StreamRecordBatch streamRecordBatch,
RecordOffset recordOffset,
Map<Long, Long> openingStreamEndOffsets,
Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords,
Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords,
LogCache.LogCacheBlock cacheBlock,
Map<Long, Long> streamNextOffsets,
Logger logger
@ -294,19 +295,19 @@ public class S3Storage implements Storage {
}
Long expectedNextOffset = streamNextOffsets.get(streamId);
Queue<StreamRecordBatch> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
Queue<RecoverRecord> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
boolean isContinuous = expectedNextOffset == null || expectedNextOffset == streamRecordBatch.getBaseOffset();
if (!isContinuous) {
// unexpected record, put it into discontinuous records queue.
if (discontinuousRecords == null) {
discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(StreamRecordBatch::getBaseOffset));
discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(r -> r.record.getBaseOffset()));
streamDiscontinuousRecords.put(streamId, discontinuousRecords);
}
discontinuousRecords.add(streamRecordBatch);
discontinuousRecords.add(new RecoverRecord(streamRecordBatch, recordOffset));
return;
}
// continuous record, put it into cache, and check if there is any historical discontinuous records can be polled.
cacheBlock.put(streamRecordBatch);
cacheBlock.put(streamRecordBatch, recordOffset);
expectedNextOffset = maybePollDiscontinuousRecords(streamRecordBatch, cacheBlock, discontinuousRecords, logger);
streamNextOffsets.put(streamId, expectedNextOffset);
}
@ -314,7 +315,7 @@ public class S3Storage implements Storage {
private static long maybePollDiscontinuousRecords(
StreamRecordBatch streamRecordBatch,
LogCache.LogCacheBlock cacheBlock,
Queue<StreamRecordBatch> discontinuousRecords,
Queue<RecoverRecord> discontinuousRecords,
Logger logger
) {
long expectedNextOffset = streamRecordBatch.getLastOffset();
@ -323,25 +324,28 @@ public class S3Storage implements Storage {
}
// check and poll historical discontinuous records.
while (!discontinuousRecords.isEmpty()) {
StreamRecordBatch peek = discontinuousRecords.peek();
if (peek.getBaseOffset() != expectedNextOffset) {
RecoverRecord peek = discontinuousRecords.peek();
if (peek.record.getBaseOffset() != expectedNextOffset) {
break;
}
// should never happen, log it.
logger.error("[BUG] recover an out of order record, streamId={}, expectedNextOffset={}, record={}", streamRecordBatch.getStreamId(), expectedNextOffset, peek);
logger.error("[BUG] recover an out of order record, streamId={}, expectedNextOffset={}, record={}", streamRecordBatch.getStreamId(), expectedNextOffset, peek.record);
discontinuousRecords.poll();
cacheBlock.put(peek);
expectedNextOffset = peek.getLastOffset();
cacheBlock.put(peek.record, peek.walOffset);
expectedNextOffset = peek.record.getLastOffset();
}
return expectedNextOffset;
}
private static void releaseDiscontinuousRecords(Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords,
private static void releaseDiscontinuousRecords(Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords,
Logger logger) {
streamDiscontinuousRecords.values().stream()
.filter(q -> !q.isEmpty())
.peek(q -> logger.info("drop discontinuous records, records={}", q))
.forEach(S3Storage::releaseRecords);
.forEach(queue -> {
queue.forEach(record -> record.record.release());
queue.clear();
});
}
/**
@ -375,7 +379,7 @@ public class S3Storage implements Storage {
LogCache.LogCacheBlock newCacheBlock = new LogCache.LogCacheBlock(1024L * 1024 * 1024);
cacheBlock.records().forEach((streamId, records) -> {
if (!invalidStreams.contains(streamId)) {
records.forEach(newCacheBlock::put);
records.forEach(record -> newCacheBlock.put(record, null));
} else {
// release invalid records.
releaseRecords(records);
@ -388,6 +392,13 @@ public class S3Storage implements Storage {
allRecords.forEach(S3Storage::releaseRecords);
}
private static void releaseRecoverRecords(Collection<Queue<RecoverRecord>> allRecords) {
allRecords.forEach(queue -> {
queue.forEach(record -> record.record.release());
queue.clear();
});
}
private static void releaseRecords(Collection<StreamRecordBatch> records) {
records.forEach(StreamRecordBatch::release);
}
@ -648,6 +659,78 @@ public class S3Storage implements Storage {
return snapshotReadCache;
}
@Override
public CompletableFuture<Void> truncateTail(long streamId, long newNextOffset) {
if (streamId < 0) {
return CompletableFuture.failedFuture(new IllegalArgumentException("streamId must be non-negative"));
}
if (newNextOffset < 0) {
return CompletableFuture.failedFuture(new IllegalArgumentException("newNextOffset must be non-negative"));
}
try {
RecordOffset walOffset = prepareTruncateTail(streamId, newNextOffset);
if (walOffset == null) {
return CompletableFuture.completedFuture(null);
}
return deltaWAL.truncateTail(walOffset);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
private RecordOffset prepareTruncateTail(long streamId, long newNextOffset) {
synchronized (this) {
validateTruncateTailState(streamId);
cancelBackoffRecords(streamId, newNextOffset);
RecordOffset walOffset = selectFirstRemovedWalOffset(streamId, newNextOffset);
if (walOffset != null) {
deltaWALCache.setLastRecordOffset(walOffset);
}
return walOffset;
}
}
private void validateTruncateTailState(long streamId) {
if (!walPrepareQueue.isEmpty() || !walCommitQueue.isEmpty()) {
throw new IllegalStateException("Cannot truncate tail when WAL upload tasks are pending");
}
boolean hasInflight = inflightWALUploadTasks.stream().anyMatch(ctx -> ctx.cache.containsStream(streamId));
if (hasInflight) {
throw new IllegalStateException("Cannot truncate tail while stream has inflight WAL uploads");
}
}
private void cancelBackoffRecords(long streamId, long newNextOffset) {
Iterator<WalWriteRequest> iterator = backoffRecords.iterator();
while (iterator.hasNext()) {
WalWriteRequest request = iterator.next();
StreamRecordBatch record = request.record;
if (record.getStreamId() >= 0 && record.getStreamId() == streamId && record.getBaseOffset() >= newNextOffset) {
iterator.remove();
request.cf.completeExceptionally(new IllegalStateException("Append cancelled due to truncate"));
record.release();
}
}
}
private RecordOffset selectFirstRemovedWalOffset(long streamId, long newNextOffset) {
Optional<LogCache.TruncateResult> deltaResult = deltaWALCache.truncateStreamRecords(streamId, newNextOffset);
Optional<LogCache.TruncateResult> snapshotResult = snapshotReadCache == null
? Optional.empty()
: snapshotReadCache.truncateStreamRecords(streamId, newNextOffset);
RecordOffset walOffset = deltaResult
.map(r -> r.firstRemovedWalOffset)
.filter(Objects::nonNull)
.orElse(null);
if (walOffset == null) {
walOffset = snapshotResult
.map(r -> r.firstRemovedWalOffset)
.filter(Objects::nonNull)
.orElse(null);
}
return walOffset;
}
@SuppressWarnings({"checkstyle:npathcomplexity"})
@WithSpan
private CompletableFuture<ReadDataBlock> read0(FetchContext context,
@ -806,7 +889,7 @@ public class S3Storage implements Storage {
private void handleAppendCallback0(WalWriteRequest request) {
final long startTime = System.nanoTime();
request.record.retain();
boolean full = deltaWALCache.put(request.record);
boolean full = deltaWALCache.put(request.record, request.offset);
deltaWALCache.setLastRecordOffset(request.offset);
if (full) {
// cache block is full, trigger WAL upload.
@ -1062,6 +1145,16 @@ public class S3Storage implements Storage {
}
}
static class RecoverRecord {
final StreamRecordBatch record;
final RecordOffset walOffset;
RecoverRecord(StreamRecordBatch record, RecordOffset walOffset) {
this.record = record;
this.walOffset = walOffset;
}
}
public static class LazyCommit {
final CompletableFuture<Void> cf = new CompletableFuture<>();
final long lazyLingerMs;

View File

@ -55,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
@ -365,6 +366,78 @@ public class S3Stream implements Stream, StreamMetadataListener {
return trimCf;
}
@Override
public CompletableFuture<Void> truncateTail(long newNextOffset) {
if (snapshotRead()) {
return FutureUtil.failedFuture(new IllegalStateException("truncateTail is not supported for readonly stream"));
}
try {
truncateTailSync(newNextOffset);
return CompletableFuture.completedFuture(null);
} catch (RuntimeException e) {
return CompletableFuture.failedFuture(e);
}
}
private void truncateTailSync(long newNextOffset) {
writeLock.lock();
try {
ensureTruncateTailAllowed(newNextOffset);
waitForInFlightOperations();
invokeStorageTruncate(newNextOffset);
nextOffset.updateAndGet(old -> Math.min(old, newNextOffset));
confirmOffset.updateAndGet(old -> Math.min(old, newNextOffset));
} finally {
writeLock.unlock();
}
}
private void ensureTruncateTailAllowed(long newNextOffset) {
if (!status.isWritable()) {
throw new StreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + "stream is not writable");
}
if (newNextOffset < startOffset()) {
throw new IllegalArgumentException("newNextOffset " + newNextOffset + " is less than start offset " + startOffset());
}
long currentNext = nextOffset.get();
if (newNextOffset > currentNext) {
throw new IllegalArgumentException("newNextOffset " + newNextOffset + " is greater than current next offset " + currentNext);
}
}
private void waitForInFlightOperations() {
List<CompletableFuture<?>> waits = new ArrayList<>(pendingAppends);
waits.addAll(pendingFetches);
if (lastAppendFuture != null) {
waits.add(lastAppendFuture);
}
if (waits.isEmpty()) {
return;
}
CompletableFuture<?>[] array = waits.toArray(new CompletableFuture[0]);
try {
CompletableFuture.allOf(array).join();
} catch (CompletionException e) {
throw wrapCompletionException(e);
}
}
private void invokeStorageTruncate(long newNextOffset) {
try {
storage.truncateTail(streamId, newNextOffset).join();
} catch (CompletionException e) {
throw wrapCompletionException(e);
}
}
private RuntimeException wrapCompletionException(CompletionException exception) {
Throwable cause = FutureUtil.cause(exception);
if (cause instanceof RuntimeException) {
return (RuntimeException) cause;
}
return new RuntimeException(cause);
}
@Override
public CompletableFuture<Void> close() {
return close(false);

View File

@ -351,6 +351,11 @@ public class S3StreamClient implements StreamClient {
return stream.trim(newStartOffset);
}
@Override
public CompletableFuture<Void> truncateTail(long newNextOffset) {
return stream.truncateTail(newNextOffset);
}
@Override
public CompletableFuture<Void> close() {
return close(false);

View File

@ -57,4 +57,13 @@ public interface Storage {
* Force stream record in WAL upload to s3
*/
CompletableFuture<Void> forceUpload(long streamId);
/**
* Roll back the tail of a stream so that subsequent appends start from {@code newNextOffset}.
*
* @param streamId stream identifier
* @param newNextOffset new next offset after rollback
* @return future that completes when the rollback finishes
*/
CompletableFuture<Void> truncateTail(long streamId, long newNextOffset);
}
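
The call chain introduced by this commit is Stream -> Storage -> WriteAheadLog. Below is a hedged sketch of how a stream-level rollback maps onto this storage-level API; the helper is illustrative only and assumes it is colocated with the Storage interface above.

import java.util.concurrent.CompletableFuture;

// Hedged sketch mirroring the S3Stream -> S3Storage delegation in this commit.
final class StorageTruncateSketch {
    static CompletableFuture<Void> rollBack(Storage storage, long streamId, long newNextOffset) {
        // Completes once cached records at offset >= newNextOffset are dropped and the
        // corresponding delta WAL tail is truncated.
        return storage.truncateTail(streamId, newNextOffset);
    }
}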

View File

@ -24,6 +24,7 @@ import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.s3.wal.RecordOffset;
import com.automq.stream.s3.wal.impl.DefaultRecordOffset;
import com.automq.stream.utils.biniarysearch.StreamRecordBatchList;
import org.slf4j.Logger;
@ -36,6 +37,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -92,14 +94,14 @@ public class LogCache {
* Put a record batch into the cache.
* record batched in the same stream should be put in order.
*/
public boolean put(StreamRecordBatch recordBatch) {
public boolean put(StreamRecordBatch recordBatch, RecordOffset walOffset) {
long startTime = System.nanoTime();
tryRealFree();
size.addAndGet(recordBatch.occupiedSize());
readLock.lock();
boolean full;
try {
full = activeBlock.put(recordBatch);
full = activeBlock.put(recordBatch, walOffset);
} finally {
readLock.unlock();
}
@ -356,6 +358,37 @@ public class LogCache {
}
}
public Optional<TruncateResult> truncateStreamRecords(long streamId, long newNextOffset) {
if (streamId == MATCH_ALL_STREAMS) {
throw new IllegalArgumentException("streamId must not be MATCH_ALL_STREAMS");
}
List<TruncateResult> results = new ArrayList<>();
readLock.lock();
try {
for (LogCacheBlock block : blocks) {
TruncateResult result = block.truncateStream(streamId, newNextOffset);
if (result != null) {
results.add(result);
}
}
} finally {
readLock.unlock();
}
if (results.isEmpty()) {
return Optional.empty();
}
long freedBytes = results.stream().mapToLong(r -> r.freedBytes).sum();
if (freedBytes > 0) {
size.addAndGet(-freedBytes);
}
RecordOffset earliest = results.stream()
.map(r -> r.firstRemovedWalOffset)
.filter(Objects::nonNull)
.min((a, b) -> Long.compare(DefaultRecordOffset.of(a).offset(), DefaultRecordOffset.of(b).offset()))
.orElse(null);
return Optional.of(new TruncateResult(freedBytes, earliest));
}
static boolean isDiscontinuous(LogCacheBlock left, LogCacheBlock right) {
for (Map.Entry<Long, StreamCache> entry : left.map.entrySet()) {
Long streamId = entry.getKey();
@ -382,6 +415,7 @@ public class LogCache {
StreamCache rightStreamCache = right.map.get(streamId);
if (rightStreamCache != null) {
leftStreamCache.records.addAll(rightStreamCache.records);
leftStreamCache.walOffsets.addAll(rightStreamCache.walOffsets);
leftStreamCache.endOffset(rightStreamCache.endOffset());
}
});
@ -393,6 +427,16 @@ public class LogCache {
}
}
public static class TruncateResult {
public final long freedBytes;
public final RecordOffset firstRemovedWalOffset;
public TruncateResult(long freedBytes, RecordOffset firstRemovedWalOffset) {
this.freedBytes = freedBytes;
this.firstRemovedWalOffset = firstRemovedWalOffset;
}
}
public static class LogCacheBlock {
private static final AtomicLong BLOCK_ID_ALLOC = new AtomicLong();
final Map<Long, StreamCache> map = new ConcurrentHashMap<>();
@ -423,12 +467,12 @@ public class LogCache {
return size.get() >= maxSize || map.size() >= maxStreamCount;
}
public boolean put(StreamRecordBatch recordBatch) {
public boolean put(StreamRecordBatch recordBatch, RecordOffset walOffset) {
map.compute(recordBatch.getStreamId(), (id, cache) -> {
if (cache == null) {
cache = new StreamCache();
}
cache.add(recordBatch);
cache.add(recordBatch, walOffset);
return cache;
});
size.addAndGet(recordBatch.occupiedSize());
@ -452,6 +496,22 @@ public class LogCache {
}
}
TruncateResult truncateStream(long streamId, long newNextOffset) {
StreamCache cache = map.get(streamId);
if (cache == null) {
return null;
}
TruncateResult result = cache.truncate(newNextOffset);
if (result == null) {
return null;
}
size.addAndGet(-result.freedBytes);
if (cache.isEmpty()) {
map.remove(streamId);
}
return result;
}
public Map<Long, List<StreamRecordBatch>> records() {
return map.entrySet().stream()
.map(e -> Map.entry(e.getKey(), e.getValue().records))
@ -549,17 +609,19 @@ public class LogCache {
static class StreamCache {
List<StreamRecordBatch> records = new ArrayList<>();
List<RecordOffset> walOffsets = new ArrayList<>();
long startOffset = NOOP_OFFSET;
long endOffset = NOOP_OFFSET;
Map<Long, IndexAndCount> offsetIndexMap = new HashMap<>();
synchronized void add(StreamRecordBatch recordBatch) {
synchronized void add(StreamRecordBatch recordBatch, RecordOffset walOffset) {
if (recordBatch.getBaseOffset() != endOffset && endOffset != NOOP_OFFSET) {
RuntimeException ex = new IllegalArgumentException(String.format("streamId=%s record batch base offset mismatch, expect %s, actual %s",
recordBatch.getStreamId(), endOffset, recordBatch.getBaseOffset()));
LOGGER.error("[FATAL]", ex);
}
records.add(recordBatch);
walOffsets.add(walOffset);
if (startOffset == NOOP_OFFSET) {
startOffset = recordBatch.getBaseOffset();
}
@ -593,6 +655,44 @@ public class LogCache {
return new ArrayList<>(records.subList(startIndex, endIndex));
}
synchronized TruncateResult truncate(long newNextOffset) {
if (records.isEmpty()) {
return null;
}
int removeFrom = -1;
for (int i = 0; i < records.size(); i++) {
StreamRecordBatch record = records.get(i);
if (record.getLastOffset() >= newNextOffset) {
removeFrom = i;
break;
}
}
if (removeFrom == -1) {
return null;
}
RecordOffset firstRemoved = walOffsets.get(removeFrom);
long freed = 0L;
for (int i = records.size() - 1; i >= removeFrom; i--) {
StreamRecordBatch record = records.remove(i);
freed += record.occupiedSize();
record.release();
walOffsets.remove(i);
}
Iterator<Map.Entry<Long, IndexAndCount>> iterator = offsetIndexMap.entrySet().iterator();
while (iterator.hasNext()) {
if (iterator.next().getKey() >= newNextOffset) {
iterator.remove();
}
}
if (records.isEmpty()) {
startOffset = endOffset = NOOP_OFFSET;
} else {
startOffset = records.get(0).getBaseOffset();
endOffset = records.get(records.size() - 1).getLastOffset();
}
return new TruncateResult(freed, firstRemoved);
}
int searchStartIndex(long startOffset) {
IndexAndCount indexAndCount = offsetIndexMap.get(startOffset);
if (indexAndCount != null) {
@ -629,6 +729,11 @@ public class LogCache {
synchronized void free() {
records.forEach(StreamRecordBatch::release);
records.clear();
walOffsets.clear();
}
synchronized boolean isEmpty() {
return records.isEmpty();
}
synchronized long startOffset() {

View File

@ -33,6 +33,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -112,7 +113,7 @@ public class SnapshotReadCache {
// The LogCacheBlock doesn't accept discontinuous record batches.
cache.clearStreamRecords(streamId);
}
if (cache.put(batch)) {
if (cache.put(batch, null)) {
// the block is full
LogCache.LogCacheBlock cacheBlock = cache.archiveCurrentBlock();
cacheBlock.addFreeListener(cacheFreeListener);
@ -138,6 +139,13 @@ public class SnapshotReadCache {
cacheFreeListener.addListener(eventListener);
}
public Optional<LogCache.TruncateResult> truncateStreamRecords(long streamId, long newNextOffset) {
if (cache == null) {
return Optional.empty();
}
return cache.truncateStreamRecords(streamId, newNextOffset);
}
@EventLoopSafe
private void clearStream(Long streamId) {
cache.clearStreamRecords(streamId);

View File

@ -84,4 +84,14 @@ public interface WriteAheadLog {
* @return future complete when trim done.
*/
CompletableFuture<Void> trim(RecordOffset offset);
/**
* Truncate the WAL tail so that all records with offset greater than or equal to {@code offset}
* are discarded. After completion, {@link #confirmOffset()} should align with {@code offset},
* which becomes the new logical end of the WAL.
*
* @param offset new next offset after truncation.
* @return future that completes when the truncation is done.
*/
CompletableFuture<Void> truncateTail(RecordOffset offset);
}
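
A hedged sketch of the contract described above, not code from this commit: it assumes confirmOffset() returns a RecordOffset convertible via DefaultRecordOffset.of, as used elsewhere in this change, and that the sketch sits in the same package as WriteAheadLog.

import com.automq.stream.s3.wal.RecordOffset;
import com.automq.stream.s3.wal.impl.DefaultRecordOffset;

// Illustrative check: after truncateTail(offset) completes, records at or beyond
// `offset` are discarded and the confirmed offset does not exceed the new logical end.
final class WalTruncateContractSketch {
    static void truncateAndCheck(WriteAheadLog wal, RecordOffset offset) {
        wal.truncateTail(offset).join();
        long target = DefaultRecordOffset.of(offset).offset();
        long confirmed = DefaultRecordOffset.of(wal.confirmOffset()).offset();
        assert confirmed <= target : "confirmOffset must not exceed the truncated tail";
    }
}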

View File

@ -135,4 +135,16 @@ public class MemoryWriteAheadLog implements WriteAheadLog {
});
return CompletableFuture.completedFuture(null);
}
}
@Override
public CompletableFuture<Void> truncateTail(RecordOffset offset) {
long targetOffset = DefaultRecordOffset.of(offset).offset();
dataMap.tailMap(targetOffset, true)
.forEach((key, value) -> {
dataMap.remove(key);
value.release();
});
offsetAlloc.updateAndGet(current -> Math.min(current, targetOffset));
return CompletableFuture.completedFuture(null);
}
}

View File

@ -47,8 +47,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
@ -443,6 +445,112 @@ public class DefaultWriter implements Writer {
return trim0(newStartOffset);
}
@Override
public CompletableFuture<Void> truncateTail(RecordOffset recordOffset) throws WALFencedException {
try {
TailTruncatePlan plan = prepareTruncatePlan(DefaultRecordOffset.of(recordOffset).offset());
if (plan.noop || plan.deleteObjects.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return objectStorage.delete(plan.deleteObjects).whenComplete((nil, throwable) -> {
if (throwable != null) {
LOGGER.error("Failed to delete WAL objects when truncating tail: {}", plan.deleteObjects, throwable);
}
});
} catch (Throwable t) {
return CompletableFuture.failedFuture(t);
}
}
private TailTruncatePlan prepareTruncatePlan(long targetOffset) throws WALFencedException {
checkStatus();
if (targetOffset < 0) {
throw new IllegalArgumentException("targetOffset must be non-negative");
}
lock.writeLock().lock();
try {
if (targetOffset > nextOffset.get()) {
throw new IllegalArgumentException("targetOffset " + targetOffset + " is greater than current nextOffset " + nextOffset.get());
}
if (targetOffset == nextOffset.get()) {
return TailTruncatePlan.noop();
}
if (targetOffset < trimOffset.get()) {
throw new IllegalArgumentException("targetOffset " + targetOffset + " is less than trimmed offset " + trimOffset.get());
}
if (activeBulk != null || !waitingUploadBulks.isEmpty() || !uploadingBulks.isEmpty()) {
throw new IllegalStateException("Cannot truncate tail while there are pending bulks");
}
List<ObjectStorage.ObjectPath> deleteObjects = new ArrayList<>();
long deletedBytes = collectTailObjects(targetOffset, deleteObjects);
deletedBytes += collectHistoricalObjects(targetOffset, deleteObjects);
objectDataBytes.addAndGet(-deletedBytes);
long alignedTarget = ObjectUtils.ceilAlignOffset(targetOffset);
nextOffset.set(alignedTarget);
flushedOffset.updateAndGet(current -> Math.min(current, alignedTarget));
return new TailTruncatePlan(deleteObjects.isEmpty(), deleteObjects);
} finally {
lock.writeLock().unlock();
}
}
private long collectTailObjects(long targetOffset, List<ObjectStorage.ObjectPath> deleteObjects) {
long deletedSize = 0L;
List<Long> keysToRemove = new ArrayList<>();
for (Map.Entry<Long, WALObject> entry : lastRecordOffset2object.tailMap(targetOffset, true).entrySet()) {
WALObject object = entry.getValue();
ensureNotWithinObject(targetOffset, object);
if (object.startOffset() >= targetOffset) {
deleteObjects.add(new ObjectStorage.ObjectPath(object.bucketId(), object.path()));
deletedSize += object.length();
keysToRemove.add(entry.getKey());
}
}
keysToRemove.forEach(lastRecordOffset2object::remove);
return deletedSize;
}
private long collectHistoricalObjects(long targetOffset, List<ObjectStorage.ObjectPath> deleteObjects) {
if (previousObjects.isEmpty()) {
return 0L;
}
long deletedSize = 0L;
List<WALObject> retained = new ArrayList<>(previousObjects.size());
for (WALObject object : previousObjects) {
ensureNotWithinObject(targetOffset, object);
if (object.startOffset() >= targetOffset) {
deleteObjects.add(new ObjectStorage.ObjectPath(object.bucketId(), object.path()));
deletedSize += object.length();
} else {
retained.add(object);
}
}
previousObjects = retained;
return deletedSize;
}
private static class TailTruncatePlan {
final boolean noop;
final List<ObjectStorage.ObjectPath> deleteObjects;
TailTruncatePlan(boolean noop, List<ObjectStorage.ObjectPath> deleteObjects) {
this.noop = noop;
this.deleteObjects = deleteObjects;
}
static TailTruncatePlan noop() {
return new TailTruncatePlan(true, Collections.emptyList());
}
}
private void ensureNotWithinObject(long targetOffset, WALObject object) {
if (targetOffset > object.startOffset() && targetOffset < object.endOffset()) {
throw new IllegalArgumentException("targetOffset " + targetOffset + " falls inside WAL object " + object);
}
}
@Override
public Iterator<RecoverResult> recover() {
try {

View File

@ -62,6 +62,11 @@ public class NoopWriter implements Writer {
return CompletableFuture.failedFuture(new UnsupportedOperationException("trim is not supported"));
}
@Override
public CompletableFuture<Void> truncateTail(RecordOffset recordOffset) throws WALFencedException {
return CompletableFuture.failedFuture(new UnsupportedOperationException("truncateTail is not supported"));
}
@Override
public Iterator<RecoverResult> recover() {
throw new UnsupportedOperationException("recover is not supported");

View File

@ -125,6 +125,17 @@ public class ObjectWALService implements WriteAheadLog {
}
}
@Override
public CompletableFuture<Void> truncateTail(RecordOffset offset) {
log.info("Truncate tail of S3 WAL to offset: {}", offset);
try {
return writer.truncateTail(offset);
} catch (Throwable e) {
log.error("Truncate tail of S3 WAL failed, due to unrecoverable exception.", e);
return CompletableFuture.failedFuture(e);
}
}
@Override
public String toString() {
return String.format("ObjectWALService{%s@%s-%s-%s}", config.bucketId(), config.nodeId(), config.epoch(), config.type());

View File

@ -42,5 +42,7 @@ public interface Writer {
CompletableFuture<Void> trim(RecordOffset recordOffset) throws WALFencedException;
CompletableFuture<Void> truncateTail(RecordOffset recordOffset) throws WALFencedException;
Iterator<RecoverResult> recover();
}

View File

@ -58,6 +58,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static com.automq.stream.s3.TestUtils.random;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -153,14 +154,15 @@ public class S3StorageTest {
Mockito.doAnswer(invocation -> commitCfList.get(commitCfIndex.getAndIncrement())).when(objectManager).commitStreamSetObject(any());
LogCache.LogCacheBlock logCacheBlock1 = new LogCache.LogCacheBlock(1024);
logCacheBlock1.put(newRecord(233L, 10L));
logCacheBlock1.put(newRecord(234L, 10L));
AtomicLong walOffsetGen = new AtomicLong();
logCacheBlock1.put(newRecord(233L, 10L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
logCacheBlock1.put(newRecord(234L, 10L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
logCacheBlock1.lastRecordOffset(DefaultRecordOffset.of(0, 10L, 0));
CompletableFuture<Void> cf1 = storage.uploadDeltaWAL(logCacheBlock1);
LogCache.LogCacheBlock logCacheBlock2 = new LogCache.LogCacheBlock(1024);
logCacheBlock2.put(newRecord(233L, 20L));
logCacheBlock2.put(newRecord(234L, 20L));
logCacheBlock2.put(newRecord(233L, 20L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
logCacheBlock2.put(newRecord(234L, 20L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
logCacheBlock2.lastRecordOffset(DefaultRecordOffset.of(0, 20L, 0));
CompletableFuture<Void> cf2 = storage.uploadDeltaWAL(logCacheBlock2);
@ -331,7 +333,7 @@ public class S3StorageTest {
@Override
public RecordOffset recordOffset() {
return DefaultRecordOffset.of(0, 0, 0);
return DefaultRecordOffset.of(0, record.getBaseOffset(), 0);
}
}
}

View File

@ -22,15 +22,18 @@ package com.automq.stream.s3.cache;
import com.automq.stream.s3.TestUtils;
import com.automq.stream.s3.cache.LogCache.LogCacheBlock;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.wal.impl.DefaultRecordOffset;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("S3Unit")
@ -39,16 +42,17 @@ public class LogCacheTest {
@Test
public void testPutGet() {
LogCache logCache = new LogCache(1024 * 1024, 1024 * 1024);
long walOffset = 0L;
logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)));
logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
logCache.archiveCurrentBlock();
logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)));
logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
logCache.archiveCurrentBlock();
logCache.put(new StreamRecordBatch(233L, 0L, 20L, 1, TestUtils.random(20)));
logCache.put(new StreamRecordBatch(233L, 0L, 21L, 1, TestUtils.random(20)));
logCache.put(new StreamRecordBatch(233L, 0L, 20L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
logCache.put(new StreamRecordBatch(233L, 0L, 21L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
List<StreamRecordBatch> records = logCache.get(233L, 10L, 21L, 1000);
assertEquals(1, records.size());
@ -71,9 +75,9 @@ public class LogCacheTest {
@Test
public void testOffsetIndex() {
LogCache cache = new LogCache(Integer.MAX_VALUE, Integer.MAX_VALUE);
long walOffset = 0L;
for (int i = 0; i < 100000; i++) {
cache.put(new StreamRecordBatch(233L, 0L, i, 1, TestUtils.random(1)));
cache.put(new StreamRecordBatch(233L, 0L, i, 1, TestUtils.random(1)), DefaultRecordOffset.of(0, walOffset++, 0));
}
long start = System.nanoTime();
@ -89,14 +93,15 @@ public class LogCacheTest {
@Test
public void testClearStreamRecords() {
LogCache logCache = new LogCache(1024 * 1024, 1024 * 1024);
long walOffset = 0L;
logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)));
logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
logCache.archiveCurrentBlock();
logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)));
logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
logCache.put(new StreamRecordBatch(234L, 0L, 13L, 2, TestUtils.random(20)));
logCache.put(new StreamRecordBatch(234L, 0L, 13L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
assertTrue(logCache.blocks.get(0).containsStream(233L));
assertTrue(logCache.blocks.get(1).containsStream(234L));
@ -112,19 +117,21 @@ public class LogCacheTest {
@Test
public void testIsDiscontinuous() {
LogCacheBlock left = new LogCacheBlock(1024L * 1024);
left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
long walOffset = 0L;
left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
LogCacheBlock right = new LogCacheBlock(1024L * 1024);
right.put(new StreamRecordBatch(233L, 0L, 13L, 1, TestUtils.random(20)));
right.put(new StreamRecordBatch(233L, 0L, 13L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
assertTrue(LogCache.isDiscontinuous(left, right));
left = new LogCacheBlock(1024L * 1024);
left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
left.put(new StreamRecordBatch(234L, 0L, 10L, 1, TestUtils.random(20)));
walOffset = 0L;
left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
left.put(new StreamRecordBatch(234L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
right = new LogCacheBlock(1024L * 1024);
right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)));
right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
assertFalse(LogCache.isDiscontinuous(left, right));
}
@ -132,13 +139,14 @@ public class LogCacheTest {
public void testMergeBlock() {
long size = 0;
LogCacheBlock left = new LogCacheBlock(1024L * 1024);
left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
left.put(new StreamRecordBatch(234L, 0L, 100L, 1, TestUtils.random(20)));
long walOffset = 0L;
left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
left.put(new StreamRecordBatch(234L, 0L, 100L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
size += left.size();
LogCacheBlock right = new LogCacheBlock(1024L * 1024);
right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)));
right.put(new StreamRecordBatch(235L, 0L, 200L, 1, TestUtils.random(20)));
right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
right.put(new StreamRecordBatch(235L, 0L, 200L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
size += right.size();
LogCache.mergeBlock(left, right);
@ -165,4 +173,25 @@ public class LogCacheTest {
}
@Test
public void testTruncateStreamRecords() {
LogCache cache = new LogCache(1024 * 1024, 1024 * 1024);
long walOffset = 0L;
cache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
cache.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
Optional<LogCache.TruncateResult> result = cache.truncateStreamRecords(233L, 12L);
assertTrue(result.isPresent());
LogCache.TruncateResult truncateResult = result.get();
assertTrue(truncateResult.freedBytes > 0);
assertNotNull(truncateResult.firstRemovedWalOffset);
assertEquals(1L, DefaultRecordOffset.of(truncateResult.firstRemovedWalOffset).offset());
List<StreamRecordBatch> remaining = cache.get(233L, 12L, 14L, 1000);
assertEquals(0, remaining.size());
assertTrue(cache.truncateStreamRecords(233L, 12L).isEmpty());
}
}