KAFKA-12258; Add support for splitting appending records (#10063)

1. `BatchAccumulator`: add support for appending records into one or more batches.
2. `RaftClient`: rename `scheduleAppend` to `scheduleAtomicAppend`.
3. `RaftClient`: add a new `scheduleAppend` method that appends records to the log using as many batches as necessary.
4. Increase the maximum batch size from 1MB to 8MB.
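
To illustrate the difference between the two append paths, here is a minimal caller-side sketch. The `AppendExample` wrapper class and the `String` record type are illustrative only; the `RaftClient` methods and their return/exception contract are the ones introduced by this change:

```java
import java.util.List;

import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.raft.RaftClient;

// Illustrative caller; only the RaftClient calls below come from this change.
public class AppendExample {

    // Non-atomic append: records keep their order but may be split across
    // several batches, so they can become committed independently.
    static Long appendBestEffort(RaftClient<String> client, int epoch, List<String> records) {
        Long lastOffset = client.scheduleAppend(epoch, records);
        if (lastOffset == null) {
            // No memory could be allocated for a batch right now.
        } else if (lastOffset == Long.MAX_VALUE) {
            // The provided epoch was stale; these records will never become committed.
        }
        return lastOffset;
    }

    // Atomic append: all records are placed in a single batch, so either all
    // of them become committed or none of them do.
    static Long appendAllOrNothing(RaftClient<String> client, int epoch, List<String> records) {
        try {
            return client.scheduleAtomicAppend(epoch, records);
        } catch (RecordBatchTooLargeException e) {
            // The records do not fit in one batch (MAX_BATCH_SIZE is now 8 MB).
            return null;
        }
    }
}
```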

Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
José Armando García Sancio 2021-02-18 19:46:23 -08:00 committed by GitHub
parent c8112b5ecd
commit 9243c10161
6 changed files with 341 additions and 130 deletions

File: KafkaRaftClient.java

@@ -141,7 +141,7 @@ import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
public class KafkaRaftClient<T> implements RaftClient<T> {
private static final int RETRY_BACKOFF_BASE_MS = 100;
private static final int FETCH_MAX_WAIT_MS = 1000;
static final int MAX_BATCH_SIZE = 1024 * 1024;
static final int MAX_BATCH_SIZE = 8 * 1024 * 1024;
private final AtomicReference<GracefulShutdown> shutdown = new AtomicReference<>();
private final Logger logger;
@@ -2188,13 +2188,27 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
@Override
public Long scheduleAppend(int epoch, List<T> records) {
return append(epoch, records, false);
}
@Override
public Long scheduleAtomicAppend(int epoch, List<T> records) {
return append(epoch, records, true);
}
private Long append(int epoch, List<T> records, boolean isAtomic) {
BatchAccumulator<T> accumulator = this.accumulator;
if (accumulator == null) {
return Long.MAX_VALUE;
}
boolean isFirstAppend = accumulator.isEmpty();
Long offset = accumulator.append(epoch, records);
final Long offset;
if (isAtomic) {
offset = accumulator.appendAtomic(epoch, records);
} else {
offset = accumulator.append(epoch, records);
}
// Wakeup the network channel if either this is the first append
// or the accumulator is ready to drain now. Checking for the first
@@ -2351,9 +2365,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
/**
* This API is used for committed records originating from {@link #scheduleAppend(int, List)}
* on this instance. In this case, we are able to save the original record objects,
* which saves the need to read them back from disk. This is a nice optimization
* for the leader which is typically doing more work than all of the followers.
* or {@link #scheduleAtomicAppend(int, List)} on this instance. In this case, we are able to
* save the original record objects, which saves the need to read them back from disk. This is
* a nice optimization for the leader which is typically doing more work than all of the
* followers.
*/
public void fireHandleCommit(long baseOffset, int epoch, List<T> records) {
BatchReader.Batch<T> batch = new BatchReader.Batch<>(baseOffset, epoch, records);

File: RaftClient.java

@@ -32,11 +32,13 @@ public interface RaftClient<T> extends Closeable {
* after consuming the reader.
*
* Note that there is not a one-to-one correspondence between writes through
* {@link #scheduleAppend(int, List)} and this callback. The Raft implementation
* is free to batch together the records from multiple append calls provided
* that batch boundaries are respected. This means that each batch specified
* through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of
* a batch provided by the {@link BatchReader}.
* {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)}
* and this callback. The Raft implementation is free to batch together the records
* from multiple append calls provided that batch boundaries are respected. Records
* specified through {@link #scheduleAtomicAppend(int, List)} are guaranteed to be a
* subset of a batch provided by the {@link BatchReader}. Records specified through
* {@link #scheduleAppend(int, List)} are guaranteed to be in the same order but
* they can map to any number of batches provided by the {@link BatchReader}.
*
* @param reader reader instance which must be iterated and closed
*/
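
A minimal sketch of what this guarantee means on the consuming side, assuming (not shown in this diff) that `BatchReader` can be iterated with `hasNext()`/`next()` and must be closed, per the "iterated and closed" contract above; the `CommitSketch` class and its `apply` method are hypothetical:

```java
import org.apache.kafka.raft.BatchReader;

// Hypothetical consumer of committed batches; hasNext()/next()/close() on
// BatchReader are assumed from the javadoc contract quoted above.
public class CommitSketch {

    void handleCommit(BatchReader<String> reader) {
        try {
            while (reader.hasNext()) {
                // One batch may contain records from several scheduleAppend() calls,
                // but a scheduleAtomicAppend() call never spans more than one batch.
                BatchReader.Batch<String> batch = reader.next();
                apply(batch);
            }
        } finally {
            reader.close();
        }
    }

    private void apply(BatchReader.Batch<String> batch) {
        // Apply the committed records to the state machine (illustrative no-op).
    }
}
```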
@@ -48,7 +50,7 @@
* {@link #handleCommit(BatchReader)}.
*
* After becoming a leader, the client is eligible to write to the log
* using {@link #scheduleAppend(int, List)}.
* using {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)}.
*
* @param epoch the claimed leader epoch
*/
@@ -84,6 +86,30 @@
*/
LeaderAndEpoch leaderAndEpoch();
/**
* Append a list of records to the log. The write will be scheduled for some time
* in the future. There is no guarantee that appended records will be written to
* the log and eventually committed. While the order of the records is preserved, they can
* be appended to the log using one or more batches. Each record may be committed independently.
* If a record is committed, then all records scheduled for append during this epoch
* and prior to this record are also committed.
*
* If the provided current leader epoch does not match the current epoch, which
* is possible when the state machine has yet to observe the epoch change, then
* this method will return {@link Long#MAX_VALUE} to indicate an offset that can
* never become committed. The state machine is expected to discard all
* uncommitted entries after observing an epoch change.
*
* @param epoch the current leader epoch
* @param records the list of records to append
* @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records could
* not be committed because the provided epoch is stale; null if no memory could be allocated for the batch at this time
* @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
* batch size; if this exception is thrown, none of the elements in records were
* committed
*/
Long scheduleAppend(int epoch, List<T> records);
/**
* Append a list of records to the log. The write will be scheduled for some time
* in the future. There is no guarantee that appended records will be written to
@@ -98,11 +124,13 @@
*
* @param epoch the current leader epoch
* @param records the list of records to append
* @return the offset within the current epoch that the log entries will be appended,
* or null if the leader was unable to accept the write (e.g. due to memory
* being reached).
* @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records could
* not be committed because the provided epoch is stale; null if no memory could be allocated for the batch at this time
* @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
* batch size; if this exception is thrown, none of the elements in records were
* committed
*/
Long scheduleAppend(int epoch, List<T> records);
Long scheduleAtomicAppend(int epoch, List<T> records);
/**
* Attempt a graceful shutdown of the client. This allows the leader to proactively

File: BatchAccumulator.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.raft.internals;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.CompressionType;
@@ -26,8 +27,10 @@ import org.apache.kafka.raft.RecordSerde;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -79,70 +82,111 @@ public class BatchAccumulator<T> implements Closeable {
}
/**
* Append a list of records into an atomic batch. We guarantee all records
* are included in the same underlying record batch so that either all of
* the records become committed or none of them do.
* Append a list of records into as many batches as necessary.
*
* @param epoch the expected leader epoch. If this does not match, then
* {@link Long#MAX_VALUE} will be returned as an offset which
* cannot become committed.
* @param records the list of records to include in a batch
* @return the expected offset of the last record (which will be
* {@link Long#MAX_VALUE} if the epoch does not match), or null if
* no memory could be allocated for the batch at this time
* The order of the elements in the records argument will match the order in the batches.
* This method will use as many batches as necessary to serialize all of the records. Since
* this method can split the records into multiple batches, it is possible that some of the
* records will get committed while others will not if the leader fails.
*
* @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
* will be returned as an offset which cannot become committed
* @param records the list of records to include in the batches
* @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
* match; null if no memory could be allocated for the batch at this time
* @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
* batch size; if this exception is thrown, some of the elements in records may have
* been committed
*/
public Long append(int epoch, List<T> records) {
return append(epoch, records, false);
}
/**
* Append a list of records into an atomic batch. We guarantee all records are included in the
* same underlying record batch so that either all of the records become committed or none of
* them do.
*
* @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
* will be returned as an offset which cannot become committed
* @param records the list of records to include in a batch
* @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
* match; null if no memory could be allocated for the batch at this time
* @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
* batch size; if this exception is thrown, none of the elements in records were
* committed
*/
public Long appendAtomic(int epoch, List<T> records) {
return append(epoch, records, true);
}
private Long append(int epoch, List<T> records, boolean isAtomic) {
if (epoch != this.epoch) {
// If the epoch does not match, then the state machine probably
// has not gotten the notification about the latest epoch change.
// In this case, ignore the append and return a large offset value
// which will never be committed.
return Long.MAX_VALUE;
}
ObjectSerializationCache serializationCache = new ObjectSerializationCache();
int batchSize = 0;
for (T record : records) {
batchSize += serde.recordSize(record, serializationCache);
}
if (batchSize > maxBatchSize) {
throw new IllegalArgumentException("The total size of " + records + " is " + batchSize +
", which exceeds the maximum allowed batch size of " + maxBatchSize);
}
appendLock.lock();
try {
maybeCompleteDrain();
BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
if (batch == null) {
return null;
}
// Restart the linger timer if necessary
if (!lingerTimer.isRunning()) {
lingerTimer.reset(time.milliseconds() + lingerMs);
BatchBuilder<T> batch = null;
if (isAtomic) {
batch = maybeAllocateBatch(records, serializationCache);
}
for (T record : records) {
if (!isAtomic) {
batch = maybeAllocateBatch(Collections.singleton(record), serializationCache);
}
if (batch == null) {
return null;
}
batch.appendRecord(record, serializationCache);
nextOffset += 1;
}
maybeResetLinger();
return nextOffset - 1;
} finally {
appendLock.unlock();
}
}
private BatchBuilder<T> maybeAllocateBatch(int batchSize) {
private void maybeResetLinger() {
if (!lingerTimer.isRunning()) {
lingerTimer.reset(time.milliseconds() + lingerMs);
}
}
private BatchBuilder<T> maybeAllocateBatch(
Collection<T> records,
ObjectSerializationCache serializationCache
) {
if (currentBatch == null) {
startNewBatch();
} else if (!currentBatch.hasRoomFor(batchSize)) {
completeCurrentBatch();
startNewBatch();
}
if (currentBatch != null) {
OptionalInt bytesNeeded = currentBatch.bytesNeeded(records, serializationCache);
if (bytesNeeded.isPresent() && bytesNeeded.getAsInt() > maxBatchSize) {
throw new RecordBatchTooLargeException(
String.format(
"The total record(s) size of %s exceeds the maximum allowed batch size of %s",
bytesNeeded.getAsInt(),
maxBatchSize
)
);
} else if (bytesNeeded.isPresent()) {
completeCurrentBatch();
startNewBatch();
}
}
return currentBatch;
}
@@ -298,20 +342,22 @@ public class BatchAccumulator<T> implements Closeable {
public final List<T> records;
public final MemoryRecords data;
private final MemoryPool pool;
private final ByteBuffer buffer;
// Buffer that was allocated by the MemoryPool (pool). This may not be the buffer used in
// the MemoryRecords (data) object.
private final ByteBuffer initialBuffer;
private CompletedBatch(
long baseOffset,
List<T> records,
MemoryRecords data,
MemoryPool pool,
ByteBuffer buffer
ByteBuffer initialBuffer
) {
this.baseOffset = baseOffset;
this.records = records;
this.data = data;
this.pool = pool;
this.buffer = buffer;
this.initialBuffer = initialBuffer;
}
public int sizeInBytes() {
@@ -319,7 +365,7 @@
}
public void release() {
pool.release(buffer);
pool.release(initialBuffer);
}
}
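
As a simplified, self-contained illustration of the splitting behavior in the non-atomic append path above (the real implementation also accounts for batch headers, the memory pool, and the linger timer, and sizes records through `RecordSerde`; the class and sizes below are made up):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Greedy size-based packing: roll over to a new batch whenever the next record
// would not fit. Record "size" here is just the string length, purely for illustration.
public class SplitIllustration {

    static List<List<String>> split(List<String> records, int maxBatchSizeBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentSize = 0;
        for (String record : records) {
            int recordSize = record.length();
            if (!current.isEmpty() && currentSize + recordSize > maxBatchSizeBytes) {
                batches.add(current);            // complete the current batch
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(record);                 // order is preserved across batches
            currentSize += recordSize;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Two 4-byte records fit under the 8-byte cap used here.
        System.out.println(split(Arrays.asList("aaaa", "bbbb", "cccc", "dd"), 8));
        // Prints: [[aaaa, bbbb], [cccc, dd]]
    }
}
```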

File: BatchBuilder.java

@@ -33,12 +33,14 @@ import org.apache.kafka.raft.RecordSerde;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalInt;
/**
* Collect a set of records into a single batch. New records are added
* through {@link #appendRecord(Object, ObjectSerializationCache)}, but the caller must first
* check whether there is room using {@link #hasRoomFor(int)}. Once the
* check whether there is room using {@link #bytesNeeded(Collection, ObjectSerializationCache)}. Once the
* batch is ready, then {@link #build()} should be used to get the resulting
* {@link MemoryRecords} instance.
*
@@ -85,8 +87,8 @@ public class BatchBuilder<T> {
this.maxBytes = maxBytes;
this.records = new ArrayList<>();
int batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(
RecordBatch.MAGIC_VALUE_V2, compressionType);
// field compressionType must be set before calculating the batch header size
int batchHeaderSizeInBytes = batchHeaderSizeInBytes();
batchOutput.position(initialPosition + batchHeaderSizeInBytes);
this.recordOutput = new DataOutputStreamWritable(new DataOutputStream(
@@ -95,7 +97,7 @@
/**
* Append a record to this patch. The caller must first verify there is room for the batch
* using {@link #hasRoomFor(int)}.
* using {@link #bytesNeeded(Collection, ObjectSerializationCache)}.
*
* @param record the record to append
* @param serializationCache serialization cache for use in {@link RecordSerde#write(Object, ObjectSerializationCache, Writable)}
@@ -103,7 +105,7 @@
*/
public long appendRecord(T record, ObjectSerializationCache serializationCache) {
if (!isOpenForAppends) {
throw new IllegalArgumentException("Cannot append new records after the batch has been built");
throw new IllegalStateException("Cannot append new records after the batch has been built");
}
if (nextOffset - baseOffset > Integer.MAX_VALUE) {
@@ -123,39 +125,39 @@
}
/**
* Check whether the batch has enough room for a record of the given size in bytes.
* Check whether the batch has enough room for all the record values.
*
* @param sizeInBytes the size of the record to be appended
* @return true if there is room for the record to be appended, false otherwise
* Returns an empty {@link OptionalInt} if the batch builder has room for this list of records.
* Otherwise it returns the expected number of bytes needed for a batch to contain these records.
*
* @param records the records to use when checking for room
* @param serializationCache serialization cache for computing sizes
* @return empty {@link OptionalInt} if there is room for the records to be appended, otherwise
* returns the number of bytes needed
*/
public boolean hasRoomFor(int sizeInBytes) {
if (!isOpenForAppends) {
return false;
}
if (nextOffset - baseOffset >= Integer.MAX_VALUE) {
return false;
}
int recordSizeInBytes = DefaultRecord.sizeOfBodyInBytes(
(int) (nextOffset - baseOffset),
0,
-1,
sizeInBytes,
DefaultRecord.EMPTY_HEADERS
public OptionalInt bytesNeeded(Collection<T> records, ObjectSerializationCache serializationCache) {
int bytesNeeded = bytesNeededForRecords(
records,
serializationCache
);
int unusedSizeInBytes = maxBytes - approximateSizeInBytes();
if (unusedSizeInBytes >= recordSizeInBytes) {
return true;
if (!isOpenForAppends) {
return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
}
int approxUnusedSizeInBytes = maxBytes - approximateSizeInBytes();
if (approxUnusedSizeInBytes >= bytesNeeded) {
return OptionalInt.empty();
} else if (unflushedBytes > 0) {
recordOutput.flush();
unflushedBytes = 0;
unusedSizeInBytes = maxBytes - flushedSizeInBytes();
return unusedSizeInBytes >= recordSizeInBytes;
} else {
return false;
int unusedSizeInBytes = maxBytes - flushedSizeInBytes();
if (unusedSizeInBytes >= bytesNeeded) {
return OptionalInt.empty();
}
}
return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
}
private int flushedSizeInBytes() {
@@ -307,4 +309,46 @@ public class BatchBuilder<T> {
recordOutput.writeVarint(0);
return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
}
private int batchHeaderSizeInBytes() {
return AbstractRecords.recordBatchHeaderSizeInBytes(
RecordBatch.MAGIC_VALUE_V2,
compressionType
);
}
private int bytesNeededForRecords(
Collection<T> records,
ObjectSerializationCache serializationCache
) {
long expectedNextOffset = nextOffset;
int bytesNeeded = 0;
for (T record : records) {
if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
throw new IllegalArgumentException(
String.format(
"Adding %s records to a batch with base offset of %s and next offset of %s",
records.size(),
baseOffset,
expectedNextOffset
)
);
}
int recordSizeInBytes = DefaultRecord.sizeOfBodyInBytes(
(int) (expectedNextOffset - baseOffset),
0,
-1,
serde.recordSize(record, serializationCache),
DefaultRecord.EMPTY_HEADERS
);
bytesNeeded = Math.addExact(bytesNeeded, ByteUtils.sizeOfVarint(recordSizeInBytes));
bytesNeeded = Math.addExact(bytesNeeded, recordSizeInBytes);
expectedNextOffset += 1;
}
return bytesNeeded;
}
}
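
For reference, the per-record accounting in `bytesNeededForRecords` (record body plus the varint length prefix that precedes it) can be reproduced in isolation. The 16-byte value size below is an arbitrary placeholder for what `RecordSerde#recordSize` would return:

```java
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.utils.ByteUtils;

public class RecordSizeExample {
    public static void main(String[] args) {
        int offsetDelta = 0;        // first record relative to the batch base offset
        int valueSizeInBytes = 16;  // placeholder for serde.recordSize(record, cache)
        int bodySize = DefaultRecord.sizeOfBodyInBytes(
            offsetDelta,
            0,                       // timestamp delta
            -1,                      // no key
            valueSizeInBytes,
            DefaultRecord.EMPTY_HEADERS
        );
        // Total bytes the record contributes to the batch, accumulated above with Math.addExact.
        int totalBytes = ByteUtils.sizeOfVarint(bodySize) + bodySize;
        System.out.println("body=" + bodySize + ", total=" + totalBytes);
    }
}
```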

File: BatchAccumulatorTest.java

@@ -19,7 +19,11 @@ package org.apache.kafka.raft.internals;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.Writable;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
@@ -29,6 +33,8 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -164,10 +170,85 @@ class BatchAccumulatorTest {
@Test
public void testSingleBatchAccumulation() {
asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
int leaderEpoch = 17;
long baseOffset = 157;
int lingerMs = 50;
int maxBatchSize = 512;
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(ByteBuffer.allocate(maxBatchSize));
BatchAccumulator<String> acc = buildAccumulator(
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
);
List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
assertEquals(baseOffset, appender.call(acc, leaderEpoch, records.subList(0, 1)));
assertEquals(baseOffset + 2, appender.call(acc, leaderEpoch, records.subList(1, 3)));
assertEquals(baseOffset + 5, appender.call(acc, leaderEpoch, records.subList(3, 6)));
assertEquals(baseOffset + 7, appender.call(acc, leaderEpoch, records.subList(6, 8)));
assertEquals(baseOffset + 8, appender.call(acc, leaderEpoch, records.subList(8, 9)));
time.sleep(lingerMs);
assertTrue(acc.needsDrain(time.milliseconds()));
List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
assertEquals(1, batches.size());
assertFalse(acc.needsDrain(time.milliseconds()));
assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
assertEquals(records, batch.records);
assertEquals(baseOffset, batch.baseOffset);
});
}
@Test
public void testMultipleBatchAccumulation() {
asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
int leaderEpoch = 17;
long baseOffset = 157;
int lingerMs = 50;
int maxBatchSize = 256;
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(ByteBuffer.allocate(maxBatchSize));
BatchAccumulator<String> acc = buildAccumulator(
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
);
// Append entries until we have 4 batches to drain (3 completed, 1 building)
while (acc.numCompletedBatches() < 3) {
appender.call(acc, leaderEpoch, singletonList("foo"));
}
List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
assertEquals(4, batches.size());
assertTrue(batches.stream().allMatch(batch -> batch.data.sizeInBytes() <= maxBatchSize));
});
}
@Test
public void testRecordsAreSplit() {
int leaderEpoch = 17;
long baseOffset = 157;
int lingerMs = 50;
int maxBatchSize = 512;
String record = "a";
int numberOfRecords = 9;
int recordsPerBatch = 2;
int batchHeaderSize = AbstractRecords.recordBatchHeaderSizeInBytes(
RecordBatch.MAGIC_VALUE_V2,
CompressionType.NONE
);
int maxBatchSize = batchHeaderSize + recordsPerBatch * recordSizeInBytes(record, recordsPerBatch);
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(ByteBuffer.allocate(maxBatchSize));
@@ -179,50 +260,19 @@ class BatchAccumulatorTest {
maxBatchSize
);
List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
assertEquals(baseOffset, acc.append(leaderEpoch, records.subList(0, 1)));
assertEquals(baseOffset + 2, acc.append(leaderEpoch, records.subList(1, 3)));
assertEquals(baseOffset + 5, acc.append(leaderEpoch, records.subList(3, 6)));
assertEquals(baseOffset + 7, acc.append(leaderEpoch, records.subList(6, 8)));
assertEquals(baseOffset + 8, acc.append(leaderEpoch, records.subList(8, 9)));
List<String> records = Stream
.generate(() -> record)
.limit(numberOfRecords)
.collect(Collectors.toList());
assertEquals(baseOffset + numberOfRecords - 1, acc.append(leaderEpoch, records));
time.sleep(lingerMs);
assertTrue(acc.needsDrain(time.milliseconds()));
List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
assertEquals(1, batches.size());
assertFalse(acc.needsDrain(time.milliseconds()));
assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
assertEquals(records, batch.records);
assertEquals(baseOffset, batch.baseOffset);
}
@Test
public void testMultipleBatchAccumulation() {
int leaderEpoch = 17;
long baseOffset = 157;
int lingerMs = 50;
int maxBatchSize = 256;
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(ByteBuffer.allocate(maxBatchSize));
BatchAccumulator<String> acc = buildAccumulator(
leaderEpoch,
baseOffset,
lingerMs,
maxBatchSize
);
// Append entries until we have 4 batches to drain (3 completed, 1 building)
while (acc.numCompletedBatches() < 3) {
acc.append(leaderEpoch, singletonList("foo"));
}
List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
assertEquals(4, batches.size());
// ceilingDiv(records.size(), recordsPerBatch)
int expectedBatches = (records.size() + recordsPerBatch - 1) / recordsPerBatch;
assertEquals(expectedBatches, batches.size());
assertTrue(batches.stream().allMatch(batch -> batch.data.sizeInBytes() <= maxBatchSize));
}
@@ -306,4 +356,35 @@ class BatchAccumulatorTest {
});
}
int recordSizeInBytes(String record, int numberOfRecords) {
int serdeSize = serde.recordSize("a", new ObjectSerializationCache());
int recordSizeInBytes = DefaultRecord.sizeOfBodyInBytes(
numberOfRecords,
0,
-1,
serdeSize,
DefaultRecord.EMPTY_HEADERS
);
return ByteUtils.sizeOfVarint(recordSizeInBytes) + recordSizeInBytes;
}
static interface Appender {
Long call(BatchAccumulator<String> acc, int epoch, List<String> records);
}
static final Appender APPEND_ATOMIC = new Appender() {
@Override
public Long call(BatchAccumulator<String> acc, int epoch, List<String> records) {
return acc.appendAtomic(epoch, records);
}
};
static final Appender APPEND = new Appender() {
@Override
public Long call(BatchAccumulator<String> acc, int epoch, List<String> records) {
return acc.append(epoch, records);
}
};
}

File: BatchBuilderTest.java

@@ -31,7 +31,6 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -69,8 +68,8 @@ class BatchBuilderTest {
records.forEach(record -> builder.appendRecord(record, null));
MemoryRecords builtRecordSet = builder.build();
assertFalse(builder.hasRoomFor(1));
assertThrows(IllegalArgumentException.class, () -> builder.appendRecord("a", null));
assertTrue(builder.bytesNeeded(Arrays.asList("a"), null).isPresent());
assertThrows(IllegalStateException.class, () -> builder.appendRecord("a", null));
List<MutableRecordBatch> builtBatches = Utils.toList(builtRecordSet.batchIterator());
assertEquals(1, builtBatches.size());
@@ -112,9 +111,8 @@ class BatchBuilderTest {
);
String record = "i am a record";
int recordSize = serde.recordSize(record);
while (builder.hasRoomFor(recordSize)) {
while (!builder.bytesNeeded(Arrays.asList(record), null).isPresent()) {
builder.appendRecord(record, null);
}
@@ -125,5 +123,4 @@
assertTrue(sizeInBytes <= batchSize, "Built batch size "
+ sizeInBytes + " is larger than max batch size " + batchSize);
}
}