Compare commits
1 commit
main...feat/trunc

| Author | SHA1 | Date |
|---|---|---|
|  | 684f3b55c3 |  |

@@ -180,6 +180,11 @@ public class BootstrapWalV1 implements WriteAheadLog {
         return wal.trim(offset);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(RecordOffset offset) {
+        return wal.truncateTail(offset);
+    }
+
     private CompletableFuture<? extends WriteAheadLog> buildRecoverWal(String kraftWalConfigs, long oldNodeEpoch) {
         IdURI uri = IdURI.parse(kraftWalConfigs);
         CompletableFuture<Void> cf = walHandle

@@ -292,6 +292,11 @@ public class ClientWrapper implements Client {
         }, generalCallbackExecutors);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        return failureHandle(stream.truncateTail(newNextOffset).thenApplyAsync(nil -> nil, streamManagerCallbackExecutors));
+    }
+
     @Override
     public CompletableFuture<Void> close() {
         return failureHandle(stream.close().thenApplyAsync(nil -> nil, streamManagerCallbackExecutors));

@@ -144,6 +144,11 @@ public class LazyStream implements Stream {
         return inner.trim(newStartOffset);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        return inner.truncateTail(newNextOffset);
+    }
+
     @Override
     public CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) {
         return inner.fetch(context, startOffset, endOffset, maxBytesHint);

@@ -228,6 +233,11 @@ public class LazyStream implements Stream {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        return CompletableFuture.completedFuture(null);
+    }
+
     @Override
     public CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) {
         return CompletableFuture.completedFuture(Collections::emptyList);

@@ -165,6 +165,13 @@ public class MemoryClient implements Client {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public synchronized CompletableFuture<Void> truncateTail(long newNextOffset) {
+        recordMap = new ConcurrentSkipListMap<>(recordMap.headMap(newNextOffset, false));
+        nextOffsetAlloc.updateAndGet(current -> Math.min(current, newNextOffset));
+        return CompletableFuture.completedFuture(null);
+    }
+
     @Override
     public CompletableFuture<Void> close() {
         return CompletableFuture.completedFuture(null);

@@ -174,6 +174,12 @@ public class MetaStream implements Stream {
         return innerStream.trim(newStartOffset);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        metaCache.entrySet().removeIf(entry -> entry.getValue().offset >= newNextOffset);
+        return innerStream.truncateTail(newNextOffset);
+    }
+
     @Override
     public CompletableFuture<Void> close() {
         if (compactionFuture != null) {

@@ -99,6 +99,14 @@ public interface Stream {
      */
     CompletableFuture<Void> trim(long newStartOffset);
 
+    /**
+     * Truncate the tail of the stream so that subsequent appends start from {@code newNextOffset}.
+     *
+     * @param newNextOffset new next offset after truncation
+     * @return future completing when truncation finishes
+     */
+    CompletableFuture<Void> truncateTail(long newNextOffset);
+
     /**
      * Close the stream.
      */

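For orientation, the new `Stream#truncateTail` is meant to be called the same way as the existing `trim`, just aimed at the tail instead of the head. A minimal caller-side sketch (hedged: it assumes an already-opened `Stream` from this API and an offset inside `[startOffset, nextOffset]`; the variable names are illustrative):

```java
// Hedged usage sketch; `stream` is an already-opened Stream instance.
long newNextOffset = 42L; // illustrative value within [startOffset, nextOffset]
stream.truncateTail(newNextOffset)
    .thenRun(() -> System.out.println("tail rolled back; next append starts at " + newNextOffset))
    .exceptionally(e -> {
        System.err.println("truncateTail rejected: " + e.getMessage());
        return null;
    });
```
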
@@ -236,7 +236,7 @@ public class S3Storage implements Storage {
     ) {
         RecordOffset logEndOffset = null;
         Map<Long, Long> streamNextOffsets = new HashMap<>();
-        Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords = new HashMap<>();
+        Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords = new HashMap<>();
         LogCache.LogCacheBlock cacheBlock = new LogCache.LogCacheBlock(maxCacheSize);
 
         boolean first = true;

@@ -249,11 +249,11 @@ public class S3Storage implements Storage {
                 first = false;
             }
             StreamRecordBatch streamRecordBatch = recoverResult.record();
-            processRecoveredRecord(streamRecordBatch, openingStreamEndOffsets, streamDiscontinuousRecords, cacheBlock, streamNextOffsets, logger);
+            processRecoveredRecord(streamRecordBatch, recoverResult.recordOffset(), openingStreamEndOffsets, streamDiscontinuousRecords, cacheBlock, streamNextOffsets, logger);
         }
     } catch (Throwable e) {
         // {@link RuntimeIOException} may be thrown by {@code it.next()}
-        releaseAllRecords(streamDiscontinuousRecords.values());
+        releaseRecoverRecords(streamDiscontinuousRecords.values());
         releaseAllRecords(cacheBlock.records().values());
         throw e;
     }

@@ -278,8 +278,9 @@ public class S3Storage implements Storage {
      */
     private static void processRecoveredRecord(
         StreamRecordBatch streamRecordBatch,
+        RecordOffset recordOffset,
         Map<Long, Long> openingStreamEndOffsets,
-        Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords,
+        Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords,
         LogCache.LogCacheBlock cacheBlock,
         Map<Long, Long> streamNextOffsets,
         Logger logger

@@ -294,19 +295,19 @@ public class S3Storage implements Storage {
         }
 
         Long expectedNextOffset = streamNextOffsets.get(streamId);
-        Queue<StreamRecordBatch> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
+        Queue<RecoverRecord> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
         boolean isContinuous = expectedNextOffset == null || expectedNextOffset == streamRecordBatch.getBaseOffset();
         if (!isContinuous) {
             // unexpected record, put it into discontinuous records queue.
             if (discontinuousRecords == null) {
-                discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(StreamRecordBatch::getBaseOffset));
+                discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(r -> r.record.getBaseOffset()));
                 streamDiscontinuousRecords.put(streamId, discontinuousRecords);
             }
-            discontinuousRecords.add(streamRecordBatch);
+            discontinuousRecords.add(new RecoverRecord(streamRecordBatch, recordOffset));
             return;
         }
         // continuous record, put it into cache, and check if there is any historical discontinuous records can be polled.
-        cacheBlock.put(streamRecordBatch);
+        cacheBlock.put(streamRecordBatch, recordOffset);
         expectedNextOffset = maybePollDiscontinuousRecords(streamRecordBatch, cacheBlock, discontinuousRecords, logger);
         streamNextOffsets.put(streamId, expectedNextOffset);
     }

@@ -314,7 +315,7 @@ public class S3Storage implements Storage {
     private static long maybePollDiscontinuousRecords(
         StreamRecordBatch streamRecordBatch,
         LogCache.LogCacheBlock cacheBlock,
-        Queue<StreamRecordBatch> discontinuousRecords,
+        Queue<RecoverRecord> discontinuousRecords,
         Logger logger
     ) {
         long expectedNextOffset = streamRecordBatch.getLastOffset();

@@ -323,25 +324,28 @@ public class S3Storage implements Storage {
         }
         // check and poll historical discontinuous records.
         while (!discontinuousRecords.isEmpty()) {
-            StreamRecordBatch peek = discontinuousRecords.peek();
-            if (peek.getBaseOffset() != expectedNextOffset) {
+            RecoverRecord peek = discontinuousRecords.peek();
+            if (peek.record.getBaseOffset() != expectedNextOffset) {
                 break;
             }
             // should never happen, log it.
-            logger.error("[BUG] recover an out of order record, streamId={}, expectedNextOffset={}, record={}", streamRecordBatch.getStreamId(), expectedNextOffset, peek);
+            logger.error("[BUG] recover an out of order record, streamId={}, expectedNextOffset={}, record={}", streamRecordBatch.getStreamId(), expectedNextOffset, peek.record);
             discontinuousRecords.poll();
-            cacheBlock.put(peek);
-            expectedNextOffset = peek.getLastOffset();
+            cacheBlock.put(peek.record, peek.walOffset);
+            expectedNextOffset = peek.record.getLastOffset();
         }
         return expectedNextOffset;
     }
 
-    private static void releaseDiscontinuousRecords(Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords,
+    private static void releaseDiscontinuousRecords(Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords,
         Logger logger) {
         streamDiscontinuousRecords.values().stream()
             .filter(q -> !q.isEmpty())
             .peek(q -> logger.info("drop discontinuous records, records={}", q))
-            .forEach(S3Storage::releaseRecords);
+            .forEach(queue -> {
+                queue.forEach(record -> record.record.release());
+                queue.clear();
+            });
     }
 
     /**

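The recovery changes above wrap each out-of-order record together with the WAL offset it was read from, so that when the gap closes both the record and its physical offset can be replayed into the cache in order. A self-contained sketch of that reordering idea, using plain JDK types and invented names (not the project's classes):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class RecoverReorderSketch {
    // Stand-in for RecoverRecord: a recovered batch plus the WAL offset it came from.
    record Recovered(long baseOffset, long endOffset, long walOffset) { }

    public static void main(String[] args) {
        // Batches may be observed out of order while scanning the WAL.
        Recovered[] observed = {
            new Recovered(12, 14, 102),
            new Recovered(10, 12, 100),
            new Recovered(14, 16, 104),
        };
        long expectedNext = 10;
        // Out-of-order batches wait in a queue ordered by base offset.
        PriorityQueue<Recovered> pending = new PriorityQueue<>(Comparator.comparingLong(Recovered::baseOffset));
        for (Recovered r : observed) {
            if (r.baseOffset() != expectedNext) {
                pending.add(r); // park it until the gap is filled
                continue;
            }
            expectedNext = r.endOffset();
            // drain parked batches that have become contiguous
            while (!pending.isEmpty() && pending.peek().baseOffset() == expectedNext) {
                expectedNext = pending.poll().endOffset();
            }
        }
        System.out.println("next expected offset: " + expectedNext); // 16
    }
}
```
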
@@ -375,7 +379,7 @@ public class S3Storage implements Storage {
         LogCache.LogCacheBlock newCacheBlock = new LogCache.LogCacheBlock(1024L * 1024 * 1024);
         cacheBlock.records().forEach((streamId, records) -> {
             if (!invalidStreams.contains(streamId)) {
-                records.forEach(newCacheBlock::put);
+                records.forEach(record -> newCacheBlock.put(record, null));
             } else {
                 // release invalid records.
                 releaseRecords(records);

@@ -388,6 +392,13 @@ public class S3Storage implements Storage {
         allRecords.forEach(S3Storage::releaseRecords);
     }
 
+    private static void releaseRecoverRecords(Collection<Queue<RecoverRecord>> allRecords) {
+        allRecords.forEach(queue -> {
+            queue.forEach(record -> record.record.release());
+            queue.clear();
+        });
+    }
+
     private static void releaseRecords(Collection<StreamRecordBatch> records) {
         records.forEach(StreamRecordBatch::release);
     }

@@ -648,6 +659,78 @@ public class S3Storage implements Storage {
         return snapshotReadCache;
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long streamId, long newNextOffset) {
+        if (streamId < 0) {
+            return CompletableFuture.failedFuture(new IllegalArgumentException("streamId must be non-negative"));
+        }
+        if (newNextOffset < 0) {
+            return CompletableFuture.failedFuture(new IllegalArgumentException("newNextOffset must be non-negative"));
+        }
+        try {
+            RecordOffset walOffset = prepareTruncateTail(streamId, newNextOffset);
+            if (walOffset == null) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return deltaWAL.truncateTail(walOffset);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    private RecordOffset prepareTruncateTail(long streamId, long newNextOffset) {
+        synchronized (this) {
+            validateTruncateTailState(streamId);
+            cancelBackoffRecords(streamId, newNextOffset);
+            RecordOffset walOffset = selectFirstRemovedWalOffset(streamId, newNextOffset);
+            if (walOffset != null) {
+                deltaWALCache.setLastRecordOffset(walOffset);
+            }
+            return walOffset;
+        }
+    }
+
+    private void validateTruncateTailState(long streamId) {
+        if (!walPrepareQueue.isEmpty() || !walCommitQueue.isEmpty()) {
+            throw new IllegalStateException("Cannot truncate tail when WAL upload tasks are pending");
+        }
+        boolean hasInflight = inflightWALUploadTasks.stream().anyMatch(ctx -> ctx.cache.containsStream(streamId));
+        if (hasInflight) {
+            throw new IllegalStateException("Cannot truncate tail while stream has inflight WAL uploads");
+        }
+    }
+
+    private void cancelBackoffRecords(long streamId, long newNextOffset) {
+        Iterator<WalWriteRequest> iterator = backoffRecords.iterator();
+        while (iterator.hasNext()) {
+            WalWriteRequest request = iterator.next();
+            StreamRecordBatch record = request.record;
+            if (record.getStreamId() >= 0 && record.getStreamId() == streamId && record.getBaseOffset() >= newNextOffset) {
+                iterator.remove();
+                request.cf.completeExceptionally(new IllegalStateException("Append cancelled due to truncate"));
+                record.release();
+            }
+        }
+    }
+
+    private RecordOffset selectFirstRemovedWalOffset(long streamId, long newNextOffset) {
+        Optional<LogCache.TruncateResult> deltaResult = deltaWALCache.truncateStreamRecords(streamId, newNextOffset);
+        Optional<LogCache.TruncateResult> snapshotResult = snapshotReadCache == null
+            ? Optional.empty()
+            : snapshotReadCache.truncateStreamRecords(streamId, newNextOffset);
+        RecordOffset walOffset = deltaResult
+            .map(r -> r.firstRemovedWalOffset)
+            .filter(Objects::nonNull)
+            .orElse(null);
+        if (walOffset == null) {
+            walOffset = snapshotResult
+                .map(r -> r.firstRemovedWalOffset)
+                .filter(Objects::nonNull)
+                .orElse(null);
+        }
+        return walOffset;
+    }
+
     @SuppressWarnings({"checkstyle:npathcomplexity"})
     @WithSpan
     private CompletableFuture<ReadDataBlock> read0(FetchContext context,

@@ -806,7 +889,7 @@ public class S3Storage implements Storage {
     private void handleAppendCallback0(WalWriteRequest request) {
         final long startTime = System.nanoTime();
         request.record.retain();
-        boolean full = deltaWALCache.put(request.record);
+        boolean full = deltaWALCache.put(request.record, request.offset);
         deltaWALCache.setLastRecordOffset(request.offset);
         if (full) {
             // cache block is full, trigger WAL upload.

@@ -1062,6 +1145,16 @@ public class S3Storage implements Storage {
         }
     }
 
+    static class RecoverRecord {
+        final StreamRecordBatch record;
+        final RecordOffset walOffset;
+
+        RecoverRecord(StreamRecordBatch record, RecordOffset walOffset) {
+            this.record = record;
+            this.walOffset = walOffset;
+        }
+    }
+
     public static class LazyCommit {
         final CompletableFuture<Void> cf = new CompletableFuture<>();
         final long lazyLingerMs;

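Reading the storage-level implementation above, the operation is deliberately conservative: it rejects obviously invalid arguments, refuses to run while WAL upload tasks are queued or in flight for the stream, cancels backoff appends that would land at or beyond the new tail, and only then truncates the caches and the delta WAL at the first removed record's WAL offset. A hedged caller-side sketch of the observable contract (the `storage`, `streamId`, and offset variables are assumed to exist in the surrounding code):

```java
// Hedged sketch of the caller-visible contract of S3Storage#truncateTail.
storage.truncateTail(streamId, newNextOffset).whenComplete((nil, e) -> {
    if (e != null) {
        // IllegalArgumentException: negative streamId or newNextOffset
        // IllegalStateException: WAL upload tasks pending or in flight for this stream
        System.err.println("truncate rejected: " + e.getMessage());
    } else {
        // caches and the delta WAL have been rolled back to newNextOffset
    }
});
```
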
@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;

@@ -365,6 +366,78 @@ public class S3Stream implements Stream, StreamMetadataListener {
         return trimCf;
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        if (snapshotRead()) {
+            return FutureUtil.failedFuture(new IllegalStateException("truncateTail is not support for readonly stream"));
+        }
+        try {
+            truncateTailSync(newNextOffset);
+            return CompletableFuture.completedFuture(null);
+        } catch (RuntimeException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    private void truncateTailSync(long newNextOffset) {
+        writeLock.lock();
+        try {
+            ensureTruncateTailAllowed(newNextOffset);
+            waitForInFlightOperations();
+            invokeStorageTruncate(newNextOffset);
+            nextOffset.updateAndGet(old -> Math.min(old, newNextOffset));
+            confirmOffset.updateAndGet(old -> Math.min(old, newNextOffset));
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    private void ensureTruncateTailAllowed(long newNextOffset) {
+        if (!status.isWritable()) {
+            throw new StreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + "stream is not writable");
+        }
+        if (newNextOffset < startOffset()) {
+            throw new IllegalArgumentException("newNextOffset " + newNextOffset + " is less than start offset " + startOffset());
+        }
+        long currentNext = nextOffset.get();
+        if (newNextOffset > currentNext) {
+            throw new IllegalArgumentException("newNextOffset " + newNextOffset + " is greater than current next offset " + currentNext);
+        }
+    }
+
+    private void waitForInFlightOperations() {
+        List<CompletableFuture<?>> waits = new ArrayList<>(pendingAppends);
+        waits.addAll(pendingFetches);
+        if (lastAppendFuture != null) {
+            waits.add(lastAppendFuture);
+        }
+        if (waits.isEmpty()) {
+            return;
+        }
+        CompletableFuture<?>[] array = waits.toArray(new CompletableFuture[0]);
+        try {
+            CompletableFuture.allOf(array).join();
+        } catch (CompletionException e) {
+            throw wrapCompletionException(e);
+        }
+    }
+
+    private void invokeStorageTruncate(long newNextOffset) {
+        try {
+            storage.truncateTail(streamId, newNextOffset).join();
+        } catch (CompletionException e) {
+            throw wrapCompletionException(e);
+        }
+    }
+
+    private RuntimeException wrapCompletionException(CompletionException exception) {
+        Throwable cause = FutureUtil.cause(exception);
+        if (cause instanceof RuntimeException) {
+            return (RuntimeException) cause;
+        }
+        return new RuntimeException(cause);
+    }
+
     @Override
     public CompletableFuture<Void> close() {
         return close(false);

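Note that `truncateTailSync` holds the stream's write lock, joins on all in-flight appends and fetches, and then calls synchronously into storage, so the returned future is already completed when `truncateTail` returns. The accepted range follows `ensureTruncateTailAllowed`; a hedged sketch with illustrative offsets (assuming the usual `startOffset()`/`nextOffset()` accessors on an open, writable stream):

```java
long start = stream.startOffset(); // say 100
long next = stream.nextOffset();   // say 150

stream.truncateTail(120).join();   // ok: 100 <= 120 <= 150, tail rolled back to 120
stream.truncateTail(90);           // completes exceptionally: below the start offset (IllegalArgumentException)
stream.truncateTail(200);          // completes exceptionally: beyond the current next offset (IllegalArgumentException)
```
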
@@ -351,6 +351,11 @@ public class S3StreamClient implements StreamClient {
         return stream.trim(newStartOffset);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        return stream.truncateTail(newNextOffset);
+    }
+
     @Override
     public CompletableFuture<Void> close() {
         return close(false);

@@ -57,4 +57,13 @@ public interface Storage {
      * Force stream record in WAL upload to s3
      */
     CompletableFuture<Void> forceUpload(long streamId);
+
+    /**
+     * Rollback the tail of a stream so that subsequent appends start from {@code newNextOffset}.
+     *
+     * @param streamId stream identifier
+     * @param newNextOffset new next offset after rollback
+     * @return future complete when rollback finished
+     */
+    CompletableFuture<Void> truncateTail(long streamId, long newNextOffset);
 }

@@ -24,6 +24,7 @@ import com.automq.stream.s3.metrics.stats.StorageOperationStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.trace.context.TraceContext;
 import com.automq.stream.s3.wal.RecordOffset;
+import com.automq.stream.s3.wal.impl.DefaultRecordOffset;
 import com.automq.stream.utils.biniarysearch.StreamRecordBatchList;
 
 import org.slf4j.Logger;

@@ -36,6 +37,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;

@@ -92,14 +94,14 @@ public class LogCache {
      * Put a record batch into the cache.
      * record batched in the same stream should be put in order.
      */
-    public boolean put(StreamRecordBatch recordBatch) {
+    public boolean put(StreamRecordBatch recordBatch, RecordOffset walOffset) {
         long startTime = System.nanoTime();
         tryRealFree();
         size.addAndGet(recordBatch.occupiedSize());
         readLock.lock();
         boolean full;
         try {
-            full = activeBlock.put(recordBatch);
+            full = activeBlock.put(recordBatch, walOffset);
         } finally {
             readLock.unlock();
         }

@@ -356,6 +358,37 @@ public class LogCache {
         }
     }
 
+    public Optional<TruncateResult> truncateStreamRecords(long streamId, long newNextOffset) {
+        if (streamId == MATCH_ALL_STREAMS) {
+            throw new IllegalArgumentException("streamId must not be MATCH_ALL_STREAMS");
+        }
+        List<TruncateResult> results = new ArrayList<>();
+        readLock.lock();
+        try {
+            for (LogCacheBlock block : blocks) {
+                TruncateResult result = block.truncateStream(streamId, newNextOffset);
+                if (result != null) {
+                    results.add(result);
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+        if (results.isEmpty()) {
+            return Optional.empty();
+        }
+        long freedBytes = results.stream().mapToLong(r -> r.freedBytes).sum();
+        if (freedBytes > 0) {
+            size.addAndGet(-freedBytes);
+        }
+        RecordOffset earliest = results.stream()
+            .map(r -> r.firstRemovedWalOffset)
+            .filter(Objects::nonNull)
+            .min((a, b) -> Long.compare(DefaultRecordOffset.of(a).offset(), DefaultRecordOffset.of(b).offset()))
+            .orElse(null);
+        return Optional.of(new TruncateResult(freedBytes, earliest));
+    }
+
     static boolean isDiscontinuous(LogCacheBlock left, LogCacheBlock right) {
         for (Map.Entry<Long, StreamCache> entry : left.map.entrySet()) {
             Long streamId = entry.getKey();

@@ -382,6 +415,7 @@ public class LogCache {
             StreamCache rightStreamCache = right.map.get(streamId);
             if (rightStreamCache != null) {
                 leftStreamCache.records.addAll(rightStreamCache.records);
+                leftStreamCache.walOffsets.addAll(rightStreamCache.walOffsets);
                 leftStreamCache.endOffset(rightStreamCache.endOffset());
             }
         });

@@ -393,6 +427,16 @@ public class LogCache {
         }
     }
 
+    public static class TruncateResult {
+        public final long freedBytes;
+        public final RecordOffset firstRemovedWalOffset;
+
+        public TruncateResult(long freedBytes, RecordOffset firstRemovedWalOffset) {
+            this.freedBytes = freedBytes;
+            this.firstRemovedWalOffset = firstRemovedWalOffset;
+        }
+    }
+
     public static class LogCacheBlock {
         private static final AtomicLong BLOCK_ID_ALLOC = new AtomicLong();
         final Map<Long, StreamCache> map = new ConcurrentHashMap<>();

@@ -423,12 +467,12 @@ public class LogCache {
             return size.get() >= maxSize || map.size() >= maxStreamCount;
         }
 
-        public boolean put(StreamRecordBatch recordBatch) {
+        public boolean put(StreamRecordBatch recordBatch, RecordOffset walOffset) {
             map.compute(recordBatch.getStreamId(), (id, cache) -> {
                 if (cache == null) {
                     cache = new StreamCache();
                 }
-                cache.add(recordBatch);
+                cache.add(recordBatch, walOffset);
                 return cache;
             });
             size.addAndGet(recordBatch.occupiedSize());

@@ -452,6 +496,22 @@ public class LogCache {
             }
         }
 
+        TruncateResult truncateStream(long streamId, long newNextOffset) {
+            StreamCache cache = map.get(streamId);
+            if (cache == null) {
+                return null;
+            }
+            TruncateResult result = cache.truncate(newNextOffset);
+            if (result == null) {
+                return null;
+            }
+            size.addAndGet(-result.freedBytes);
+            if (cache.isEmpty()) {
+                map.remove(streamId);
+            }
+            return result;
+        }
+
         public Map<Long, List<StreamRecordBatch>> records() {
             return map.entrySet().stream()
                 .map(e -> Map.entry(e.getKey(), e.getValue().records))

@@ -549,17 +609,19 @@ public class LogCache {
 
     static class StreamCache {
         List<StreamRecordBatch> records = new ArrayList<>();
+        List<RecordOffset> walOffsets = new ArrayList<>();
         long startOffset = NOOP_OFFSET;
         long endOffset = NOOP_OFFSET;
         Map<Long, IndexAndCount> offsetIndexMap = new HashMap<>();
 
-        synchronized void add(StreamRecordBatch recordBatch) {
+        synchronized void add(StreamRecordBatch recordBatch, RecordOffset walOffset) {
            if (recordBatch.getBaseOffset() != endOffset && endOffset != NOOP_OFFSET) {
                 RuntimeException ex = new IllegalArgumentException(String.format("streamId=%s record batch base offset mismatch, expect %s, actual %s",
                     recordBatch.getStreamId(), endOffset, recordBatch.getBaseOffset()));
                 LOGGER.error("[FATAL]", ex);
             }
             records.add(recordBatch);
+            walOffsets.add(walOffset);
             if (startOffset == NOOP_OFFSET) {
                 startOffset = recordBatch.getBaseOffset();
             }

@@ -593,6 +655,44 @@ public class LogCache {
             return new ArrayList<>(records.subList(startIndex, endIndex));
         }
 
+        synchronized TruncateResult truncate(long newNextOffset) {
+            if (records.isEmpty()) {
+                return null;
+            }
+            int removeFrom = -1;
+            for (int i = 0; i < records.size(); i++) {
+                StreamRecordBatch record = records.get(i);
+                if (record.getLastOffset() >= newNextOffset) {
+                    removeFrom = i;
+                    break;
+                }
+            }
+            if (removeFrom == -1) {
+                return null;
+            }
+            RecordOffset firstRemoved = walOffsets.get(removeFrom);
+            long freed = 0L;
+            for (int i = records.size() - 1; i >= removeFrom; i--) {
+                StreamRecordBatch record = records.remove(i);
+                freed += record.occupiedSize();
+                record.release();
+                walOffsets.remove(i);
+            }
+            Iterator<Map.Entry<Long, IndexAndCount>> iterator = offsetIndexMap.entrySet().iterator();
+            while (iterator.hasNext()) {
+                if (iterator.next().getKey() >= newNextOffset) {
+                    iterator.remove();
+                }
+            }
+            if (records.isEmpty()) {
+                startOffset = endOffset = NOOP_OFFSET;
+            } else {
+                startOffset = records.get(0).getBaseOffset();
+                endOffset = records.get(records.size() - 1).getLastOffset();
+            }
+            return new TruncateResult(freed, firstRemoved);
+        }
+
         int searchStartIndex(long startOffset) {
             IndexAndCount indexAndCount = offsetIndexMap.get(startOffset);
             if (indexAndCount != null) {

@@ -629,6 +729,11 @@ public class LogCache {
         synchronized void free() {
             records.forEach(StreamRecordBatch::release);
             records.clear();
+            walOffsets.clear();
+        }
+
+        synchronized boolean isEmpty() {
+            return records.isEmpty();
         }
 
         synchronized long startOffset() {

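The per-stream bookkeeping above boils down to two index-aligned lists: the cached record batches and the WAL offsets they were appended at. Truncation finds the first batch whose end offset reaches the new next offset, drops everything from that index to the tail, and reports the freed bytes plus the WAL offset of the first dropped batch (which is what the WAL itself is later truncated to). A self-contained model of that index arithmetic, with invented names and plain JDK types:

```java
import java.util.ArrayList;
import java.util.List;

public class StreamCacheTruncateSketch {
    // A cached batch covering [baseOffset, endOffset) written at walOffset.
    record Cached(long baseOffset, long endOffset, long walOffset, long sizeBytes) { }

    // Returns {freedBytes, firstRemovedWalOffset}, or null when nothing reaches newNextOffset.
    static long[] truncate(List<Cached> batches, long newNextOffset) {
        int removeFrom = -1;
        for (int i = 0; i < batches.size(); i++) {
            if (batches.get(i).endOffset() >= newNextOffset) { // first batch whose range reaches the cut
                removeFrom = i;
                break;
            }
        }
        if (removeFrom == -1) {
            return null;
        }
        long firstRemovedWalOffset = batches.get(removeFrom).walOffset();
        long freed = 0;
        for (int i = batches.size() - 1; i >= removeFrom; i--) { // drop the tail, newest first
            freed += batches.remove(i).sizeBytes();
        }
        return new long[] {freed, firstRemovedWalOffset};
    }

    public static void main(String[] args) {
        List<Cached> batches = new ArrayList<>(List.of(
            new Cached(10, 12, 100, 64),
            new Cached(12, 14, 101, 64),
            new Cached(14, 16, 102, 64)));
        long[] r = truncate(batches, 13); // the cut falls inside the second batch
        System.out.println(r[0] + " bytes freed, first removed WAL offset " + r[1]); // 128, 101
        System.out.println(batches.size() + " batch left");                          // 1
    }
}
```
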
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;

@@ -112,7 +113,7 @@ public class SnapshotReadCache {
             // The LogCacheBlock doesn't accept discontinuous record batches.
             cache.clearStreamRecords(streamId);
         }
-        if (cache.put(batch)) {
+        if (cache.put(batch, null)) {
             // the block is full
             LogCache.LogCacheBlock cacheBlock = cache.archiveCurrentBlock();
             cacheBlock.addFreeListener(cacheFreeListener);

@@ -138,6 +139,13 @@ public class SnapshotReadCache {
         cacheFreeListener.addListener(eventListener);
     }
 
+    public Optional<LogCache.TruncateResult> truncateStreamRecords(long streamId, long newNextOffset) {
+        if (cache == null) {
+            return Optional.empty();
+        }
+        return cache.truncateStreamRecords(streamId, newNextOffset);
+    }
+
     @EventLoopSafe
     private void clearStream(Long streamId) {
         cache.clearStreamRecords(streamId);

@@ -84,4 +84,14 @@ public interface WriteAheadLog {
      * @return future complete when trim done.
      */
     CompletableFuture<Void> trim(RecordOffset offset);
+
+    /**
+     * Truncate wal tail so that all records with offset greater than or equal to {@code offset}
+     * are discarded. After completion, {@link #confirmOffset()} and {@code offset} should align
+     * with the new logical end of the wal.
+     *
+     * @param offset new next offset after truncate.
+     * @return future completes when truncate is done.
+     */
+    CompletableFuture<Void> truncateTail(RecordOffset offset);
 }

@@ -135,4 +135,16 @@ public class MemoryWriteAheadLog implements WriteAheadLog {
             });
         return CompletableFuture.completedFuture(null);
     }
-}
+
+    @Override
+    public CompletableFuture<Void> truncateTail(RecordOffset offset) {
+        long targetOffset = DefaultRecordOffset.of(offset).offset();
+        dataMap.tailMap(targetOffset, true)
+            .forEach((key, value) -> {
+                dataMap.remove(key);
+                value.release();
+            });
+        offsetAlloc.updateAndGet(current -> Math.min(current, targetOffset));
+        return CompletableFuture.completedFuture(null);
+    }
+}

@@ -47,8 +47,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;

@@ -443,6 +445,112 @@ public class DefaultWriter implements Writer {
         return trim0(newStartOffset);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(RecordOffset recordOffset) throws WALFencedException {
+        try {
+            TailTruncatePlan plan = prepareTruncatePlan(DefaultRecordOffset.of(recordOffset).offset());
+            if (plan.noop || plan.deleteObjects.isEmpty()) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return objectStorage.delete(plan.deleteObjects).whenComplete((nil, throwable) -> {
+                if (throwable != null) {
+                    LOGGER.error("Failed to delete WAL objects when truncating tail: {}", plan.deleteObjects, throwable);
+                }
+            });
+        } catch (Throwable t) {
+            return CompletableFuture.failedFuture(t);
+        }
+    }
+
+    private TailTruncatePlan prepareTruncatePlan(long targetOffset) throws WALFencedException {
+        checkStatus();
+        if (targetOffset < 0) {
+            throw new IllegalArgumentException("targetOffset must be non-negative");
+        }
+        lock.writeLock().lock();
+        try {
+            if (targetOffset > nextOffset.get()) {
+                throw new IllegalArgumentException("targetOffset " + targetOffset + " is greater than current nextOffset " + nextOffset.get());
+            }
+            if (targetOffset == nextOffset.get()) {
+                return TailTruncatePlan.noop();
+            }
+            if (targetOffset < trimOffset.get()) {
+                throw new IllegalArgumentException("targetOffset " + targetOffset + " is less than trimmed offset " + trimOffset.get());
+            }
+            if (activeBulk != null || !waitingUploadBulks.isEmpty() || !uploadingBulks.isEmpty()) {
+                throw new IllegalStateException("Cannot truncate tail while there are pending bulks");
+            }
+
+            List<ObjectStorage.ObjectPath> deleteObjects = new ArrayList<>();
+            long deletedBytes = collectTailObjects(targetOffset, deleteObjects);
+            deletedBytes += collectHistoricalObjects(targetOffset, deleteObjects);
+
+            objectDataBytes.addAndGet(-deletedBytes);
+            long alignedTarget = ObjectUtils.ceilAlignOffset(targetOffset);
+            nextOffset.set(alignedTarget);
+            flushedOffset.updateAndGet(current -> Math.min(current, alignedTarget));
+            return new TailTruncatePlan(deleteObjects.isEmpty(), deleteObjects);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private long collectTailObjects(long targetOffset, List<ObjectStorage.ObjectPath> deleteObjects) {
+        long deletedSize = 0L;
+        List<Long> keysToRemove = new ArrayList<>();
+        for (Map.Entry<Long, WALObject> entry : lastRecordOffset2object.tailMap(targetOffset, true).entrySet()) {
+            WALObject object = entry.getValue();
+            ensureNotWithinObject(targetOffset, object);
+            if (object.startOffset() >= targetOffset) {
+                deleteObjects.add(new ObjectStorage.ObjectPath(object.bucketId(), object.path()));
+                deletedSize += object.length();
+                keysToRemove.add(entry.getKey());
+            }
+        }
+        keysToRemove.forEach(lastRecordOffset2object::remove);
+        return deletedSize;
+    }
+
+    private long collectHistoricalObjects(long targetOffset, List<ObjectStorage.ObjectPath> deleteObjects) {
+        if (previousObjects.isEmpty()) {
+            return 0L;
+        }
+        long deletedSize = 0L;
+        List<WALObject> retained = new ArrayList<>(previousObjects.size());
+        for (WALObject object : previousObjects) {
+            ensureNotWithinObject(targetOffset, object);
+            if (object.startOffset() >= targetOffset) {
+                deleteObjects.add(new ObjectStorage.ObjectPath(object.bucketId(), object.path()));
+                deletedSize += object.length();
+            } else {
+                retained.add(object);
+            }
+        }
+        previousObjects = retained;
+        return deletedSize;
+    }
+
+    private static class TailTruncatePlan {
+        final boolean noop;
+        final List<ObjectStorage.ObjectPath> deleteObjects;
+
+        TailTruncatePlan(boolean noop, List<ObjectStorage.ObjectPath> deleteObjects) {
+            this.noop = noop;
+            this.deleteObjects = deleteObjects;
+        }
+
+        static TailTruncatePlan noop() {
+            return new TailTruncatePlan(true, Collections.emptyList());
+        }
+    }
+
+    private void ensureNotWithinObject(long targetOffset, WALObject object) {
+        if (targetOffset > object.startOffset() && targetOffset < object.endOffset()) {
+            throw new IllegalArgumentException("targetOffset " + targetOffset + " falls inside WAL object " + object);
+        }
+    }
+
     @Override
     public Iterator<RecoverResult> recover() {
         try {

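In the object-backed writer, a tail truncation can only drop whole WAL objects: any object that starts at or after the target offset is deleted, an object ending at or before it is kept, and a target falling strictly inside an object is rejected. A small self-contained model of that selection rule (plain JDK types, invented names; the real code additionally walks both the live offset index and the historical object list):

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectWalTruncateSketch {
    // Stand-in for a WAL object covering [startOffset, endOffset).
    record WalObject(long startOffset, long endOffset, String path) { }

    // Collects the object paths to delete for a tail truncation at targetOffset.
    static List<String> collectTailObjects(List<WalObject> objects, long targetOffset) {
        List<String> toDelete = new ArrayList<>();
        for (WalObject o : objects) {
            if (targetOffset > o.startOffset() && targetOffset < o.endOffset()) {
                // mirrors ensureNotWithinObject: the cut may not land inside an object
                throw new IllegalArgumentException("targetOffset " + targetOffset + " falls inside " + o.path());
            }
            if (o.startOffset() >= targetOffset) {
                toDelete.add(o.path());
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        List<WalObject> objects = List.of(
            new WalObject(0, 100, "wal/0"),
            new WalObject(100, 200, "wal/1"),
            new WalObject(200, 300, "wal/2"));
        System.out.println(collectTailObjects(objects, 200)); // [wal/2]
        System.out.println(collectTailObjects(objects, 100)); // [wal/1, wal/2]
        // collectTailObjects(objects, 150) would throw: 150 is inside wal/1
    }
}
```
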
@@ -62,6 +62,11 @@ public class NoopWriter implements Writer {
         return CompletableFuture.failedFuture(new UnsupportedOperationException("trim is not supported"));
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(RecordOffset recordOffset) throws WALFencedException {
+        return CompletableFuture.failedFuture(new UnsupportedOperationException("truncateTail is not supported"));
+    }
+
     @Override
     public Iterator<RecoverResult> recover() {
         throw new UnsupportedOperationException("recover is not supported");

@@ -125,6 +125,17 @@ public class ObjectWALService implements WriteAheadLog {
         }
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(RecordOffset offset) {
+        log.info("Truncate tail of S3 WAL to offset: {}", offset);
+        try {
+            return writer.truncateTail(offset);
+        } catch (Throwable e) {
+            log.error("Truncate tail of S3 WAL failed, due to unrecoverable exception.", e);
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
     @Override
     public String toString() {
         return String.format("ObjectWALService{%s@%s-%s-%s}", config.bucketId(), config.nodeId(), config.epoch(), config.type());

@@ -42,5 +42,7 @@ public interface Writer {
 
     CompletableFuture<Void> trim(RecordOffset recordOffset) throws WALFencedException;
 
+    CompletableFuture<Void> truncateTail(RecordOffset recordOffset) throws WALFencedException;
+
     Iterator<RecoverResult> recover();
 }

@@ -58,6 +58,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static com.automq.stream.s3.TestUtils.random;
 import static org.junit.jupiter.api.Assertions.assertEquals;

@@ -153,14 +154,15 @@ public class S3StorageTest {
         Mockito.doAnswer(invocation -> commitCfList.get(commitCfIndex.getAndIncrement())).when(objectManager).commitStreamSetObject(any());
 
         LogCache.LogCacheBlock logCacheBlock1 = new LogCache.LogCacheBlock(1024);
-        logCacheBlock1.put(newRecord(233L, 10L));
-        logCacheBlock1.put(newRecord(234L, 10L));
+        AtomicLong walOffsetGen = new AtomicLong();
+        logCacheBlock1.put(newRecord(233L, 10L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
+        logCacheBlock1.put(newRecord(234L, 10L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
         logCacheBlock1.lastRecordOffset(DefaultRecordOffset.of(0, 10L, 0));
         CompletableFuture<Void> cf1 = storage.uploadDeltaWAL(logCacheBlock1);
 
         LogCache.LogCacheBlock logCacheBlock2 = new LogCache.LogCacheBlock(1024);
-        logCacheBlock2.put(newRecord(233L, 20L));
-        logCacheBlock2.put(newRecord(234L, 20L));
+        logCacheBlock2.put(newRecord(233L, 20L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
+        logCacheBlock2.put(newRecord(234L, 20L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
         logCacheBlock2.lastRecordOffset(DefaultRecordOffset.of(0, 20L, 0));
         CompletableFuture<Void> cf2 = storage.uploadDeltaWAL(logCacheBlock2);
 

@@ -331,7 +333,7 @@ public class S3StorageTest {
 
             @Override
             public RecordOffset recordOffset() {
-                return DefaultRecordOffset.of(0, 0, 0);
+                return DefaultRecordOffset.of(0, record.getBaseOffset(), 0);
             }
         }
     }

@@ -22,15 +22,18 @@ package com.automq.stream.s3.cache;
 import com.automq.stream.s3.TestUtils;
 import com.automq.stream.s3.cache.LogCache.LogCacheBlock;
 import com.automq.stream.s3.model.StreamRecordBatch;
+import com.automq.stream.s3.wal.impl.DefaultRecordOffset;
 
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Tag("S3Unit")

@@ -39,16 +42,17 @@ public class LogCacheTest {
     @Test
     public void testPutGet() {
         LogCache logCache = new LogCache(1024 * 1024, 1024 * 1024);
+        long walOffset = 0L;
 
-        logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
-        logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         logCache.archiveCurrentBlock();
-        logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         logCache.archiveCurrentBlock();
-        logCache.put(new StreamRecordBatch(233L, 0L, 20L, 1, TestUtils.random(20)));
-        logCache.put(new StreamRecordBatch(233L, 0L, 21L, 1, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(233L, 0L, 20L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        logCache.put(new StreamRecordBatch(233L, 0L, 21L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         List<StreamRecordBatch> records = logCache.get(233L, 10L, 21L, 1000);
         assertEquals(1, records.size());

@@ -71,9 +75,9 @@ public class LogCacheTest {
     @Test
     public void testOffsetIndex() {
         LogCache cache = new LogCache(Integer.MAX_VALUE, Integer.MAX_VALUE);
-
+        long walOffset = 0L;
         for (int i = 0; i < 100000; i++) {
-            cache.put(new StreamRecordBatch(233L, 0L, i, 1, TestUtils.random(1)));
+            cache.put(new StreamRecordBatch(233L, 0L, i, 1, TestUtils.random(1)), DefaultRecordOffset.of(0, walOffset++, 0));
         }
 
         long start = System.nanoTime();

@@ -89,14 +93,15 @@ public class LogCacheTest {
     @Test
     public void testClearStreamRecords() {
         LogCache logCache = new LogCache(1024 * 1024, 1024 * 1024);
+        long walOffset = 0L;
 
-        logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
-        logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         logCache.archiveCurrentBlock();
-        logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
-        logCache.put(new StreamRecordBatch(234L, 0L, 13L, 2, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(234L, 0L, 13L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         assertTrue(logCache.blocks.get(0).containsStream(233L));
         assertTrue(logCache.blocks.get(1).containsStream(234L));

@@ -112,19 +117,21 @@ public class LogCacheTest {
     @Test
     public void testIsDiscontinuous() {
         LogCacheBlock left = new LogCacheBlock(1024L * 1024);
-        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
+        long walOffset = 0L;
+        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         LogCacheBlock right = new LogCacheBlock(1024L * 1024);
-        right.put(new StreamRecordBatch(233L, 0L, 13L, 1, TestUtils.random(20)));
+        right.put(new StreamRecordBatch(233L, 0L, 13L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         assertTrue(LogCache.isDiscontinuous(left, right));
 
         left = new LogCacheBlock(1024L * 1024);
-        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
-        left.put(new StreamRecordBatch(234L, 0L, 10L, 1, TestUtils.random(20)));
+        walOffset = 0L;
+        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        left.put(new StreamRecordBatch(234L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         right = new LogCacheBlock(1024L * 1024);
-        right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)));
+        right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
         assertFalse(LogCache.isDiscontinuous(left, right));
     }
 

@@ -132,13 +139,14 @@ public class LogCacheTest {
     public void testMergeBlock() {
         long size = 0;
         LogCacheBlock left = new LogCacheBlock(1024L * 1024);
-        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
-        left.put(new StreamRecordBatch(234L, 0L, 100L, 1, TestUtils.random(20)));
+        long walOffset = 0L;
+        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        left.put(new StreamRecordBatch(234L, 0L, 100L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
         size += left.size();
 
         LogCacheBlock right = new LogCacheBlock(1024L * 1024);
-        right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)));
-        right.put(new StreamRecordBatch(235L, 0L, 200L, 1, TestUtils.random(20)));
+        right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        right.put(new StreamRecordBatch(235L, 0L, 200L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
         size += right.size();
 
         LogCache.mergeBlock(left, right);

@@ -165,4 +173,25 @@ public class LogCacheTest {
 
     }
 
+    @Test
+    public void testTruncateStreamRecords() {
+        LogCache cache = new LogCache(1024 * 1024, 1024 * 1024);
+        long walOffset = 0L;
+
+        cache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        cache.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+
+        Optional<LogCache.TruncateResult> result = cache.truncateStreamRecords(233L, 12L);
+        assertTrue(result.isPresent());
+        LogCache.TruncateResult truncateResult = result.get();
+        assertTrue(truncateResult.freedBytes > 0);
+        assertNotNull(truncateResult.firstRemovedWalOffset);
+        assertEquals(1L, DefaultRecordOffset.of(truncateResult.firstRemovedWalOffset).offset());
+
+        List<StreamRecordBatch> remaining = cache.get(233L, 12L, 14L, 1000);
+        assertEquals(0, remaining.size());
+
+        assertTrue(cache.truncateStreamRecords(233L, 12L).isEmpty());
+    }
+
 }