KAFKA-15910: New group coordinator needs to generate snapshots while loading (#14849)

After the new coordinator loads a __consumer_offsets partition, it logs the following exception when serving a read operation (fetch/list groups, etc.):

```
java.lang.RuntimeException: No in-memory snapshot for epoch 740745. Snapshot epochs are:
at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:178)
at org.apache.kafka.timeline.SnapshottableHashTable.snapshottableIterator(SnapshottableHashTable.java:407)
at org.apache.kafka.timeline.TimelineHashMap$ValueIterator.<init>(TimelineHashMap.java:283)
at org.apache.kafka.timeline.TimelineHashMap$Values.iterator(TimelineHashMap.java:271)
```
 
This happens because we don't have a snapshot at the last updated high watermark after loading. We cannot simply generate a snapshot at the high watermark once all batches are loaded, because the log may contain records that have not yet been committed. We also don't know how far the high watermark will advance, so we need to generate a snapshot for each offset the loader observes at or above the current high watermark. Then, once the high watermark listener is registered and the high watermark is updated, we can delete all of the older snapshots.
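A minimal Java sketch of that strategy, mirroring the Scala loader change further down; the `Coordinator` interface and `load` helper here are hypothetical stand-ins rather than the actual loader API:

```java
import java.util.List;

// Minimal sketch of the strategy above, assuming a hypothetical Coordinator with the
// two callbacks introduced by this patch; it is not the real CoordinatorLoaderImpl.
public class LoaderSnapshotSketch {

    interface Coordinator {
        void updateLastWrittenOffset(long offset);   // creates an in-memory snapshot at the offset
        void updateLastCommittedOffset(long offset); // deletes snapshots up to the offset
    }

    // batchNextOffsets.get(i) is the next offset after batch i; highWatermarks.get(i) is the
    // high watermark observed right after replaying batch i (it may advance while loading).
    static void load(List<Long> batchNextOffsets, List<Long> highWatermarks, Coordinator coordinator) {
        long previousHighWatermark = -1L;
        for (int i = 0; i < batchNextOffsets.size(); i++) {
            long currentOffset = batchNextOffsets.get(i);
            long highWatermark = highWatermarks.get(i);
            // We do not know where the high watermark will eventually land, so snapshot
            // every batch end offset that is at or above the current high watermark.
            if (currentOffset >= highWatermark) {
                coordinator.updateLastWrittenOffset(currentOffset);
            }
            // Once the high watermark has advanced, the older snapshots can be deleted.
            if (highWatermark > previousHighWatermark) {
                coordinator.updateLastCommittedOffset(highWatermark);
                previousHighWatermark = highWatermark;
            }
        }
    }
}
```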

Reviewers: David Jacot <djacot@confluent.io>
Jeff Kim 2023-12-06 11:38:05 -05:00 committed by GitHub
parent 841fa2d433
commit b888fa1ec9
8 changed files with 771 additions and 199 deletions


@ -96,6 +96,7 @@ class CoordinatorLoaderImpl[T](
// the log end offset but the log is empty. This could happen with compacted topics.
var readAtLeastOneRecord = true
var previousHighWatermark = -1L
var numRecords = 0
var numBytes = 0
while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
@ -152,7 +153,19 @@ class CoordinatorLoaderImpl[T](
}
}
// Note that the high watermark can be greater than the current offset but as we load more records
// the current offset will eventually surpass the high watermark. Also note that the high watermark
// will continue to advance while loading.
currentOffset = batch.nextOffset
val currentHighWatermark = log.highWatermark
if (currentOffset >= currentHighWatermark) {
coordinator.updateLastWrittenOffset(currentOffset)
}
if (currentHighWatermark > previousHighWatermark) {
coordinator.updateLastCommittedOffset(currentHighWatermark)
previousHighWatermark = currentHighWatermark
}
}
numBytes = numBytes + memoryRecords.sizeInBytes()
}


@ -29,8 +29,9 @@ import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, Lo
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
import org.junit.jupiter.api.{Test, Timeout}
import org.mockito.ArgumentMatchers.anyLong
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}
import org.mockito.Mockito.{mock, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import java.nio.ByteBuffer
@ -105,6 +106,7 @@ class CoordinatorLoaderImplTest {
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
when(log.highWatermark).thenReturn(0L)
val readResult1 = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
@ -152,6 +154,9 @@ class CoordinatorLoaderImplTest {
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(100L, 5.toShort, ("k6", "v6"))
verify(coordinator).replay(100L, 5.toShort, ("k7", "v7"))
verify(coordinator).updateLastWrittenOffset(2)
verify(coordinator).updateLastWrittenOffset(5)
verify(coordinator).updateLastCommittedOffset(0)
}
}
@ -366,6 +371,107 @@ class CoordinatorLoaderImplTest {
}
}
@Test
def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = new StringKeyValueDeserializer
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
val readResult1 = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
when(log.read(
startOffset = 0L,
maxLength = 1000,
isolation = FetchIsolation.LOG_END,
minOneMessage = true
)).thenReturn(readResult1)
val readResult2 = logReadResult(startOffset = 2, records = Seq(
new SimpleRecord("k3".getBytes, "v3".getBytes),
new SimpleRecord("k4".getBytes, "v4".getBytes),
new SimpleRecord("k5".getBytes, "v5".getBytes)
))
when(log.read(
startOffset = 2L,
maxLength = 1000,
isolation = FetchIsolation.LOG_END,
minOneMessage = true
)).thenReturn(readResult2)
val readResult3 = logReadResult(startOffset = 5, records = Seq(
new SimpleRecord("k6".getBytes, "v6".getBytes),
new SimpleRecord("k7".getBytes, "v7".getBytes)
))
when(log.read(
startOffset = 5L,
maxLength = 1000,
isolation = FetchIsolation.LOG_END,
minOneMessage = true
)).thenReturn(readResult3)
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
verify(coordinator, times(0)).updateLastWrittenOffset(0)
verify(coordinator, times(1)).updateLastWrittenOffset(2)
verify(coordinator, times(1)).updateLastWrittenOffset(5)
verify(coordinator, times(1)).updateLastWrittenOffset(7)
verify(coordinator, times(1)).updateLastCommittedOffset(0)
verify(coordinator, times(1)).updateLastCommittedOffset(2)
verify(coordinator, times(0)).updateLastCommittedOffset(5)
}
}
@Test
def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = new StringKeyValueDeserializer
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(log.highWatermark).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(0L))
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
verify(coordinator, times(0)).updateLastWrittenOffset(anyLong())
verify(coordinator, times(0)).updateLastCommittedOffset(anyLong())
}
}
private def logReadResult(
startOffset: Long,
producerId: Long = RecordBatch.NO_PRODUCER_ID,


@ -38,4 +38,18 @@ public interface CoordinatorPlayback<U> {
short producerEpoch,
U record
) throws RuntimeException;
/**
* Invoke operations when a batch has been successfully loaded.
*
* @param offset the offset of the last record in the batch plus one.
*/
void updateLastWrittenOffset(Long offset);
/**
* Called when the high watermark advances.
*
* @param offset The offset of the new high watermark.
*/
void updateLastCommittedOffset(Long offset);
}
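For illustration only, a trivial in-memory implementation of the extended interface could look like the sketch below; `OffsetTrackingPlayback` is a hypothetical name and not part of this change:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: tracks only the offsets delivered through the two new callbacks.
public class OffsetTrackingPlayback implements CoordinatorPlayback<String> {
    private final AtomicLong lastWrittenOffset = new AtomicLong(0L);
    private final AtomicLong lastCommittedOffset = new AtomicLong(0L);

    @Override
    public void replay(long producerId, short producerEpoch, String record) {
        // Apply the record to the state machine (omitted in this sketch).
    }

    @Override
    public void updateLastWrittenOffset(Long offset) {
        // Called by the loader with the offset of the last record in the batch plus one.
        lastWrittenOffset.set(offset);
    }

    @Override
    public void updateLastCommittedOffset(Long offset) {
        // Called when the high watermark advances.
        lastCommittedOffset.set(offset);
    }
}
```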


@ -421,25 +421,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
int epoch;
/**
* The snapshot registry backing the coordinator.
* The state machine and the metadata that can be accessed by
* other threads.
*/
SnapshotRegistry snapshotRegistry;
/**
* The actual state machine.
*/
S coordinator;
/**
* The last offset written to the partition.
*/
long lastWrittenOffset;
/**
* The last offset committed. This represents the high
* watermark of the partition.
*/
long lastCommittedOffset;
SnapshottableCoordinator<S, U> coordinator;
/**
* Constructor.
@ -462,67 +447,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
this.timer = new EventBasedCoordinatorTimer(tp, logContext);
}
/**
* Updates the last written offset. This also create a new snapshot
* in the snapshot registry.
*
* @param offset The new last written offset.
*/
private void updateLastWrittenOffset(
long offset
) {
if (offset <= lastWrittenOffset) {
throw new IllegalStateException("New last written offset " + offset + " of " + tp +
" must be larger than " + lastWrittenOffset + ".");
}
log.debug("Update last written offset of {} to {}.", tp, offset);
lastWrittenOffset = offset;
snapshotRegistry.getOrCreateSnapshot(offset);
}
/**
* Reverts the last written offset. This also reverts the snapshot
* registry to this offset. All the changes applied after the offset
* are lost.
*
* @param offset The offset to revert to.
*/
private void revertLastWrittenOffset(
long offset
) {
if (offset > lastWrittenOffset) {
throw new IllegalStateException("New offset " + offset + " of " + tp +
" must be smaller than " + lastWrittenOffset + ".");
}
log.debug("Revert last written offset of {} to {}.", tp, offset);
lastWrittenOffset = offset;
snapshotRegistry.revertToSnapshot(offset);
}
/**
* Updates the last committed offset. This completes all the deferred
* events waiting on this offset. This also cleanups all the snapshots
* prior to this offset.
*
* @param offset The new last committed offset.
*/
private void updateLastCommittedOffset(
long offset
) {
if (offset <= lastCommittedOffset) {
throw new IllegalStateException("New committed offset " + offset + " of " + tp +
" must be larger than " + lastCommittedOffset + ".");
}
log.debug("Update committed offset of {} to {}.", tp, offset);
lastCommittedOffset = offset;
deferredEventQueue.completeUpTo(offset);
snapshotRegistry.deleteSnapshotsUpTo(offset);
coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
}
/**
* Transitions to the new state.
*
@ -540,23 +464,25 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
switch (newState) {
case LOADING:
state = CoordinatorState.LOADING;
snapshotRegistry = new SnapshotRegistry(logContext);
lastWrittenOffset = 0L;
lastCommittedOffset = 0L;
coordinator = coordinatorShardBuilderSupplier
.get()
.withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withTimer(timer)
.withCoordinatorMetrics(coordinatorMetrics)
.withTopicPartition(tp)
.build();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
coordinator = new SnapshottableCoordinator<>(
logContext,
snapshotRegistry,
coordinatorShardBuilderSupplier
.get()
.withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withTimer(timer)
.withCoordinatorMetrics(coordinatorMetrics)
.withTopicPartition(tp)
.build(),
tp
);
break;
case ACTIVE:
state = CoordinatorState.ACTIVE;
snapshotRegistry.getOrCreateSnapshot(0);
partitionWriter.registerListener(tp, highWatermarklistener);
coordinator.onLoaded(metadataImage);
break;
@ -589,7 +515,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
coordinator.onUnloaded();
}
coordinator = null;
snapshotRegistry = null;
}
}
@ -734,10 +659,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
try {
// Get the context of the coordinator or fail if the coordinator is not in active state.
withActiveContextOrThrow(tp, context -> {
long prevLastWrittenOffset = context.lastWrittenOffset;
long prevLastWrittenOffset = context.coordinator.lastWrittenOffset();
// Execute the operation.
result = op.generateRecordsAndResult(context.coordinator);
result = op.generateRecordsAndResult(context.coordinator.coordinator());
if (result.records().isEmpty()) {
// If the records are empty, it was a read operation after all. In this case,
@ -773,7 +698,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
producerEpoch,
result.records()
);
context.updateLastWrittenOffset(offset);
context.coordinator.updateLastWrittenOffset(offset);
// Add the response to the deferred queue.
if (!future.isDone()) {
@ -782,7 +707,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
complete(null);
}
} catch (Throwable t) {
context.revertLastWrittenOffset(prevLastWrittenOffset);
context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
complete(t);
}
}
@ -918,8 +843,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
withActiveContextOrThrow(tp, context -> {
// Execute the read operation.
response = op.generateResponse(
context.coordinator,
context.lastCommittedOffset
context.coordinator.coordinator(),
context.coordinator.lastCommittedOffset()
);
// The response can be completed immediately.
@ -1061,7 +986,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
log.debug("High watermark of {} incremented to {}.", tp, offset);
scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
withActiveContextOrThrow(tp, context -> {
context.updateLastCommittedOffset(offset);
context.coordinator.updateLastCommittedOffset(offset);
context.deferredEventQueue.completeUpTo(offset);
coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
});
});
}
@ -1211,8 +1138,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
* Creates the context if it does not exist.
*
* @param tp The topic partition.
*
* Visible for testing.
*/
private void maybeCreateContext(TopicPartition tp) {
void maybeCreateContext(TopicPartition tp) {
coordinators.computeIfAbsent(tp, CoordinatorContext::new);
}
@ -1422,7 +1351,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
case FAILED:
case INITIAL:
context.transitionTo(CoordinatorState.LOADING);
loader.load(tp, context.coordinator).whenComplete((summary, exception) -> {
loader.load(
tp,
context.coordinator
).whenComplete((summary, exception) -> {
scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
withContextOrThrow(tp, ctx -> {
if (ctx.state != CoordinatorState.LOADING) {


@ -23,7 +23,7 @@ import org.apache.kafka.image.MetadataImage;
* CoordinatorShard is basically a replicated state machine managed by the
* {@link CoordinatorRuntime}.
*/
public interface CoordinatorShard<U> extends CoordinatorPlayback<U> {
public interface CoordinatorShard<U> {
/**
* The coordinator has been loaded. This is used to apply any
@ -47,4 +47,15 @@ public interface CoordinatorShard<U> extends CoordinatorPlayback<U> {
* any post unloading operations.
*/
default void onUnloaded() {}
/**
* Replay a record to update the state machine.
*
* @param record The record to replay.
*/
void replay(
long producerId,
short producerEpoch,
U record
) throws RuntimeException;
}


@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
/**
* SnapshottableCoordinator is a wrapper on top of the coordinator state machine. The object is not accessed
* concurrently during normal operation, but multiple threads do access it while the coordinator partition is
* being loaded, so all of its methods must be synchronized.
*/
class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implements CoordinatorPlayback<U> {
/**
* The logger.
*/
private final Logger log;
/**
* The snapshot registry backing the coordinator.
*/
private final SnapshotRegistry snapshotRegistry;
/**
* The actual state machine.
*/
private final S coordinator;
/**
* The topic partition.
*/
private final TopicPartition tp;
/**
* The last offset written to the partition.
*/
private long lastWrittenOffset;
/**
* The last offset committed. This represents the high
* watermark of the partition.
*/
private long lastCommittedOffset;
SnapshottableCoordinator(
LogContext logContext,
SnapshotRegistry snapshotRegistry,
S coordinator,
TopicPartition tp
) {
this.log = logContext.logger(SnapshottableCoordinator.class);
this.coordinator = coordinator;
this.snapshotRegistry = snapshotRegistry;
this.tp = tp;
this.lastWrittenOffset = 0;
this.lastCommittedOffset = 0;
snapshotRegistry.getOrCreateSnapshot(0);
}
/**
* Reverts the last written offset. This also reverts the snapshot
* registry to this offset. All the changes applied after the offset
* are lost.
*
* @param offset The offset to revert to.
*/
synchronized void revertLastWrittenOffset(
long offset
) {
if (offset > lastWrittenOffset) {
throw new IllegalStateException("New offset " + offset + " of " + tp +
" must be smaller than " + lastWrittenOffset + ".");
}
log.debug("Revert last written offset of {} to {}.", tp, offset);
lastWrittenOffset = offset;
snapshotRegistry.revertToSnapshot(offset);
}
/**
* Replays the record onto the state machine.
*
* @param producerId The producer id.
* @param producerEpoch The producer epoch.
* @param record A record.
*/
@Override
public synchronized void replay(
long producerId,
short producerEpoch,
U record
) {
coordinator.replay(producerId, producerEpoch, record);
}
/**
* Updates the last written offset. This also creates a new snapshot
* in the snapshot registry.
*
* @param offset The new last written offset.
*/
@Override
public synchronized void updateLastWrittenOffset(Long offset) {
if (offset <= lastWrittenOffset) {
throw new IllegalStateException("New last written offset " + offset + " of " + tp +
" must be greater than " + lastWrittenOffset + ".");
}
lastWrittenOffset = offset;
snapshotRegistry.getOrCreateSnapshot(offset);
log.debug("Updated last written offset of {} to {}.", tp, offset);
}
/**
* Updates the last committed offset. This completes all the deferred
* events waiting on this offset. This also cleans up all the snapshots
* prior to this offset.
*
* @param offset The new last committed offset.
*/
@Override
public synchronized void updateLastCommittedOffset(Long offset) {
if (offset < lastCommittedOffset) {
throw new IllegalStateException("New committed offset " + offset + " of " + tp +
" must be greater than or equal to " + lastCommittedOffset + ".");
}
lastCommittedOffset = offset;
snapshotRegistry.deleteSnapshotsUpTo(offset);
log.debug("Updated committed offset of {} to {}.", tp, offset);
}
/**
* The coordinator has been loaded. This is used to apply any
* post loading operations.
*
* @param newImage The metadata image.
*/
synchronized void onLoaded(MetadataImage newImage) {
this.coordinator.onLoaded(newImage);
}
/**
* The coordinator has been unloaded. This is used to apply
* any post unloading operations.
*/
synchronized void onUnloaded() {
if (this.coordinator != null) {
this.coordinator.onUnloaded();
}
}
/**
* @return The last written offset.
*/
synchronized long lastWrittenOffset() {
return this.lastWrittenOffset;
}
/**
* A new metadata image is available. This is only called after {@link SnapshottableCoordinator#onLoaded(MetadataImage)}
* is called to signal that the coordinator has been fully loaded.
*
* @param newImage The new metadata image.
* @param delta The delta image.
*/
synchronized void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
this.coordinator.onNewMetadataImage(newImage, delta);
}
/**
* @return The last committed offset.
*/
synchronized long lastCommittedOffset() {
return this.lastCommittedOffset;
}
/**
* @return The coordinator.
*/
synchronized S coordinator() {
return this.coordinator;
}
/**
* @return The snapshot registry.
*
* Only used for testing.
*/
synchronized SnapshotRegistry snapshotRegistry() {
return this.snapshotRegistry;
}
}
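As a quick walkthrough of the snapshot lifecycle this class implements, here is an illustrative example modeled on the SnapshottableCoordinatorTest cases further down; it reuses the test helpers (MockCoordinatorShard, MockCoordinatorTimer) and is not part of the patch:

```java
package org.apache.kafka.coordinator.group.runtime;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.group.MockCoordinatorTimer;
import org.apache.kafka.timeline.SnapshotRegistry;

import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.MockCoordinatorShard;

// Illustration only: walks the snapshot lifecycle end to end with the same helpers
// that SnapshottableCoordinatorTest uses.
public class SnapshotLifecycleExample {
    public static void main(String[] args) {
        LogContext logContext = new LogContext();
        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
        SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = new SnapshottableCoordinator<>(
            logContext,
            snapshotRegistry,
            new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())),
            new TopicPartition("__consumer_offsets", 0)
        );
        // The constructor creates the initial snapshot at offset 0.
        coordinator.updateLastWrittenOffset(10L);   // creates a snapshot at offset 10
        coordinator.updateLastWrittenOffset(20L);   // creates a snapshot at offset 20
        coordinator.revertLastWrittenOffset(10L);   // drops the snapshot at 20 and rolls the state back
        coordinator.updateLastCommittedOffset(10L); // deletes all snapshots prior to offset 10
    }
}
```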


@ -31,6 +31,7 @@ import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashSet;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatcher;
import java.util.Arrays;
import java.util.Collections;
@ -59,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -127,17 +129,30 @@ public class CoordinatorRuntimeTest {
*/
private static class MockCoordinatorLoader implements CoordinatorLoader<String> {
private final LoadSummary summary;
private final List<Long> lastWrittenOffsets;
private final List<Long> lastCommittedOffsets;
public MockCoordinatorLoader(LoadSummary summary) {
public MockCoordinatorLoader(
LoadSummary summary,
List<Long> lastWrittenOffsets,
List<Long> lastCommittedOffsets
) {
this.summary = summary;
this.lastWrittenOffsets = lastWrittenOffsets;
this.lastCommittedOffsets = lastCommittedOffsets;
}
public MockCoordinatorLoader() {
this(null);
this(null, Collections.emptyList(), Collections.emptyList());
}
@Override
public CompletableFuture<LoadSummary> load(TopicPartition tp, CoordinatorPlayback<String> replayable) {
public CompletableFuture<LoadSummary> load(
TopicPartition tp,
CoordinatorPlayback<String> replayable
) {
lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset);
lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset);
return CompletableFuture.completedFuture(summary);
}
@ -194,7 +209,7 @@ public class CoordinatorRuntimeTest {
/**
* A simple Coordinator implementation that stores the records into a set.
*/
private static class MockCoordinatorShard implements CoordinatorShard<String> {
static class MockCoordinatorShard implements CoordinatorShard<String> {
private final TimelineHashSet<String> records;
private final CoordinatorTimer<Void, String> timer;
@ -322,7 +337,7 @@ public class CoordinatorRuntimeTest {
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future);
// Getting the coordinator context fails because the coordinator
// does not exist until scheduleLoadOperation is called.
@ -333,11 +348,12 @@ public class CoordinatorRuntimeTest {
// Getting the coordinator context succeeds now.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(coordinator, ctx.coordinator.coordinator());
// The coordinator is loading.
assertEquals(LOADING, ctx.state);
assertEquals(0, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
assertEquals(coordinator, ctx.coordinator.coordinator());
// When the loading completes, the coordinator transitions to active.
future.complete(null);
@ -353,7 +369,7 @@ public class CoordinatorRuntimeTest {
);
// Verify that the builder got all the expected objects.
verify(builder, times(1)).withSnapshotRegistry(eq(ctx.snapshotRegistry));
verify(builder, times(1)).withSnapshotRegistry(eq(ctx.coordinator.snapshotRegistry()));
verify(builder, times(1)).withLogContext(eq(ctx.logContext));
verify(builder, times(1)).withTime(eq(timer.time()));
verify(builder, times(1)).withTimer(eq(ctx.timer));
@ -389,7 +405,7 @@ public class CoordinatorRuntimeTest {
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future);
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 0);
@ -398,7 +414,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(LOADING, ctx.state);
assertEquals(0, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
assertEquals(coordinator, ctx.coordinator.coordinator());
// When the loading fails, the coordinator transitions to failed.
future.completeExceptionally(new Exception("failure"));
@ -443,7 +459,7 @@ public class CoordinatorRuntimeTest {
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future);
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
@ -452,7 +468,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(LOADING, ctx.state);
assertEquals(10, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
assertEquals(coordinator, ctx.coordinator.coordinator());
// When the loading completes, the coordinator transitions to active.
future.complete(null);
@ -495,7 +511,7 @@ public class CoordinatorRuntimeTest {
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future);
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
@ -504,7 +520,7 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(LOADING, ctx.state);
assertEquals(10, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
assertEquals(coordinator, ctx.coordinator.coordinator());
// When the loading fails, the coordinator transitions to failed.
future.completeExceptionally(new Exception("failure"));
@ -519,14 +535,14 @@ public class CoordinatorRuntimeTest {
// Schedule the reloading.
future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future);
runtime.scheduleLoadOperation(TP, 11);
// Getting the context succeeds and the coordinator should be in loading.
ctx = runtime.contextOrThrow(TP);
assertEquals(LOADING, ctx.state);
assertEquals(11, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
assertEquals(coordinator, ctx.coordinator.coordinator());
// Complete the loading.
future.complete(null);
@ -692,9 +708,9 @@ public class CoordinatorRuntimeTest {
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.lastWrittenOffset);
assertEquals(0L, ctx.lastCommittedOffset);
assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList());
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP,
@ -704,13 +720,13 @@ public class CoordinatorRuntimeTest {
assertFalse(write1.isDone());
// The last written offset is updated.
assertEquals(2L, ctx.lastWrittenOffset);
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
// The last committed offset does not change.
assertEquals(0L, ctx.lastCommittedOffset);
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
// A new snapshot is created.
assertEquals(Arrays.asList(0L, 2L), ctx.snapshotRegistry.epochsList());
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
// Records have been replayed to the coordinator.
assertEquals(mkSet("record1", "record2"), ctx.coordinator.records());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
// Records have been written to the log.
assertEquals(Arrays.asList("record1", "record2"), writer.records(TP));
@ -722,13 +738,13 @@ public class CoordinatorRuntimeTest {
assertFalse(write2.isDone());
// The last written offset is updated.
assertEquals(3L, ctx.lastWrittenOffset);
assertEquals(3L, ctx.coordinator.lastWrittenOffset());
// The last committed offset does not change.
assertEquals(0L, ctx.lastCommittedOffset);
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
// A new snapshot is created.
assertEquals(Arrays.asList(0L, 2L, 3L), ctx.snapshotRegistry.epochsList());
assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
// Records have been replayed to the coordinator.
assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.records());
assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.coordinator().records());
// Records have been written to the log.
assertEquals(Arrays.asList("record1", "record2", "record3"), writer.records(TP));
@ -740,10 +756,10 @@ public class CoordinatorRuntimeTest {
assertFalse(write3.isDone());
// The state does not change.
assertEquals(3L, ctx.lastWrittenOffset);
assertEquals(0L, ctx.lastCommittedOffset);
assertEquals(Arrays.asList(0L, 2L, 3L), ctx.snapshotRegistry.epochsList());
assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.records());
assertEquals(3L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.coordinator().records());
assertEquals(Arrays.asList("record1", "record2", "record3"), writer.records(TP));
// Commit write #1.
@ -754,9 +770,9 @@ public class CoordinatorRuntimeTest {
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
// The last committed offset is updated.
assertEquals(2L, ctx.lastCommittedOffset);
assertEquals(2L, ctx.coordinator.lastCommittedOffset());
// The snapshot is cleaned up.
assertEquals(Arrays.asList(2L, 3L), ctx.snapshotRegistry.epochsList());
assertEquals(Arrays.asList(2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
// Commit write #2.
writer.commit(TP, 3);
@ -768,9 +784,9 @@ public class CoordinatorRuntimeTest {
assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
// The last committed offset is updated.
assertEquals(3L, ctx.lastCommittedOffset);
assertEquals(3L, ctx.coordinator.lastCommittedOffset());
// The snapshot is cleaned up.
assertEquals(Collections.singletonList(3L), ctx.snapshotRegistry.epochsList());
assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList());
// Write #4 but without records.
CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP,
@ -779,7 +795,7 @@ public class CoordinatorRuntimeTest {
// It is completed immediately because the state is fully committed.
assertTrue(write4.isDone());
assertEquals("response4", write4.get(5, TimeUnit.SECONDS));
assertEquals(Collections.singletonList(3L), ctx.snapshotRegistry.epochsList());
assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList());
}
@Test
@ -850,22 +866,28 @@ public class CoordinatorRuntimeTest {
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.lastWrittenOffset);
assertEquals(0L, ctx.lastCommittedOffset);
assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList());
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Override the coordinator with a coordinator that throws
// an exception when replay is called.
ctx.coordinator = new MockCoordinatorShard(ctx.snapshotRegistry, ctx.timer) {
@Override
public void replay(
long producerId,
short producerEpoch,
String record
) throws RuntimeException {
throw new IllegalArgumentException("error");
}
};
SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
ctx.coordinator = new SnapshottableCoordinator<>(
new LogContext(),
snapshotRegistry,
new MockCoordinatorShard(snapshotRegistry, ctx.timer) {
@Override
public void replay(
long producerId,
short producerEpoch,
String record
) throws RuntimeException {
throw new IllegalArgumentException("error");
}
},
TP
);
// Write. It should fail.
CompletableFuture<String> write = runtime.scheduleWriteOperation("write", TP,
@ -873,9 +895,9 @@ public class CoordinatorRuntimeTest {
assertFutureThrows(write, IllegalArgumentException.class);
// Verify that the state has not changed.
assertEquals(0L, ctx.lastWrittenOffset);
assertEquals(0L, ctx.lastCommittedOffset);
assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList());
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
}
@Test
@ -901,19 +923,19 @@ public class CoordinatorRuntimeTest {
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.lastWrittenOffset);
assertEquals(0, ctx.lastCommittedOffset);
assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList());
assertEquals(0, ctx.coordinator.lastWrittenOffset());
assertEquals(0, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
// Write #1. It should succeed and be applied to the coordinator.
runtime.scheduleWriteOperation("write#1", TP,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
// Verify that the state has been updated.
assertEquals(2L, ctx.lastWrittenOffset);
assertEquals(0L, ctx.lastCommittedOffset);
assertEquals(Arrays.asList(0L, 2L), ctx.snapshotRegistry.epochsList());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.records());
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
// Write #2. It should fail because the writer is configured to only
// accept 2 records per batch.
@ -922,10 +944,10 @@ public class CoordinatorRuntimeTest {
assertFutureThrows(write2, KafkaException.class);
// Verify that the state has not changed.
assertEquals(2L, ctx.lastWrittenOffset);
assertEquals(0L, ctx.lastCommittedOffset);
assertEquals(Arrays.asList(0L, 2L), ctx.snapshotRegistry.epochsList());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.records());
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
}
@Test
@ -1019,8 +1041,8 @@ public class CoordinatorRuntimeTest {
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.lastWrittenOffset);
assertEquals(0, ctx.lastCommittedOffset);
assertEquals(0, ctx.coordinator.lastWrittenOffset());
assertEquals(0, ctx.coordinator.lastCommittedOffset());
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP,
@ -1040,13 +1062,13 @@ public class CoordinatorRuntimeTest {
assertFalse(write2.isDone());
// The last written and committed offsets are updated.
assertEquals(4, ctx.lastWrittenOffset);
assertEquals(2, ctx.lastCommittedOffset);
assertEquals(4, ctx.coordinator.lastWrittenOffset());
assertEquals(2, ctx.coordinator.lastCommittedOffset());
// Read.
CompletableFuture<String> read = runtime.scheduleReadOperation("read", TP, (state, offset) -> {
// The read operation should be given the last committed offset.
assertEquals(ctx.lastCommittedOffset, offset);
assertEquals(ctx.coordinator.lastCommittedOffset(), offset);
return "read-response";
});
@ -1098,8 +1120,8 @@ public class CoordinatorRuntimeTest {
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.lastWrittenOffset);
assertEquals(0, ctx.lastCommittedOffset);
assertEquals(0, ctx.coordinator.lastWrittenOffset());
assertEquals(0, ctx.coordinator.lastCommittedOffset());
// Write #1.
runtime.scheduleWriteOperation("write#1", TP,
@ -1114,7 +1136,7 @@ public class CoordinatorRuntimeTest {
// Read. It fails with an exception that is used to complete the future.
CompletableFuture<String> read = runtime.scheduleReadOperation("read", TP, (state, offset) -> {
assertEquals(ctx.lastCommittedOffset, offset);
assertEquals(ctx.coordinator.lastCommittedOffset(), offset);
throw new IllegalArgumentException("error");
});
assertFutureThrows(read, IllegalArgumentException.class);
@ -1141,8 +1163,8 @@ public class CoordinatorRuntimeTest {
// Check initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.lastWrittenOffset);
assertEquals(0, ctx.lastCommittedOffset);
assertEquals(0, ctx.coordinator.lastWrittenOffset());
assertEquals(0, ctx.coordinator.lastCommittedOffset());
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP,
@ -1160,7 +1182,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0, ctx.timer.size());
// Timer #1. This is never executed.
ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.SECONDS, true,
ctx.timer.schedule("timer-1", 10, TimeUnit.SECONDS, true,
() -> new CoordinatorResult<>(Arrays.asList("record5", "record6"), null));
// The coordinator timer should have one pending task.
@ -1219,14 +1241,17 @@ public class CoordinatorRuntimeTest {
.thenReturn(coordinator1);
CompletableFuture<CoordinatorLoader.LoadSummary> future0 = new CompletableFuture<>();
when(loader.load(tp0, coordinator0)).thenReturn(future0);
when(loader.load(eq(tp0), argThat(coordinatorMatcher(runtime, tp0)))).thenReturn(future0);
CompletableFuture<CoordinatorLoader.LoadSummary> future1 = new CompletableFuture<>();
when(loader.load(tp1, coordinator1)).thenReturn(future1);
when(loader.load(eq(tp1), argThat(coordinatorMatcher(runtime, tp1)))).thenReturn(future1);
runtime.scheduleLoadOperation(tp0, 0);
runtime.scheduleLoadOperation(tp1, 0);
assertEquals(coordinator0, runtime.contextOrThrow(tp0).coordinator.coordinator());
assertEquals(coordinator1, runtime.contextOrThrow(tp1).coordinator.coordinator());
// Coordinator 0 is loaded. It should get the current image
// that is the empty one.
future0.complete(null);
@ -1266,18 +1291,18 @@ public class CoordinatorRuntimeTest {
// Check initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.lastWrittenOffset);
assertEquals(0, ctx.lastCommittedOffset);
assertEquals(0, ctx.coordinator.lastWrittenOffset());
assertEquals(0, ctx.coordinator.lastCommittedOffset());
// The coordinator timer should be empty.
assertEquals(0, ctx.timer.size());
// Timer #1.
ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), null));
// Timer #2.
ctx.coordinator.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, true,
ctx.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), null));
// The coordinator timer should have two pending tasks.
@ -1287,14 +1312,14 @@ public class CoordinatorRuntimeTest {
timer.advanceClock(10 + 1);
// Verify that the operation was executed.
assertEquals(mkSet("record1", "record2"), ctx.coordinator.records());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
assertEquals(1, ctx.timer.size());
// Advance time to fire timer #2,
timer.advanceClock(10 + 1);
// Verify that the operation was executed.
assertEquals(mkSet("record1", "record2", "record3", "record4"), ctx.coordinator.records());
assertEquals(mkSet("record1", "record2", "record3", "record4"), ctx.coordinator.coordinator().records());
assertEquals(0, ctx.timer.size());
}
@ -1329,7 +1354,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0, processor.size());
// Timer #1.
ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Collections.singletonList("record1"), null));
// The coordinator timer should have one pending task.
@ -1342,14 +1367,14 @@ public class CoordinatorRuntimeTest {
assertEquals(1, processor.size());
// Schedule a second timer with the same key.
ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Collections.singletonList("record2"), null));
// The coordinator timer should still have one pending task.
assertEquals(1, ctx.timer.size());
// Schedule a third timer with the same key.
ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Collections.singletonList("record3"), null));
// The coordinator timer should still have one pending task.
@ -1367,7 +1392,7 @@ public class CoordinatorRuntimeTest {
// Verify that the correct operation was executed. Only the third
// instance should have been executed here.
assertEquals(mkSet("record3"), ctx.coordinator.records());
assertEquals(mkSet("record3"), ctx.coordinator.coordinator().records());
assertEquals(0, ctx.timer.size());
}
@ -1402,7 +1427,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0, processor.size());
// Timer #1.
ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Collections.singletonList("record1"), null));
// The coordinator timer should have one pending task.
@ -1415,14 +1440,14 @@ public class CoordinatorRuntimeTest {
assertEquals(1, processor.size());
// Schedule a second timer with the same key.
ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
() -> new CoordinatorResult<>(Collections.singletonList("record2"), null));
// The coordinator timer should still have one pending task.
assertEquals(1, ctx.timer.size());
// Cancel the timer.
ctx.coordinator.timer.cancel("timer-1");
ctx.timer.cancel("timer-1");
// The coordinator timer have no pending timers.
assertEquals(0, ctx.timer.size());
@ -1438,7 +1463,7 @@ public class CoordinatorRuntimeTest {
assertTrue(processor.poll());
// Verify that no operation was executed.
assertEquals(Collections.emptySet(), ctx.coordinator.records());
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
assertEquals(0, ctx.timer.size());
}
@ -1466,7 +1491,7 @@ public class CoordinatorRuntimeTest {
// Timer #1.
AtomicInteger cnt = new AtomicInteger(0);
ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> {
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> {
cnt.incrementAndGet();
throw new KafkaException("error");
});
@ -1496,7 +1521,7 @@ public class CoordinatorRuntimeTest {
assertEquals(1, ctx.timer.size());
// Cancel Timer #1.
ctx.coordinator.timer.cancel("timer-1");
ctx.timer.cancel("timer-1");
assertEquals(0, ctx.timer.size());
}
@ -1524,7 +1549,7 @@ public class CoordinatorRuntimeTest {
// Timer #1.
AtomicInteger cnt = new AtomicInteger(0);
ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, false, () -> {
ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, false, () -> {
cnt.incrementAndGet();
throw new KafkaException("error");
});
@ -1572,7 +1597,7 @@ public class CoordinatorRuntimeTest {
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future);
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 0);
@ -1590,11 +1615,12 @@ public class CoordinatorRuntimeTest {
// Start loading a new topic partition.
TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
future = new CompletableFuture<>();
when(loader.load(tp, coordinator)).thenReturn(future);
when(loader.load(eq(tp), argThat(coordinatorMatcher(runtime, tp)))).thenReturn(future);
// Schedule the loading.
runtime.scheduleLoadOperation(tp, 0);
// Getting the context succeeds and the coordinator should be in loading.
ctx = runtime.contextOrThrow(tp);
assertEquals(coordinator, ctx.coordinator.coordinator());
assertEquals(LOADING, ctx.state);
verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING);
@ -1627,7 +1653,9 @@ public class CoordinatorRuntimeTest {
startTimeMs,
startTimeMs + 1000,
30,
3000)))
3000),
Collections.emptyList(),
Collections.emptyList()))
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
@ -1659,4 +1687,116 @@ public class CoordinatorRuntimeTest {
verify(runtimeMetrics, times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
}
@Test
public void testPartitionLoadGeneratesSnapshotAtHighWatermark() {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = mock(MockPartitionWriter.class);
MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(Time.SYSTEM)
.withTimer(timer)
.withLoader(new MockCoordinatorLoader(
new CoordinatorLoader.LoadSummary(
1000,
2000,
30,
3000),
Arrays.asList(5L, 15L, 27L),
Arrays.asList(5L, 15L)))
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(runtimeMetrics)
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 0);
// Getting the coordinator context succeeds now.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
// When the loading completes, the coordinator transitions to active.
assertEquals(ACTIVE, ctx.state);
assertEquals(27L, ctx.coordinator.lastWrittenOffset());
assertEquals(15L, ctx.coordinator.lastCommittedOffset());
assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(5L));
assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(15L));
assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(27L));
}
@Test
public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = mock(MockPartitionWriter.class);
MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(Time.SYSTEM)
.withTimer(timer)
.withLoader(new MockCoordinatorLoader(
new CoordinatorLoader.LoadSummary(
1000,
2000,
30,
3000),
Collections.emptyList(),
Collections.emptyList()))
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(runtimeMetrics)
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 0);
// Getting the coordinator context succeeds now.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
// When the loading completes, the coordinator transitions to active.
assertEquals(ACTIVE, ctx.state);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
}
private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp
) {
return c -> c.equals(runtime.contextOrThrow(tp).coordinator);
}
}


@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.group.MockCoordinatorTimer;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.MockCoordinatorShard;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SnapshottableCoordinatorTest {
@Test
public void testUpdateLastWrittenOffset() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = new SnapshottableCoordinator<>(
logContext,
snapshotRegistry,
new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())),
new TopicPartition("test-topic", 0)
);
assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L));
coordinator.updateLastWrittenOffset(100L);
assertEquals(100L, coordinator.lastWrittenOffset());
assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L));
assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L));
}
@Test
public void testUpdateLastWrittenOffsetFailed() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = new SnapshottableCoordinator<>(
logContext,
snapshotRegistry,
new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())),
new TopicPartition("test-topic", 0)
);
assertEquals(0L, coordinator.lastWrittenOffset());
assertThrows(IllegalStateException.class, () -> coordinator.updateLastWrittenOffset(0L));
assertEquals(0L, coordinator.lastWrittenOffset());
}
@Test
public void testRevertWrittenOffset() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = new SnapshottableCoordinator<>(
logContext,
snapshotRegistry,
new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())),
new TopicPartition("test-topic", 0)
);
coordinator.updateLastWrittenOffset(100L);
coordinator.updateLastWrittenOffset(200L);
assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L));
assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L));
assertTrue(coordinator.snapshotRegistry().hasSnapshot(200L));
coordinator.revertLastWrittenOffset(100L);
assertEquals(100L, coordinator.lastWrittenOffset());
assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L));
assertFalse(coordinator.snapshotRegistry().hasSnapshot(200L));
}
@Test
public void testRevertLastWrittenOffsetFailed() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = new SnapshottableCoordinator<>(
logContext,
snapshotRegistry,
new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())),
new TopicPartition("test-topic", 0)
);
assertEquals(0, coordinator.lastWrittenOffset());
assertThrows(IllegalStateException.class, () -> coordinator.revertLastWrittenOffset(1L));
assertEquals(0, coordinator.lastWrittenOffset());
}
@Test
public void testUpdateLastCommittedOffset() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = new SnapshottableCoordinator<>(
logContext,
snapshotRegistry,
new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())),
new TopicPartition("test-topic", 0)
);
coordinator.updateLastWrittenOffset(100L);
assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L));
assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L));
coordinator.updateLastCommittedOffset(100L);
assertEquals(100L, coordinator.lastCommittedOffset());
assertFalse(coordinator.snapshotRegistry().hasSnapshot(0L));
assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L));
}
@Test
public void testUpdateLastCommittedOffsetFailed() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = new SnapshottableCoordinator<>(
logContext,
snapshotRegistry,
new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())),
new TopicPartition("test-topic", 0)
);
coordinator.updateLastWrittenOffset(100L);
coordinator.updateLastCommittedOffset(100L);
assertEquals(100L, coordinator.lastCommittedOffset());
assertThrows(IllegalStateException.class, () -> coordinator.updateLastCommittedOffset(99L));
assertEquals(100L, coordinator.lastCommittedOffset());
}
}