KAFKA-14406: Fix double iteration of restoring records (#12842)

While restoring a batch of records, RocksDBStore iterated over the ConsumerRecords to build an intermediate list of KeyValues, and then iterated over that list again to add each entry to the RocksDB WriteBatch.

Adding each key and value directly to the RocksDB batch avoids this unnecessary second iteration and the creation of intermediate KeyValue objects, improving the performance of state restoration and reducing object allocation.
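For illustration, here is a minimal sketch of the before/after shape of the restore loop. The class and method names are hypothetical, and the real code writes through the store's RocksDBAccessor (so the timestamped store can convert values) rather than calling WriteBatch.put directly:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

// Hypothetical illustration of the change, not the actual store code.
final class RestoreLoopSketch {

    // Before: two passes over the data, plus one intermediate KeyValue per record.
    static void restoreViaIntermediateList(final Collection<ConsumerRecord<byte[], byte[]>> records,
                                           final WriteBatch batch) throws RocksDBException {
        final List<KeyValue<byte[], byte[]>> keyValues = new ArrayList<>();
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            keyValues.add(new KeyValue<>(record.key(), record.value())); // pass 1: allocate
        }
        for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
            batch.put(keyValue.key, keyValue.value);                     // pass 2: write
        }
    }

    // After: a single pass, writing each record straight into the batch.
    static void restoreDirectly(final Collection<ConsumerRecord<byte[], byte[]>> records,
                                final WriteBatch batch) throws RocksDBException {
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            batch.put(record.key(), record.value());
        }
    }
}
```

The single-pass version touches each record exactly once and allocates nothing beyond what the WriteBatch itself needs.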

This also simplifies the API of RocksDBAccessor, as prepareBatchForRestore is no longer needed.
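After this change the accessor contract is reduced to per-record writes. A simplified sketch of the relevant fragment (the real inner interface declares further methods, e.g. the flush shown in the diff below):

```java
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

// Simplified fragment of the inner RocksDBAccessor interface after the change:
// prepareBatchForRestore is removed, so restoration reuses the same
// per-record addToBatch method that regular writes use.
interface RocksDBAccessor {
    void addToBatch(final byte[] key,
                    final byte[] value,
                    final WriteBatch batch) throws RocksDBException;
}
```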

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Walker Carlson <wcarlson@confluent.io>
commit 1d6430249b (parent 795390a3c8)
Author: Nick Telford
Date:   2022-11-19 04:44:56 +00:00 (committed by GitHub)
2 changed files with 1 addition and 23 deletions

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java

@@ -584,9 +584,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         void flush() throws RocksDBException;
 
-        void prepareBatchForRestore(final Collection<KeyValue<byte[], byte[]>> records,
-                                    final WriteBatch batch) throws RocksDBException;
-
         void addToBatch(final byte[] key,
                         final byte[] value,
                         final WriteBatch batch) throws RocksDBException;
@@ -700,14 +697,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
             db.flush(fOptions, columnFamily);
         }
 
-        @Override
-        public void prepareBatchForRestore(final Collection<KeyValue<byte[], byte[]>> records,
-                                           final WriteBatch batch) throws RocksDBException {
-            for (final KeyValue<byte[], byte[]> record : records) {
-                addToBatch(record.key, record.value, batch);
-            }
-        }
-
         @Override
         public void addToBatch(final byte[] key,
                                final byte[] value,
@@ -727,7 +716,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         try (final WriteBatch batch = new WriteBatch()) {
-            final List<KeyValue<byte[], byte[]>> keyValues = new ArrayList<>();
             for (final ConsumerRecord<byte[], byte[]> record : records) {
                 ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
                     record,
@@ -735,9 +723,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
                     position
                 );
                 // If version headers are not present or version is V0
-                keyValues.add(new KeyValue<>(record.key(), record.value()));
+                dbAccessor.addToBatch(record.key(), record.value(), batch);
             }
-            dbAccessor.prepareBatchForRestore(keyValues, batch);
             write(batch);
         } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error restoring batch to store " + name, e);

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java

@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -262,14 +261,6 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
             db.flush(fOptions, newColumnFamily);
         }
 
-        @Override
-        public void prepareBatchForRestore(final Collection<KeyValue<byte[], byte[]>> records,
-                                           final WriteBatch batch) throws RocksDBException {
-            for (final KeyValue<byte[], byte[]> record : records) {
-                addToBatch(record.key, record.value, batch);
-            }
-        }
-
         @Override
         public void addToBatch(final byte[] key,
                                final byte[] value,