MINOR: Migrate CoordinatorLoaderImpl from Scala to Java (#20089)

### Summary of Changes

- Rewrote both `CoordinatorLoaderImpl` and `CoordinatorLoaderImplTest`
in Java, replacing their original Scala implementations.
- Removed the direct dependency on `ReplicaManager`, replacing it with
functional suppliers, `partitionLogSupplier` and
`partitionLogEndOffsetSupplier` (see the wiring sketch after this list).
- Preserved original logic and test coverage during migration.
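
The constructor now takes two `java.util.function.Function` suppliers in place of a `ReplicaManager`. Below is a minimal wiring sketch, not part of this change: the `LoaderWiringSketch` class and its `logs` map are hypothetical, and it assumes `UnifiedLog#logEndOffset` for the end-offset lookup; the actual broker wiring is in the `BrokerServer.scala` hunk further down.

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoaderImpl;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.storage.internals.log.UnifiedLog;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class LoaderWiringSketch {
    // Hypothetical registry of partition logs; any lookup works, ReplicaManager is no longer required.
    private final Map<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>();

    public <T> CoordinatorLoaderImpl<T> buildLoader(Deserializer<T> deserializer, int loadBufferSize) {
        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier =
            tp -> Optional.ofNullable(logs.get(tp));
        // An empty Optional is enough for the offline case; the loader maps it to -1L internally.
        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier =
            tp -> Optional.ofNullable(logs.get(tp)).map(UnifiedLog::logEndOffset);
        return new CoordinatorLoaderImpl<>(
            Time.SYSTEM,
            partitionLogSupplier,
            partitionLogEndOffsetSupplier,
            deserializer,
            loadBufferSize
        );
    }
}
```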

Reviewers: TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>,
 TengYao Chi <frankvicky@apache.org>, Chia-Ping Tsai
 <chia7712@gmail.com>
Author: Logan Zhu, 2025-07-18 01:51:46 +08:00 (committed by GitHub)
Parent: 4a2d4ee76a
Commit: d03878c7fb
7 changed files with 1044 additions and 886 deletions

View File

@@ -1663,6 +1663,7 @@ project(':coordinator-common') {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':core')
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation testLog4j2Libs

View File

@@ -27,8 +27,6 @@
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.mockito" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
@@ -43,6 +41,7 @@
<subpackage name="common">
<subpackage name="runtime">
<allow pkg="com.yammer.metrics.core" />
<allow pkg="kafka.server" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.compress" />
@@ -61,11 +60,13 @@
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.storage" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.storage.internals.log" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<allow pkg="org.HdrHistogram" />
<allow pkg="scala" />
</subpackage>
</subpackage>
</subpackage>

View File

@@ -0,0 +1,331 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
/**
* Coordinator loader which reads records from a partition and replays them
* to a group coordinator.
*
* @param <T> The record type.
*/
public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLoaderImpl.class);
private final Time time;
private final Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier;
private final Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier;
private final Deserializer<T> deserializer;
private final int loadBufferSize;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final KafkaScheduler scheduler = new KafkaScheduler(1);
public CoordinatorLoaderImpl(
Time time,
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
Deserializer<T> deserializer,
int loadBufferSize
) {
this.time = time;
this.partitionLogSupplier = partitionLogSupplier;
this.partitionLogEndOffsetSupplier = partitionLogEndOffsetSupplier;
this.deserializer = deserializer;
this.loadBufferSize = loadBufferSize;
this.scheduler.startup();
}
/**
* Loads the coordinator by reading all the records from the TopicPartition
* and applying them to the Replayable object.
*
* @param tp The TopicPartition to read from.
* @param coordinator The object to apply records to.
*/
@Override
public CompletableFuture<LoadSummary> load(TopicPartition tp, CoordinatorPlayback<T> coordinator) {
final CompletableFuture<LoadSummary> future = new CompletableFuture<>();
long startTimeMs = time.milliseconds();
try {
ScheduledFuture<?> result = scheduler.scheduleOnce(String.format("Load coordinator from %s", tp),
() -> doLoad(tp, coordinator, future, startTimeMs));
if (result.isCancelled()) {
future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
}
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}
private void doLoad(
TopicPartition tp,
CoordinatorPlayback<T> coordinator,
CompletableFuture<LoadSummary> future,
long startTimeMs
) {
long schedulerQueueTimeMs = time.milliseconds() - startTimeMs;
try {
Optional<UnifiedLog> logOpt = partitionLogSupplier.apply(tp);
if (logOpt.isEmpty()) {
future.completeExceptionally(new NotLeaderOrFollowerException(
"Could not load records from " + tp + " because the log does not exist."));
return;
}
UnifiedLog log = logOpt.get();
// Buffer may not be needed if records are read from memory.
ByteBuffer buffer = ByteBuffer.allocate(0);
long currentOffset = log.logStartOffset();
LoadStats stats = new LoadStats();
long previousHighWatermark = -1L;
while (shouldFetchNextBatch(currentOffset, logEndOffset(tp), stats.readAtLeastOneRecord)) {
FetchDataInfo fetchDataInfo = log.read(currentOffset, loadBufferSize, FetchIsolation.LOG_END, true);
stats.readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes() > 0;
// Reuses a potentially larger buffer by updating it when reading from FileRecords.
MemoryRecords memoryRecords = toReadableMemoryRecords(tp, fetchDataInfo.records, buffer);
if (fetchDataInfo.records instanceof FileRecords) {
buffer = memoryRecords.buffer();
}
ReplayResult replayResult = processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, previousHighWatermark);
currentOffset = replayResult.nextOffset;
previousHighWatermark = replayResult.highWatermark;
}
long endTimeMs = time.milliseconds();
if (logEndOffset(tp) == -1L) {
future.completeExceptionally(new NotLeaderOrFollowerException(
String.format("Stopped loading records from %s because the partition is not online or is no longer the leader.", tp)
));
} else if (isRunning.get()) {
future.complete(new LoadSummary(startTimeMs, endTimeMs, schedulerQueueTimeMs, stats.numRecords, stats.numBytes));
} else {
future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
}
} catch (Throwable ex) {
future.completeExceptionally(ex);
}
}
private long logEndOffset(TopicPartition tp) {
return partitionLogEndOffsetSupplier.apply(tp).orElse(-1L);
}
/**
* Returns true if it's still valid to fetch the next batch of records.
* <p>
* This method ensures fetching continues only under safe and meaningful conditions:
* <ul>
* <li>The current offset is less than the log end offset.</li>
* <li>At least one record was read in the previous fetch. This ensures that fetching stops even if the
* current offset remains smaller than the log end offset but the log is empty. This could happen with compacted topics.</li>
* <li>The log end offset is not -1L, which ensures the partition is online and is still the leader.</li>
* <li>The loader is still running.</li>
* </ul>
*/
private boolean shouldFetchNextBatch(long currentOffset, long logEndOffset, boolean readAtLeastOneRecord) {
return currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get();
}
private MemoryRecords toReadableMemoryRecords(TopicPartition tp, Records records, ByteBuffer buffer) throws IOException {
if (records instanceof MemoryRecords memoryRecords) {
return memoryRecords;
} else if (records instanceof FileRecords fileRecords) {
int sizeInBytes = fileRecords.sizeInBytes();
int bytesNeeded = Math.max(loadBufferSize, sizeInBytes);
// "minOneMessage = true in the above log.read() means that the buffer may need to
// be grown to ensure progress can be made.
if (buffer.capacity() < bytesNeeded) {
if (loadBufferSize < bytesNeeded) {
LOG.warn("Loaded metadata from {} with buffer larger ({} bytes) than" +
" configured buffer size ({} bytes).", tp, bytesNeeded, loadBufferSize);
}
buffer = ByteBuffer.allocate(bytesNeeded);
} else {
buffer.clear();
}
fileRecords.readInto(buffer, 0);
return MemoryRecords.readableRecords(buffer);
} else {
throw new IllegalArgumentException("Unsupported record type: " + records.getClass());
}
}
private ReplayResult processMemoryRecords(
TopicPartition tp,
UnifiedLog log,
MemoryRecords memoryRecords,
CoordinatorPlayback<T> coordinator,
LoadStats loadStats,
long currentOffset,
long previousHighWatermark
) {
for (MutableRecordBatch batch : memoryRecords.batches()) {
if (batch.isControlBatch()) {
for (Record record : batch) {
ControlRecordType controlRecord = ControlRecordType.parse(record.key());
if (controlRecord == ControlRecordType.COMMIT) {
if (LOG.isTraceEnabled()) {
LOG.trace("Replaying end transaction marker from {} at offset {} to commit" +
" transaction with producer id {} and producer epoch {}.",
tp, record.offset(), batch.producerId(), batch.producerEpoch());
}
coordinator.replayEndTransactionMarker(
batch.producerId(),
batch.producerEpoch(),
TransactionResult.COMMIT
);
} else if (controlRecord == ControlRecordType.ABORT) {
if (LOG.isTraceEnabled()) {
LOG.trace("Replaying end transaction marker from {} at offset {} to abort" +
" transaction with producer id {} and producer epoch {}.",
tp, record.offset(), batch.producerId(), batch.producerEpoch());
}
coordinator.replayEndTransactionMarker(
batch.producerId(),
batch.producerEpoch(),
TransactionResult.ABORT
);
}
}
} else {
for (Record record : batch) {
loadStats.numRecords++;
Optional<T> coordinatorRecordOpt = Optional.empty();
try {
coordinatorRecordOpt = Optional.ofNullable(deserializer.deserialize(record.key(), record.value()));
} catch (Deserializer.UnknownRecordTypeException ex) {
LOG.warn("Unknown record type {} while loading offsets and group metadata from {}." +
" Ignoring it. It could be a left over from an aborted upgrade.", ex.unknownType(), tp);
} catch (RuntimeException ex) {
String msg = String.format("Deserializing record %s from %s failed.", record, tp);
LOG.error(msg, ex);
throw new RuntimeException(msg, ex);
}
coordinatorRecordOpt.ifPresent(coordinatorRecord -> {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Replaying record {} from {} at offset {} with producer id {}" +
" and producer epoch {}.", coordinatorRecord, tp, record.offset(), batch.producerId(), batch.producerEpoch());
}
coordinator.replay(
record.offset(),
batch.producerId(),
batch.producerEpoch(),
coordinatorRecord
);
} catch (RuntimeException ex) {
String msg = String.format("Replaying record %s from %s at offset %d with producer id %d and" +
" producer epoch %d failed.", coordinatorRecord, tp, record.offset(),
batch.producerId(), batch.producerEpoch());
LOG.error(msg, ex);
throw new RuntimeException(msg, ex);
}
});
}
}
// Note that the high watermark can be greater than the current offset but as we load more records
// the current offset will eventually surpass the high watermark. Also note that the high watermark
// will continue to advance while loading.
currentOffset = batch.nextOffset();
long currentHighWatermark = log.highWatermark();
if (currentOffset >= currentHighWatermark) {
coordinator.updateLastWrittenOffset(currentOffset);
if (currentHighWatermark > previousHighWatermark) {
coordinator.updateLastCommittedOffset(currentHighWatermark);
previousHighWatermark = currentHighWatermark;
}
}
}
loadStats.numBytes += memoryRecords.sizeInBytes();
return new ReplayResult(currentOffset, previousHighWatermark);
}
/**
* Closes the loader.
*/
@Override
public void close() throws Exception {
if (!isRunning.compareAndSet(true, false)) {
LOG.warn("Coordinator loader is already shutting down.");
return;
}
scheduler.shutdown();
}
/**
* A helper class to track key metrics during the data loading operation.
*/
private static class LoadStats {
private long numRecords = 0L;
private long numBytes = 0L;
private boolean readAtLeastOneRecord = true;
@Override
public String toString() {
return "LoadStats{" +
"numRecords=" + numRecords +
", numBytes=" + numBytes +
", readAtLeastOneRecord=" + readAtLeastOneRecord +
'}';
}
}
private record ReplayResult(long nextOffset, long highWatermark) {
}
}
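
For orientation, here is a minimal sketch of driving `load()` and reading the resulting `LoadSummary`. It is illustrative only and not code from this commit: the `LoaderUsageSketch` class, the helper name, and the 10-second timeout are arbitrary, mirroring the tests below.

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoaderImpl;
import org.apache.kafka.coordinator.common.runtime.CoordinatorPlayback;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class LoaderUsageSketch {
    // Illustrative helper: load one partition and report the summary.
    static <T> void loadAndReport(
        CoordinatorLoaderImpl<T> loader,
        TopicPartition tp,
        CoordinatorPlayback<T> coordinator
    ) throws Exception {
        CompletableFuture<CoordinatorLoader.LoadSummary> future = loader.load(tp, coordinator);
        // Completes exceptionally with NotLeaderOrFollowerException if the log is missing or goes
        // offline, and with a RuntimeException if the loader has been closed.
        CoordinatorLoader.LoadSummary summary = future.get(10, TimeUnit.SECONDS);
        System.out.printf("Loaded %d records (%d bytes) from %s in %d ms%n",
            summary.numRecords(), summary.numBytes(), tp, summary.endTimeMs() - summary.startTimeMs());
    }
}
```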

View File

@@ -0,0 +1,703 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.common.runtime;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.invocation.InvocationOnMock;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import scala.Option;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static scala.jdk.javaapi.OptionConverters.toJava;
import static scala.jdk.javaapi.OptionConverters.toScala;
@SuppressWarnings("unchecked")
@Timeout(60)
class CoordinatorLoaderImplTest {
private record Tuple<K, V>(K key, V value) {
}
private static class StringKeyValueDeserializer implements Deserializer<Tuple<String, String>> {
@Override
public Tuple<String, String> deserialize(ByteBuffer key, ByteBuffer value) throws RuntimeException {
return new Tuple<>(
StandardCharsets.UTF_8.decode(key).toString(),
StandardCharsets.UTF_8.decode(value).toString()
);
}
}
@Test
void testNonexistentPartition() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
when(replicaManager.getLog(tp)).thenReturn(Option.empty());
assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
}
}
@Test
void testLoadingIsRejectedWhenClosed() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
Deserializer<Tuple<String, String>> serde = mock(Deserializer.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
loader.close();
assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
}
}
@Test
void testLoading() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
Deserializer<Tuple<String, String>> serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(9L)));
when(log.highWatermark()).thenReturn(0L);
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult1);
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
new SimpleRecord("k3".getBytes(), "v3".getBytes()),
new SimpleRecord("k4".getBytes(), "v4".getBytes()),
new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult2);
FetchDataInfo readResult3 = logReadResult(5, 100L, (short) 5, Arrays.asList(
new SimpleRecord("k6".getBytes(), "v6".getBytes()),
new SimpleRecord("k7".getBytes(), "v7".getBytes())
));
when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult3);
FetchDataInfo readResult4 = logReadResult(
7,
100L,
(short) 5,
ControlRecordType.COMMIT
);
when(log.read(7L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult4);
FetchDataInfo readResult5 = logReadResult(
8,
500L,
(short) 10,
ControlRecordType.ABORT
);
when(log.read(8L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult5);
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
verify(coordinator).replay(5L, 100L, (short) 5, new Tuple<>("k6", "v6"));
verify(coordinator).replay(6L, 100L, (short) 5, new Tuple<>("k7", "v7"));
verify(coordinator).replayEndTransactionMarker(100L, (short) 5, TransactionResult.COMMIT);
verify(coordinator).replayEndTransactionMarker(500L, (short) 10, TransactionResult.ABORT);
verify(coordinator).updateLastWrittenOffset(2L);
verify(coordinator).updateLastWrittenOffset(5L);
verify(coordinator).updateLastWrittenOffset(7L);
verify(coordinator).updateLastWrittenOffset(8L);
verify(coordinator).updateLastCommittedOffset(0L);
}
}
@Test
void testLoadingStoppedWhenClosed() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
Deserializer<Tuple<String, String>> serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(100L)));
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
CountDownLatch latch = new CountDownLatch(1);
when(log.read(
anyLong(),
eq(1000),
eq(FetchIsolation.LOG_END),
eq(true)
)).thenAnswer((InvocationOnMock invocation) -> {
latch.countDown();
return readResult;
});
CompletableFuture<CoordinatorLoader.LoadSummary> result = loader.load(tp, coordinator);
boolean completed = latch.await(10, TimeUnit.SECONDS);
assertTrue(completed, "Log read timeout: Latch did not count down in time.");
loader.close();
RuntimeException ex = assertFutureThrows(RuntimeException.class, result);
assertNotNull(ex);
assertEquals("Coordinator loader is closed.", ex.getMessage());
}
}
@Test
void testUnknownRecordTypeAreIgnored() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
UnifiedLog log = mock(UnifiedLog.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L)));
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult);
when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
.thenThrow(new Deserializer.UnknownRecordTypeException((short) 1))
.thenReturn(new Tuple<>("k2", "v2"));
loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
}
}
@Test
void testDeserializationErrorFailsTheLoading() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
UnifiedLog log = mock(UnifiedLog.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(2L)));
FetchDataInfo readResult = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult);
when(serde.deserialize(any(ByteBuffer.class), any(ByteBuffer.class)))
.thenThrow(new RuntimeException("Error!"));
RuntimeException ex = assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
assertNotNull(ex);
assertEquals(String.format("Deserializing record DefaultRecord(offset=0, timestamp=-1, key=2 bytes, value=2 bytes) from %s failed.", tp), ex.getMessage());
}
}
@Test
void testLoadGroupAndOffsetsWithCorruptedLog() throws Exception {
// Simulate a case where startOffset < endOffset but log is empty. This could theoretically happen
// when all the records are expired and the active segment is truncated or when the partition
// is accidentally corrupted.
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
StringKeyValueDeserializer serde = mock(StringKeyValueDeserializer.class);
UnifiedLog log = mock(UnifiedLog.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(10L)));
FetchDataInfo readResult = logReadResult(0, List.of());
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult);
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
}
}
@Test
void testLoadSummary() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
MockTime time = new MockTime();
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
time,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
long startTimeMs = time.milliseconds();
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L)));
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
.thenAnswer((InvocationOnMock invocation) -> {
time.sleep(1000);
return readResult1;
});
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
new SimpleRecord("k3".getBytes(), "v3".getBytes()),
new SimpleRecord("k4".getBytes(), "v4".getBytes()),
new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult2);
CoordinatorLoader.LoadSummary summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
assertEquals(startTimeMs, summary.startTimeMs());
assertEquals(startTimeMs + 1000, summary.endTimeMs());
assertEquals(5, summary.numRecords());
assertEquals(readResult1.records.sizeInBytes() + readResult2.records.sizeInBytes(), summary.numBytes());
}
}
@Test
void testUpdateLastWrittenOffsetOnBatchLoaded() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L, 0L, 2L);
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L)));
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult1);
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
new SimpleRecord("k3".getBytes(), "v3".getBytes()),
new SimpleRecord("k4".getBytes(), "v4".getBytes()),
new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult2);
FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
new SimpleRecord("k6".getBytes(), "v6".getBytes()),
new SimpleRecord("k7".getBytes(), "v7".getBytes())
));
when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult3);
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6"));
verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7"));
verify(coordinator, times(0)).updateLastWrittenOffset(0L);
verify(coordinator, times(1)).updateLastWrittenOffset(2L);
verify(coordinator, times(1)).updateLastWrittenOffset(5L);
verify(coordinator, times(1)).updateLastWrittenOffset(7L);
verify(coordinator, times(1)).updateLastCommittedOffset(0L);
verify(coordinator, times(1)).updateLastCommittedOffset(2L);
verify(coordinator, times(0)).updateLastCommittedOffset(5L);
}
}
@Test
void testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(0L)));
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
verify(coordinator, times(0)).updateLastWrittenOffset(anyLong());
verify(coordinator, times(0)).updateLastCommittedOffset(anyLong());
}
}
@Test
void testUpdateLastWrittenOffsetOnBatchLoadedWhileHighWatermarkAhead() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(5L, 7L, 7L);
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(7L)));
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult1);
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
new SimpleRecord("k3".getBytes(), "v3".getBytes()),
new SimpleRecord("k4".getBytes(), "v4".getBytes()),
new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult2);
FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
new SimpleRecord("k6".getBytes(), "v6".getBytes()),
new SimpleRecord("k7".getBytes(), "v7".getBytes())
));
when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult3);
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k3", "v3"));
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k4", "v4"));
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k5", "v5"));
verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k6", "v6"));
verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k7", "v7"));
verify(coordinator, times(0)).updateLastWrittenOffset(0L);
verify(coordinator, times(0)).updateLastWrittenOffset(2L);
verify(coordinator, times(0)).updateLastWrittenOffset(5L);
verify(coordinator, times(1)).updateLastWrittenOffset(7L);
verify(coordinator, times(0)).updateLastCommittedOffset(0L);
verify(coordinator, times(0)).updateLastCommittedOffset(2L);
verify(coordinator, times(0)).updateLastCommittedOffset(5L);
verify(coordinator, times(1)).updateLastCommittedOffset(7L);
}
}
@Test
void testPartitionGoesOfflineDuringLoad() throws Exception {
TopicPartition tp = new TopicPartition("foo", 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> toJava(replicaManager.getLog(partition));
Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.ofNullable((Long) replicaManager.getLogEndOffset(partition).get());
StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
UnifiedLog log = mock(UnifiedLog.class);
CoordinatorPlayback<Tuple<String, String>> coordinator = mock(CoordinatorPlayback.class);
try (CoordinatorLoaderImpl<Tuple<String, String>> loader = new CoordinatorLoaderImpl<>(
Time.SYSTEM,
partitionLogSupplier,
partitionLogEndOffsetSupplier,
serde,
1000
)) {
when(replicaManager.getLog(tp)).thenReturn(toScala(Optional.of(log)));
when(log.logStartOffset()).thenReturn(0L);
when(log.highWatermark()).thenReturn(0L);
when(replicaManager.getLogEndOffset(tp)).thenReturn(toScala(Optional.of(5L))).thenReturn(toScala(Optional.of(-1L)));
FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
new SimpleRecord("k1".getBytes(), "v1".getBytes()),
new SimpleRecord("k2".getBytes(), "v2".getBytes())
));
when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult1);
FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
new SimpleRecord("k3".getBytes(), "v3".getBytes()),
new SimpleRecord("k4".getBytes(), "v4".getBytes()),
new SimpleRecord("k5".getBytes(), "v5".getBytes())
));
when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult2);
assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
}
}
private FetchDataInfo logReadResult(long startOffset, List<SimpleRecord> records) throws IOException {
return logReadResult(startOffset, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, records);
}
private FetchDataInfo logReadResult(
long startOffset,
long producerId,
short producerEpoch,
List<SimpleRecord> records
) throws IOException {
FileRecords fileRecords = mock(FileRecords.class);
MemoryRecords memoryRecords;
if (producerId == RecordBatch.NO_PRODUCER_ID) {
memoryRecords = MemoryRecords.withRecords(
startOffset,
Compression.NONE,
records.toArray(new SimpleRecord[0])
);
} else {
memoryRecords = MemoryRecords.withTransactionalRecords(
startOffset,
Compression.NONE,
producerId,
producerEpoch,
0,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
records.toArray(new SimpleRecord[0])
);
}
when(fileRecords.sizeInBytes()).thenReturn(memoryRecords.sizeInBytes());
doAnswer(invocation -> {
ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class);
buffer.put(memoryRecords.buffer().duplicate());
buffer.flip();
return null;
}).when(fileRecords).readInto(any(ByteBuffer.class), ArgumentMatchers.anyInt());
return new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords);
}
private FetchDataInfo logReadResult(
long startOffset,
long producerId,
short producerEpoch,
ControlRecordType controlRecordType
) throws IOException {
FileRecords fileRecords = mock(FileRecords.class);
MemoryRecords memoryRecords = MemoryRecords.withEndTransactionMarker(
startOffset,
0L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
producerId,
producerEpoch,
new EndTransactionMarker(controlRecordType, 0)
);
when(fileRecords.sizeInBytes()).thenReturn(memoryRecords.sizeInBytes());
doAnswer(invocation -> {
ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class);
buffer.put(memoryRecords.buffer().duplicate());
buffer.flip();
return null;
}).when(fileRecords).readInto(any(ByteBuffer.class), ArgumentMatchers.anyInt());
return new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords);
}
}

View File

@@ -1,248 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator.group
import kafka.server.ReplicaManager
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{ControlRecordType, FileRecords, MemoryRecords}
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.LoadSummary
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoader, CoordinatorPlayback, Deserializer}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.KafkaScheduler
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters._
/**
* Coordinator loader which reads records from a partition and replays them
* to a group coordinator.
*
* @param replicaManager The replica manager.
* @param deserializer The deserializer to use.
* @param loadBufferSize The load buffer size.
* @tparam T The record type.
*/
class CoordinatorLoaderImpl[T](
time: Time,
replicaManager: ReplicaManager,
deserializer: Deserializer[T],
loadBufferSize: Int
) extends CoordinatorLoader[T] with Logging {
private val isRunning = new AtomicBoolean(true)
private val scheduler = new KafkaScheduler(1)
scheduler.startup()
/**
* Loads the coordinator by reading all the records from the TopicPartition
* and applying them to the Replayable object.
*
* @param tp The TopicPartition to read from.
* @param coordinator The object to apply records to.
*/
override def load(
tp: TopicPartition,
coordinator: CoordinatorPlayback[T]
): CompletableFuture[LoadSummary] = {
val future = new CompletableFuture[LoadSummary]()
val startTimeMs = time.milliseconds()
val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
() => doLoad(tp, coordinator, future, startTimeMs))
if (result.isCancelled) {
future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
}
future
}
private def doLoad(
tp: TopicPartition,
coordinator: CoordinatorPlayback[T],
future: CompletableFuture[LoadSummary],
startTimeMs: Long
): Unit = {
val schedulerQueueTimeMs = time.milliseconds() - startTimeMs
try {
replicaManager.getLog(tp) match {
case None =>
future.completeExceptionally(new NotLeaderOrFollowerException(
s"Could not load records from $tp because the log does not exist."))
case Some(log) =>
def logEndOffset: Long = replicaManager.getLogEndOffset(tp).getOrElse(-1L)
// Buffer may not be needed if records are read from memory.
var buffer = ByteBuffer.allocate(0)
// Loop breaks if leader changes at any time during the load, since logEndOffset is -1.
var currentOffset = log.logStartOffset
// Loop breaks if no records have been read, since the end of the log has been reached.
// This is to ensure that the loop breaks even if the current offset remains smaller than
// the log end offset but the log is empty. This could happen with compacted topics.
var readAtLeastOneRecord = true
var previousHighWatermark = -1L
var numRecords = 0L
var numBytes = 0L
while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
val fetchDataInfo = log.read(currentOffset, loadBufferSize, FetchIsolation.LOG_END, true)
readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
val memoryRecords = (fetchDataInfo.records: @unchecked) match {
case records: MemoryRecords =>
records
case fileRecords: FileRecords =>
val sizeInBytes = fileRecords.sizeInBytes
val bytesNeeded = Math.max(loadBufferSize, sizeInBytes)
// "minOneMessage = true in the above log.read() means that the buffer may need to
// be grown to ensure progress can be made.
if (buffer.capacity < bytesNeeded) {
if (loadBufferSize < bytesNeeded)
warn(s"Loaded metadata from $tp with buffer larger ($bytesNeeded bytes) than " +
s"configured buffer size ($loadBufferSize bytes).")
buffer = ByteBuffer.allocate(bytesNeeded)
} else {
buffer.clear()
}
fileRecords.readInto(buffer, 0)
MemoryRecords.readableRecords(buffer)
}
memoryRecords.batches.forEach { batch =>
if (batch.isControlBatch) {
batch.asScala.foreach { record =>
val controlRecord = ControlRecordType.parse(record.key)
if (controlRecord == ControlRecordType.COMMIT) {
if (isTraceEnabled) {
trace(s"Replaying end transaction marker from $tp at offset ${record.offset} to commit transaction " +
s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
}
coordinator.replayEndTransactionMarker(
batch.producerId,
batch.producerEpoch,
TransactionResult.COMMIT
)
} else if (controlRecord == ControlRecordType.ABORT) {
if (isTraceEnabled) {
trace(s"Replaying end transaction marker from $tp at offset ${record.offset} to abort transaction " +
s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
}
coordinator.replayEndTransactionMarker(
batch.producerId,
batch.producerEpoch,
TransactionResult.ABORT
)
}
}
} else {
batch.asScala.foreach { record =>
numRecords = numRecords + 1
val coordinatorRecordOpt = {
try {
Some(deserializer.deserialize(record.key, record.value))
} catch {
case ex: UnknownRecordTypeException =>
warn(s"Unknown record type ${ex.unknownType} while loading offsets and group metadata " +
s"from $tp. Ignoring it. It could be a left over from an aborted upgrade.")
None
case ex: RuntimeException =>
val msg = s"Deserializing record $record from $tp failed due to: ${ex.getMessage}"
error(s"$msg.")
throw new RuntimeException(msg, ex)
}
}
coordinatorRecordOpt.foreach { coordinatorRecord =>
try {
if (isTraceEnabled) {
trace(s"Replaying record $coordinatorRecord from $tp at offset ${record.offset()} " +
s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
}
coordinator.replay(
record.offset(),
batch.producerId,
batch.producerEpoch,
coordinatorRecord
)
} catch {
case ex: RuntimeException =>
val msg = s"Replaying record $coordinatorRecord from $tp at offset ${record.offset()} " +
s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch} " +
s"failed due to: ${ex.getMessage}"
error(s"$msg.")
throw new RuntimeException(msg, ex)
}
}
}
}
// Note that the high watermark can be greater than the current offset but as we load more records
// the current offset will eventually surpass the high watermark. Also note that the high watermark
// will continue to advance while loading.
currentOffset = batch.nextOffset
val currentHighWatermark = log.highWatermark
if (currentOffset >= currentHighWatermark) {
coordinator.updateLastWrittenOffset(currentOffset)
if (currentHighWatermark > previousHighWatermark) {
coordinator.updateLastCommittedOffset(currentHighWatermark)
previousHighWatermark = currentHighWatermark
}
}
}
numBytes = numBytes + memoryRecords.sizeInBytes()
}
val endTimeMs = time.milliseconds()
if (logEndOffset == -1L) {
future.completeExceptionally(new NotLeaderOrFollowerException(
s"Stopped loading records from $tp because the partition is not online or is no longer the leader."
))
} else if (isRunning.get) {
future.complete(new LoadSummary(startTimeMs, endTimeMs, schedulerQueueTimeMs, numRecords, numBytes))
} else {
future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
}
}
} catch {
case ex: Throwable =>
future.completeExceptionally(ex)
}
}
/**
* Closes the loader.
*/
override def close(): Unit = {
if (!isRunning.compareAndSet(true, false)) {
warn("Coordinator loader is already shutting down.")
return
}
scheduler.shutdown()
}
}

View File

@@ -17,7 +17,7 @@
package kafka.server
import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter}
import kafka.coordinator.group.CoordinatorPartitionWriter
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
import kafka.network.SocketServer
@@ -34,7 +34,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl, CoordinatorRecord}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.{GroupConfigManager, GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService}
import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, ShareCoordinatorRuntimeMetrics}
@@ -611,7 +611,8 @@ class BrokerServer(
)
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
time,
replicaManager,
tp => replicaManager.getLog(tp).toJava,
tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
serde,
config.groupCoordinatorConfig.offsetsLoadBufferSize
)
@@ -641,7 +642,8 @@
val serde = new ShareCoordinatorRecordSerde
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
time,
replicaManager,
tp => replicaManager.getLog(tp).toJava,
tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
serde,
config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
)

View File

@@ -1,632 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator.group
import kafka.server.ReplicaManager
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.{CoordinatorPlayback, Deserializer}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogOffsetMetadata, UnifiedLog}
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
import org.junit.jupiter.api.{Test, Timeout}
import org.mockito.ArgumentMatchers.anyLong
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import java.nio.ByteBuffer
import java.nio.charset.Charset
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.util.Using
class StringKeyValueDeserializer extends Deserializer[(String, String)] {
override def deserialize(key: ByteBuffer, value: ByteBuffer): (String, String) = {
(
Charset.defaultCharset().decode(key).toString,
Charset.defaultCharset().decode(value).toString
)
}
}
@Timeout(60)
class CoordinatorLoaderImplTest {
@Test
def testNonexistentPartition(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = mock(classOf[Deserializer[(String, String)]])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(None)
val result = loader.load(tp, coordinator)
assertFutureThrows(classOf[NotLeaderOrFollowerException], result)
}
}
@Test
def testLoadingIsRejectedWhenClosed(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = mock(classOf[Deserializer[(String, String)]])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
loader.close()
val result = loader.load(tp, coordinator)
assertFutureThrows(classOf[RuntimeException], result)
}
}
@Test
def testLoading(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = new StringKeyValueDeserializer
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(9L))
when(log.highWatermark).thenReturn(0L)
val readResult1 = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult1)
val readResult2 = logReadResult(startOffset = 2, records = Seq(
new SimpleRecord("k3".getBytes, "v3".getBytes),
new SimpleRecord("k4".getBytes, "v4".getBytes),
new SimpleRecord("k5".getBytes, "v5".getBytes)
))
when(log.read(2L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult2)
val readResult3 = logReadResult(startOffset = 5, producerId = 100L, producerEpoch = 5, records = Seq(
new SimpleRecord("k6".getBytes, "v6".getBytes),
new SimpleRecord("k7".getBytes, "v7".getBytes)
))
when(log.read(5L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult3)
val readResult4 = logReadResult(
startOffset = 7,
producerId = 100L,
producerEpoch = 5,
controlRecordType = ControlRecordType.COMMIT
)
when(log.read(7L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult4)
val readResult5 = logReadResult(
startOffset = 8,
producerId = 500L,
producerEpoch = 10,
controlRecordType = ControlRecordType.ABORT
)
when(log.read(8L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult5)
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(5L, 100L, 5.toShort, ("k6", "v6"))
verify(coordinator).replay(6L, 100L, 5.toShort, ("k7", "v7"))
verify(coordinator).replayEndTransactionMarker(100L, 5, TransactionResult.COMMIT)
verify(coordinator).replayEndTransactionMarker(500L, 10, TransactionResult.ABORT)
verify(coordinator).updateLastWrittenOffset(2)
verify(coordinator).updateLastWrittenOffset(5)
verify(coordinator).updateLastWrittenOffset(7)
verify(coordinator).updateLastWrittenOffset(8)
verify(coordinator).updateLastCommittedOffset(0)
}
}
@Test
def testLoadingStoppedWhenClosed(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = new StringKeyValueDeserializer
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(100L))
val readResult = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
val latch = new CountDownLatch(1)
when(log.read(
ArgumentMatchers.anyLong(),
ArgumentMatchers.eq(1000),
ArgumentMatchers.eq(FetchIsolation.LOG_END),
ArgumentMatchers.eq(true)
)).thenAnswer { _ =>
latch.countDown()
readResult
}
val result = loader.load(tp, coordinator)
latch.await(10, TimeUnit.SECONDS)
loader.close()
val ex = assertFutureThrows(classOf[RuntimeException], result)
assertEquals("Coordinator loader is closed.", ex.getMessage)
}
}
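
// Records whose type the deserializer reports as unknown are skipped; loading continues with the
// remaining records.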
@Test
def testUnknownRecordTypeAreIgnored(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = mock(classOf[StringKeyValueDeserializer])
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(2L))
val readResult = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult)
when(serde.deserialize(ArgumentMatchers.any(), ArgumentMatchers.any()))
.thenThrow(new UnknownRecordTypeException(1))
.thenReturn(("k2", "v2"))
loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
}
}
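
// Any other deserialization failure aborts the load and is surfaced through the returned future.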
@Test
def testDeserializationErrorFailsTheLoading(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = mock(classOf[StringKeyValueDeserializer])
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(2L))
val readResult = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult)
when(serde.deserialize(ArgumentMatchers.any(), ArgumentMatchers.any()))
.thenThrow(new RuntimeException("Error!"))
val ex = assertFutureThrows(classOf[RuntimeException], loader.load(tp, coordinator))
assertEquals(s"Deserializing record DefaultRecord(offset=0, timestamp=-1, key=2 bytes, value=2 bytes) from $tp failed due to: Error!", ex.getMessage)
}
}
@Test
def testLoadGroupAndOffsetsWithCorruptedLog(): Unit = {
// Simulates a case where startOffset < endOffset but the log is empty. This could theoretically happen
// when all the records have expired and the active segment is truncated, or when the partition
// is accidentally corrupted.
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = mock(classOf[StringKeyValueDeserializer])
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(10L))
val readResult = logReadResult(startOffset = 0, records = Seq())
when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult)
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
}
}
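
// Verifies the load summary returned by load(): start and end time, number of records, and bytes read.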
@Test
def testLoadSummary(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = new StringKeyValueDeserializer
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
val time = new MockTime()
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
val startTimeMs = time.milliseconds()
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
val readResult1 = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenAnswer((_: InvocationOnMock) => {
time.sleep(1000)
readResult1
})
val readResult2 = logReadResult(startOffset = 2, records = Seq(
new SimpleRecord("k3".getBytes, "v3".getBytes),
new SimpleRecord("k4".getBytes, "v4".getBytes),
new SimpleRecord("k5".getBytes, "v5".getBytes)
))
when(log.read(2L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult2)
val summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
assertEquals(startTimeMs, summary.startTimeMs())
assertEquals(startTimeMs + 1000, summary.endTimeMs())
assertEquals(5, summary.numRecords())
assertEquals(readResult1.records.sizeInBytes() + readResult2.records.sizeInBytes(), summary.numBytes())
}
}
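
// The last written offset is advanced after each loaded batch, while the last committed offset only
// follows the high watermark reported by the log.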
@Test
def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = new StringKeyValueDeserializer
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
val readResult1 = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult1)
val readResult2 = logReadResult(startOffset = 2, records = Seq(
new SimpleRecord("k3".getBytes, "v3".getBytes),
new SimpleRecord("k4".getBytes, "v4".getBytes),
new SimpleRecord("k5".getBytes, "v5".getBytes)
))
when(log.read(2L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult2)
val readResult3 = logReadResult(startOffset = 5, records = Seq(
new SimpleRecord("k6".getBytes, "v6".getBytes),
new SimpleRecord("k7".getBytes, "v7".getBytes)
))
when(log.read(5L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult3)
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
verify(coordinator, times(0)).updateLastWrittenOffset(0)
verify(coordinator, times(1)).updateLastWrittenOffset(2)
verify(coordinator, times(1)).updateLastWrittenOffset(5)
verify(coordinator, times(1)).updateLastWrittenOffset(7)
verify(coordinator, times(1)).updateLastCommittedOffset(0)
verify(coordinator, times(1)).updateLastCommittedOffset(2)
verify(coordinator, times(0)).updateLastCommittedOffset(5)
}
}
@Test
def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = new StringKeyValueDeserializer
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(log.highWatermark).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(0L))
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
verify(coordinator, times(0)).updateLastWrittenOffset(anyLong())
verify(coordinator, times(0)).updateLastCommittedOffset(anyLong())
}
}
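
// When the high watermark is already ahead of the batches being loaded, intermediate offsets are not
// reported; only the final written and committed offset (7) is.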
@Test
def testUpdateLastWrittenOffsetOnBatchLoadedWhileHighWatermarkAhead(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = new StringKeyValueDeserializer
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(log.highWatermark).thenReturn(5L).thenReturn(7L).thenReturn(7L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
val readResult1 = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult1)
val readResult2 = logReadResult(startOffset = 2, records = Seq(
new SimpleRecord("k3".getBytes, "v3".getBytes),
new SimpleRecord("k4".getBytes, "v4".getBytes),
new SimpleRecord("k5".getBytes, "v5".getBytes)
))
when(log.read(2L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult2)
val readResult3 = logReadResult(startOffset = 5, records = Seq(
new SimpleRecord("k6".getBytes, "v6".getBytes),
new SimpleRecord("k7".getBytes, "v7".getBytes)
))
when(log.read(5L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult3)
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
verify(coordinator, times(0)).updateLastWrittenOffset(0)
verify(coordinator, times(0)).updateLastWrittenOffset(2)
verify(coordinator, times(0)).updateLastWrittenOffset(5)
verify(coordinator, times(1)).updateLastWrittenOffset(7)
verify(coordinator, times(0)).updateLastCommittedOffset(0)
verify(coordinator, times(0)).updateLastCommittedOffset(2)
verify(coordinator, times(0)).updateLastCommittedOffset(5)
verify(coordinator, times(1)).updateLastCommittedOffset(7)
}
}
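
// Simulates the partition going offline mid-load: the log end offset drops to -1 and the load fails
// with NotLeaderOrFollowerException.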
@Test
def testPartitionGoesOfflineDuringLoad(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = new StringKeyValueDeserializer
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(log.highWatermark).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L)).thenReturn(Some(-1L))
val readResult1 = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult1)
val readResult2 = logReadResult(startOffset = 2, records = Seq(
new SimpleRecord("k3".getBytes, "v3".getBytes),
new SimpleRecord("k4".getBytes, "v4".getBytes),
new SimpleRecord("k5".getBytes, "v5".getBytes)
))
when(log.read(2L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult2)
assertFutureThrows(classOf[NotLeaderOrFollowerException], loader.load(tp, coordinator))
}
}
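
// Builds a FetchDataInfo backed by a mocked FileRecords whose readInto copies the given records
// (transactional if a producer id is set) into the supplied buffer, emulating a single read from the log.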
private def logReadResult(
startOffset: Long,
producerId: Long = RecordBatch.NO_PRODUCER_ID,
producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
records: Seq[SimpleRecord]
): FetchDataInfo = {
val fileRecords = mock(classOf[FileRecords])
val memoryRecords = if (producerId == RecordBatch.NO_PRODUCER_ID) {
MemoryRecords.withRecords(
startOffset,
Compression.NONE,
records: _*
)
} else {
MemoryRecords.withTransactionalRecords(
startOffset,
Compression.NONE,
producerId,
producerEpoch,
0,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
records: _*
)
}
when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
val bufferCapture: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
when(fileRecords.readInto(
bufferCapture.capture(),
ArgumentMatchers.anyInt())
).thenAnswer { _ =>
val buffer = bufferCapture.getValue
buffer.put(memoryRecords.buffer.duplicate)
buffer.flip()
}
new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
}
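
// Same as above, but the returned batch contains only an end-transaction control marker.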
private def logReadResult(
startOffset: Long,
producerId: Long,
producerEpoch: Short,
controlRecordType: ControlRecordType
): FetchDataInfo = {
val fileRecords = mock(classOf[FileRecords])
val memoryRecords = MemoryRecords.withEndTransactionMarker(
startOffset,
0L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
producerId,
producerEpoch,
new EndTransactionMarker(controlRecordType, 0)
)
when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
val bufferCapture: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
when(fileRecords.readInto(
bufferCapture.capture(),
ArgumentMatchers.anyInt())
).thenAnswer { _ =>
val buffer = bufferCapture.getValue
buffer.put(memoryRecords.buffer.duplicate)
buffer.flip()
}
new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
}
}