diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala index a9a50b77583..27e555fad5a 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala @@ -96,6 +96,7 @@ class CoordinatorLoaderImpl[T]( // the log end offset but the log is empty. This could happen with compacted topics. var readAtLeastOneRecord = true + var previousHighWatermark = -1L var numRecords = 0 var numBytes = 0 while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) { @@ -152,7 +153,19 @@ class CoordinatorLoaderImpl[T]( } } + // Note that the high watermark can be greater than the current offset but as we load more records + // the current offset will eventually surpass the high watermark. Also note that the high watermark + // will continue to advance while loading. currentOffset = batch.nextOffset + val currentHighWatermark = log.highWatermark + if (currentOffset >= currentHighWatermark) { + coordinator.updateLastWrittenOffset(currentOffset) + } + + if (currentHighWatermark > previousHighWatermark) { + coordinator.updateLastCommittedOffset(currentHighWatermark) + previousHighWatermark = currentHighWatermark + } } numBytes = numBytes + memoryRecords.sizeInBytes() } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala index 5bb4eb5b756..1475e715c08 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala @@ -29,8 +29,9 @@ import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, Lo import org.apache.kafka.test.TestUtils.assertFutureThrows import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull} import org.junit.jupiter.api.{Test, Timeout} +import org.mockito.ArgumentMatchers.anyLong import org.mockito.{ArgumentCaptor, ArgumentMatchers} -import org.mockito.Mockito.{mock, verify, when} +import org.mockito.Mockito.{mock, times, verify, when} import org.mockito.invocation.InvocationOnMock import java.nio.ByteBuffer @@ -105,6 +106,7 @@ class CoordinatorLoaderImplTest { when(replicaManager.getLog(tp)).thenReturn(Some(log)) when(log.logStartOffset).thenReturn(0L) when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L)) + when(log.highWatermark).thenReturn(0L) val readResult1 = logReadResult(startOffset = 0, records = Seq( new SimpleRecord("k1".getBytes, "v1".getBytes), @@ -152,6 +154,9 @@ class CoordinatorLoaderImplTest { verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5")) verify(coordinator).replay(100L, 5.toShort, ("k6", "v6")) verify(coordinator).replay(100L, 5.toShort, ("k7", "v7")) + verify(coordinator).updateLastWrittenOffset(2) + verify(coordinator).updateLastWrittenOffset(5) + verify(coordinator).updateLastCommittedOffset(0) } } @@ -366,6 +371,107 @@ class CoordinatorLoaderImplTest { } } + @Test + def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val serde = new StringKeyValueDeserializer + val log = mock(classOf[UnifiedLog]) + val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + + TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + 
time = Time.SYSTEM, + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 + )) { loader => + when(replicaManager.getLog(tp)).thenReturn(Some(log)) + when(log.logStartOffset).thenReturn(0L) + when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L) + when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L)) + + val readResult1 = logReadResult(startOffset = 0, records = Seq( + new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes) + )) + + when(log.read( + startOffset = 0L, + maxLength = 1000, + isolation = FetchIsolation.LOG_END, + minOneMessage = true + )).thenReturn(readResult1) + + val readResult2 = logReadResult(startOffset = 2, records = Seq( + new SimpleRecord("k3".getBytes, "v3".getBytes), + new SimpleRecord("k4".getBytes, "v4".getBytes), + new SimpleRecord("k5".getBytes, "v5".getBytes) + )) + + when(log.read( + startOffset = 2L, + maxLength = 1000, + isolation = FetchIsolation.LOG_END, + minOneMessage = true + )).thenReturn(readResult2) + + val readResult3 = logReadResult(startOffset = 5, records = Seq( + new SimpleRecord("k6".getBytes, "v6".getBytes), + new SimpleRecord("k7".getBytes, "v7".getBytes) + )) + + when(log.read( + startOffset = 5L, + maxLength = 1000, + isolation = FetchIsolation.LOG_END, + minOneMessage = true + )).thenReturn(readResult3) + + assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)) + + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7")) + verify(coordinator, times(0)).updateLastWrittenOffset(0) + verify(coordinator, times(1)).updateLastWrittenOffset(2) + verify(coordinator, times(1)).updateLastWrittenOffset(5) + verify(coordinator, times(1)).updateLastWrittenOffset(7) + verify(coordinator, times(1)).updateLastCommittedOffset(0) + verify(coordinator, times(1)).updateLastCommittedOffset(2) + verify(coordinator, times(0)).updateLastCommittedOffset(5) + } + } + + @Test + def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val serde = new StringKeyValueDeserializer + val log = mock(classOf[UnifiedLog]) + val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + + TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 + )) { loader => + when(replicaManager.getLog(tp)).thenReturn(Some(log)) + when(log.logStartOffset).thenReturn(0L) + when(log.highWatermark).thenReturn(0L) + when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(0L)) + + assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)) + + verify(coordinator, times(0)).updateLastWrittenOffset(anyLong()) + verify(coordinator, times(0)).updateLastCommittedOffset(anyLong()) + } + } + private 
def logReadResult(
     startOffset: Long,
     producerId: Long = RecordBatch.NO_PRODUCER_ID,
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
index b89ef6160ef..1fd8c9712a6 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
@@ -38,4 +38,18 @@ public interface CoordinatorPlayback<U> {
         short producerEpoch,
         U record
     ) throws RuntimeException;
+
+    /**
+     * Invoked when a batch of records has been successfully loaded.
+     *
+     * @param offset The offset of the last record in the batch plus one.
+     */
+    void updateLastWrittenOffset(Long offset);
+
+    /**
+     * Called when the high watermark advances.
+     *
+     * @param offset The offset of the new high watermark.
+     */
+    void updateLastCommittedOffset(Long offset);
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 07663521086..cc1b41018b6 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -421,25 +421,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         int epoch;
 
         /**
-         * The snapshot registry backing the coordinator.
+         * The state machine and the metadata that can be accessed by
+         * other threads.
          */
-        SnapshotRegistry snapshotRegistry;
-
-        /**
-         * The actual state machine.
-         */
-        S coordinator;
-
-        /**
-         * The last offset written to the partition.
-         */
-        long lastWrittenOffset;
-
-        /**
-         * The last offset committed. This represents the high
-         * watermark of the partition.
-         */
-        long lastCommittedOffset;
+        SnapshottableCoordinator<S, U> coordinator;
 
         /**
          * Constructor.
@@ -462,67 +447,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             this.timer = new EventBasedCoordinatorTimer(tp, logContext);
         }
 
-        /**
-         * Updates the last written offset. This also create a new snapshot
-         * in the snapshot registry.
-         *
-         * @param offset The new last written offset.
-         */
-        private void updateLastWrittenOffset(
-            long offset
-        ) {
-            if (offset <= lastWrittenOffset) {
-                throw new IllegalStateException("New last written offset " + offset + " of " + tp +
-                    " must be larger than " + lastWrittenOffset + ".");
-            }
-
-            log.debug("Update last written offset of {} to {}.", tp, offset);
-            lastWrittenOffset = offset;
-            snapshotRegistry.getOrCreateSnapshot(offset);
-        }
-
-        /**
-         * Reverts the last written offset. This also reverts the snapshot
-         * registry to this offset. All the changes applied after the offset
-         * are lost.
-         *
-         * @param offset The offset to revert to.
-         */
-        private void revertLastWrittenOffset(
-            long offset
-        ) {
-            if (offset > lastWrittenOffset) {
-                throw new IllegalStateException("New offset " + offset + " of " + tp +
-                    " must be smaller than " + lastWrittenOffset + ".");
-            }
-
-            log.debug("Revert last written offset of {} to {}.", tp, offset);
-            lastWrittenOffset = offset;
-            snapshotRegistry.revertToSnapshot(offset);
-        }
-
-        /**
-         * Updates the last committed offset. This completes all the deferred
-         * events waiting on this offset. This also cleanups all the snapshots
-         * prior to this offset.
- * - * @param offset The new last committed offset. - */ - private void updateLastCommittedOffset( - long offset - ) { - if (offset <= lastCommittedOffset) { - throw new IllegalStateException("New committed offset " + offset + " of " + tp + - " must be larger than " + lastCommittedOffset + "."); - } - - log.debug("Update committed offset of {} to {}.", tp, offset); - lastCommittedOffset = offset; - deferredEventQueue.completeUpTo(offset); - snapshotRegistry.deleteSnapshotsUpTo(offset); - coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset); - } - /** * Transitions to the new state. * @@ -540,23 +464,25 @@ public class CoordinatorRuntime, U> implements Aut switch (newState) { case LOADING: state = CoordinatorState.LOADING; - snapshotRegistry = new SnapshotRegistry(logContext); - lastWrittenOffset = 0L; - lastCommittedOffset = 0L; - coordinator = coordinatorShardBuilderSupplier - .get() - .withLogContext(logContext) - .withSnapshotRegistry(snapshotRegistry) - .withTime(time) - .withTimer(timer) - .withCoordinatorMetrics(coordinatorMetrics) - .withTopicPartition(tp) - .build(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + coordinatorShardBuilderSupplier + .get() + .withLogContext(logContext) + .withSnapshotRegistry(snapshotRegistry) + .withTime(time) + .withTimer(timer) + .withCoordinatorMetrics(coordinatorMetrics) + .withTopicPartition(tp) + .build(), + tp + ); break; case ACTIVE: state = CoordinatorState.ACTIVE; - snapshotRegistry.getOrCreateSnapshot(0); partitionWriter.registerListener(tp, highWatermarklistener); coordinator.onLoaded(metadataImage); break; @@ -589,7 +515,6 @@ public class CoordinatorRuntime, U> implements Aut coordinator.onUnloaded(); } coordinator = null; - snapshotRegistry = null; } } @@ -734,10 +659,10 @@ public class CoordinatorRuntime, U> implements Aut try { // Get the context of the coordinator or fail if the coordinator is not in active state. withActiveContextOrThrow(tp, context -> { - long prevLastWrittenOffset = context.lastWrittenOffset; + long prevLastWrittenOffset = context.coordinator.lastWrittenOffset(); // Execute the operation. - result = op.generateRecordsAndResult(context.coordinator); + result = op.generateRecordsAndResult(context.coordinator.coordinator()); if (result.records().isEmpty()) { // If the records are empty, it was a read operation after all. In this case, @@ -773,7 +698,7 @@ public class CoordinatorRuntime, U> implements Aut producerEpoch, result.records() ); - context.updateLastWrittenOffset(offset); + context.coordinator.updateLastWrittenOffset(offset); // Add the response to the deferred queue. if (!future.isDone()) { @@ -782,7 +707,7 @@ public class CoordinatorRuntime, U> implements Aut complete(null); } } catch (Throwable t) { - context.revertLastWrittenOffset(prevLastWrittenOffset); + context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset); complete(t); } } @@ -918,8 +843,8 @@ public class CoordinatorRuntime, U> implements Aut withActiveContextOrThrow(tp, context -> { // Execute the read operation. response = op.generateResponse( - context.coordinator, - context.lastCommittedOffset + context.coordinator.coordinator(), + context.coordinator.lastCommittedOffset() ); // The response can be completed immediately. 
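The runtime methods above now delegate all offset and snapshot bookkeeping to the SnapshottableCoordinator introduced later in this patch. As a reviewer aid, here is a minimal, self-contained sketch of the snapshot lifecycle those delegated calls drive. It only uses timeline APIs that already appear in this patch (getOrCreateSnapshot, revertToSnapshot, deleteSnapshotsUpTo); the class name and the standalone main method are illustrative, not part of the change.

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashSet;

public final class SnapshotLifecycleSketch {
    public static void main(String[] args) {
        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
        // Timeline collections record their history against the registry,
        // like the TimelineHashSet used by MockCoordinatorShard in the tests.
        TimelineHashSet<String> records = new TimelineHashSet<>(registry, 0);

        registry.getOrCreateSnapshot(0);   // what the SnapshottableCoordinator constructor does

        records.add("record1");
        records.add("record2");
        registry.getOrCreateSnapshot(2);   // updateLastWrittenOffset(2): snapshot at the new end offset

        records.add("record3");
        registry.revertToSnapshot(2);      // revertLastWrittenOffset(2): "record3" is rolled back

        registry.deleteSnapshotsUpTo(2);   // updateLastCommittedOffset(2): prune snapshots below the HWM
    }
}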
@@ -1061,7 +986,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             log.debug("High watermark of {} incremented to {}.", tp, offset);
             scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
                 withActiveContextOrThrow(tp, context -> {
-                    context.updateLastCommittedOffset(offset);
+                    context.coordinator.updateLastCommittedOffset(offset);
+                    context.deferredEventQueue.completeUpTo(offset);
+                    coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
                 });
             });
         }
@@ -1211,8 +1138,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
      * Creates the context if it does not exist.
      *
      * @param tp The topic partition.
+     *
+     * Visible for testing.
      */
-    private void maybeCreateContext(TopicPartition tp) {
+    void maybeCreateContext(TopicPartition tp) {
         coordinators.computeIfAbsent(tp, CoordinatorContext::new);
     }
 
@@ -1422,7 +1351,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                 case FAILED:
                 case INITIAL:
                     context.transitionTo(CoordinatorState.LOADING);
-                    loader.load(tp, context.coordinator).whenComplete((summary, exception) -> {
+                    loader.load(
+                        tp,
+                        context.coordinator
+                    ).whenComplete((summary, exception) -> {
                         scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
                             withContextOrThrow(tp, ctx -> {
                                 if (ctx.state != CoordinatorState.LOADING) {
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java
index 1dca9835098..cd5f4e258b4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java
@@ -23,7 +23,7 @@ import org.apache.kafka.image.MetadataImage;
  * CoordinatorShard is basically a replicated state machine managed by the
  * {@link CoordinatorRuntime}.
  */
-public interface CoordinatorShard<U> extends CoordinatorPlayback<U> {
+public interface CoordinatorShard<U> {
 
     /**
      * The coordinator has been loaded. This is used to apply any
@@ -47,4 +47,15 @@ public interface CoordinatorShard<U> {
      * any post unloading operations.
      */
    default void onUnloaded() {}
+
+    /**
+     * Replay a record to update the state machine.
+     *
+     * @param record The record to replay.
+     */
+    void replay(
+        long producerId,
+        short producerEpoch,
+        U record
+    ) throws RuntimeException;
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
new file mode 100644
index 00000000000..fdc1aa47a09
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * SnapshottableCoordinator is a wrapper on top of the coordinator state machine. It is not accessed
+ * concurrently during normal operation, but multiple threads access it while the coordinator
+ * partition is being loaded; therefore all of its methods are synchronized.
+ */
+class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implements CoordinatorPlayback<U> {
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The snapshot registry backing the coordinator.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The actual state machine.
+     */
+    private final S coordinator;
+
+    /**
+     * The topic partition.
+     */
+    private final TopicPartition tp;
+
+    /**
+     * The last offset written to the partition.
+     */
+    private long lastWrittenOffset;
+
+    /**
+     * The last offset committed. This represents the high
+     * watermark of the partition.
+     */
+    private long lastCommittedOffset;
+
+    SnapshottableCoordinator(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        S coordinator,
+        TopicPartition tp
+    ) {
+        this.log = logContext.logger(SnapshottableCoordinator.class);
+        this.coordinator = coordinator;
+        this.snapshotRegistry = snapshotRegistry;
+        this.tp = tp;
+        this.lastWrittenOffset = 0;
+        this.lastCommittedOffset = 0;
+        snapshotRegistry.getOrCreateSnapshot(0);
+    }
+
+    /**
+     * Reverts the last written offset. This also reverts the snapshot
+     * registry to this offset. All the changes applied after the offset
+     * are lost.
+     *
+     * @param offset The offset to revert to.
+     */
+    synchronized void revertLastWrittenOffset(
+        long offset
+    ) {
+        if (offset > lastWrittenOffset) {
+            throw new IllegalStateException("New offset " + offset + " of " + tp +
+                " must be smaller than " + lastWrittenOffset + ".");
+        }
+
+        log.debug("Revert last written offset of {} to {}.", tp, offset);
+        lastWrittenOffset = offset;
+        snapshotRegistry.revertToSnapshot(offset);
+    }
+
+    /**
+     * Replays the record onto the state machine.
+     *
+     * @param producerId    The producer id.
+     * @param producerEpoch The producer epoch.
+     * @param record        A record.
+     */
+    @Override
+    public synchronized void replay(
+        long producerId,
+        short producerEpoch,
+        U record
+    ) {
+        coordinator.replay(producerId, producerEpoch, record);
+    }
+
+    /**
+     * Updates the last written offset. This also creates a new snapshot
+     * in the snapshot registry.
+     *
+     * @param offset The new last written offset.
+     */
+    @Override
+    public synchronized void updateLastWrittenOffset(Long offset) {
+        if (offset <= lastWrittenOffset) {
+            throw new IllegalStateException("New last written offset " + offset + " of " + tp +
+                " must be greater than " + lastWrittenOffset + ".");
+        }
+
+        lastWrittenOffset = offset;
+        snapshotRegistry.getOrCreateSnapshot(offset);
+        log.debug("Updated last written offset of {} to {}.", tp, offset);
+    }
+
+    /**
+     * Updates the last committed offset. This also cleans up all the
+     * snapshots prior to this offset. Completing the deferred events
+     * waiting on this offset is handled by the runtime.
+     *
+     * @param offset The new last committed offset.
+     */
+    @Override
+    public synchronized void updateLastCommittedOffset(Long offset) {
+        if (offset < lastCommittedOffset) {
+            throw new IllegalStateException("New committed offset " + offset + " of " + tp +
+                " must be greater than or equal to " + lastCommittedOffset + ".");
+        }
+
+        lastCommittedOffset = offset;
+        snapshotRegistry.deleteSnapshotsUpTo(offset);
+        log.debug("Updated committed offset of {} to {}.", tp, offset);
+    }
+
+    /**
+     * The coordinator has been loaded. This is used to apply any
+     * post loading operations.
+     *
+     * @param newImage The metadata image.
+     */
+    synchronized void onLoaded(MetadataImage newImage) {
+        this.coordinator.onLoaded(newImage);
+    }
+
+    /**
+     * The coordinator has been unloaded. This is used to apply
+     * any post unloading operations.
+     */
+    synchronized void onUnloaded() {
+        if (this.coordinator != null) {
+            this.coordinator.onUnloaded();
+        }
+    }
+
+    /**
+     * @return The last written offset.
+     */
+    synchronized long lastWrittenOffset() {
+        return this.lastWrittenOffset;
+    }
+
+    /**
+     * A new metadata image is available. This is only called after {@link SnapshottableCoordinator#onLoaded(MetadataImage)}
+     * is called to signal that the coordinator has been fully loaded.
+     *
+     * @param newImage The new metadata image.
+     * @param delta    The delta image.
+     */
+    synchronized void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        this.coordinator.onNewMetadataImage(newImage, delta);
+    }
+
+    /**
+     * @return The last committed offset.
+     */
+    synchronized long lastCommittedOffset() {
+        return this.lastCommittedOffset;
+    }
+
+    /**
+     * @return The coordinator.
+     */
+    synchronized S coordinator() {
+        return this.coordinator;
+    }
+
+    /**
+     * @return The snapshot registry.
+     *
+     * Only used for testing.
+ */ + synchronized SnapshotRegistry snapshotRegistry() { + return this.snapshotRegistry; + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index 2c589a7feba..cba97e6edf6 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.server.util.timer.MockTimer; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashSet; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatcher; import java.util.Arrays; import java.util.Collections; @@ -59,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -127,17 +129,30 @@ public class CoordinatorRuntimeTest { */ private static class MockCoordinatorLoader implements CoordinatorLoader { private final LoadSummary summary; + private final List lastWrittenOffsets; + private final List lastCommittedOffsets; - public MockCoordinatorLoader(LoadSummary summary) { + public MockCoordinatorLoader( + LoadSummary summary, + List lastWrittenOffsets, + List lastCommittedOffsets + ) { this.summary = summary; + this.lastWrittenOffsets = lastWrittenOffsets; + this.lastCommittedOffsets = lastCommittedOffsets; } public MockCoordinatorLoader() { - this(null); + this(null, Collections.emptyList(), Collections.emptyList()); } @Override - public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) { + public CompletableFuture load( + TopicPartition tp, + CoordinatorPlayback replayable + ) { + lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset); + lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset); return CompletableFuture.completedFuture(summary); } @@ -194,7 +209,7 @@ public class CoordinatorRuntimeTest { /** * A simple Coordinator implementation that stores the records into a set. */ - private static class MockCoordinatorShard implements CoordinatorShard { + static class MockCoordinatorShard implements CoordinatorShard { private final TimelineHashSet records; private final CoordinatorTimer timer; @@ -322,7 +337,7 @@ public class CoordinatorRuntimeTest { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); // Getting the coordinator context fails because the coordinator // does not exist until scheduleLoadOperation is called. @@ -333,11 +348,12 @@ public class CoordinatorRuntimeTest { // Getting the coordinator context succeeds now. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(coordinator, ctx.coordinator.coordinator()); // The coordinator is loading. 
assertEquals(LOADING, ctx.state); assertEquals(0, ctx.epoch); - assertEquals(coordinator, ctx.coordinator); + assertEquals(coordinator, ctx.coordinator.coordinator()); // When the loading completes, the coordinator transitions to active. future.complete(null); @@ -353,7 +369,7 @@ public class CoordinatorRuntimeTest { ); // Verify that the builder got all the expected objects. - verify(builder, times(1)).withSnapshotRegistry(eq(ctx.snapshotRegistry)); + verify(builder, times(1)).withSnapshotRegistry(eq(ctx.coordinator.snapshotRegistry())); verify(builder, times(1)).withLogContext(eq(ctx.logContext)); verify(builder, times(1)).withTime(eq(timer.time())); verify(builder, times(1)).withTimer(eq(ctx.timer)); @@ -389,7 +405,7 @@ public class CoordinatorRuntimeTest { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); // Schedule the loading. runtime.scheduleLoadOperation(TP, 0); @@ -398,7 +414,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(LOADING, ctx.state); assertEquals(0, ctx.epoch); - assertEquals(coordinator, ctx.coordinator); + assertEquals(coordinator, ctx.coordinator.coordinator()); // When the loading fails, the coordinator transitions to failed. future.completeExceptionally(new Exception("failure")); @@ -443,7 +459,7 @@ public class CoordinatorRuntimeTest { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); // Schedule the loading. runtime.scheduleLoadOperation(TP, 10); @@ -452,7 +468,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(LOADING, ctx.state); assertEquals(10, ctx.epoch); - assertEquals(coordinator, ctx.coordinator); + assertEquals(coordinator, ctx.coordinator.coordinator()); // When the loading completes, the coordinator transitions to active. future.complete(null); @@ -495,7 +511,7 @@ public class CoordinatorRuntimeTest { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); // Schedule the loading. runtime.scheduleLoadOperation(TP, 10); @@ -504,7 +520,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(LOADING, ctx.state); assertEquals(10, ctx.epoch); - assertEquals(coordinator, ctx.coordinator); + assertEquals(coordinator, ctx.coordinator.coordinator()); // When the loading fails, the coordinator transitions to failed. future.completeExceptionally(new Exception("failure")); @@ -519,14 +535,14 @@ public class CoordinatorRuntimeTest { // Schedule the reloading. 
future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); runtime.scheduleLoadOperation(TP, 11); // Getting the context succeeds and the coordinator should be in loading. ctx = runtime.contextOrThrow(TP); assertEquals(LOADING, ctx.state); assertEquals(11, ctx.epoch); - assertEquals(coordinator, ctx.coordinator); + assertEquals(coordinator, ctx.coordinator.coordinator()); // Complete the loading. future.complete(null); @@ -692,9 +708,9 @@ public class CoordinatorRuntimeTest { // Verify the initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #1. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, @@ -704,13 +720,13 @@ public class CoordinatorRuntimeTest { assertFalse(write1.isDone()); // The last written offset is updated. - assertEquals(2L, ctx.lastWrittenOffset); + assertEquals(2L, ctx.coordinator.lastWrittenOffset()); // The last committed offset does not change. - assertEquals(0L, ctx.lastCommittedOffset); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); // A new snapshot is created. - assertEquals(Arrays.asList(0L, 2L), ctx.snapshotRegistry.epochsList()); + assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); // Records have been replayed to the coordinator. - assertEquals(mkSet("record1", "record2"), ctx.coordinator.records()); + assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); // Records have been written to the log. assertEquals(Arrays.asList("record1", "record2"), writer.records(TP)); @@ -722,13 +738,13 @@ public class CoordinatorRuntimeTest { assertFalse(write2.isDone()); // The last written offset is updated. - assertEquals(3L, ctx.lastWrittenOffset); + assertEquals(3L, ctx.coordinator.lastWrittenOffset()); // The last committed offset does not change. - assertEquals(0L, ctx.lastCommittedOffset); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); // A new snapshot is created. - assertEquals(Arrays.asList(0L, 2L, 3L), ctx.snapshotRegistry.epochsList()); + assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); // Records have been replayed to the coordinator. - assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.records()); + assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.coordinator().records()); // Records have been written to the log. assertEquals(Arrays.asList("record1", "record2", "record3"), writer.records(TP)); @@ -740,10 +756,10 @@ public class CoordinatorRuntimeTest { assertFalse(write3.isDone()); // The state does not change. 
- assertEquals(3L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Arrays.asList(0L, 2L, 3L), ctx.snapshotRegistry.epochsList()); - assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.records()); + assertEquals(3L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.coordinator().records()); assertEquals(Arrays.asList("record1", "record2", "record3"), writer.records(TP)); // Commit write #1. @@ -754,9 +770,9 @@ public class CoordinatorRuntimeTest { assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); // The last committed offset is updated. - assertEquals(2L, ctx.lastCommittedOffset); + assertEquals(2L, ctx.coordinator.lastCommittedOffset()); // The snapshot is cleaned up. - assertEquals(Arrays.asList(2L, 3L), ctx.snapshotRegistry.epochsList()); + assertEquals(Arrays.asList(2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); // Commit write #2. writer.commit(TP, 3); @@ -768,9 +784,9 @@ public class CoordinatorRuntimeTest { assertEquals("response3", write3.get(5, TimeUnit.SECONDS)); // The last committed offset is updated. - assertEquals(3L, ctx.lastCommittedOffset); + assertEquals(3L, ctx.coordinator.lastCommittedOffset()); // The snapshot is cleaned up. - assertEquals(Collections.singletonList(3L), ctx.snapshotRegistry.epochsList()); + assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #4 but without records. CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, @@ -779,7 +795,7 @@ public class CoordinatorRuntimeTest { // It is completed immediately because the state is fully committed. assertTrue(write4.isDone()); assertEquals("response4", write4.get(5, TimeUnit.SECONDS)); - assertEquals(Collections.singletonList(3L), ctx.snapshotRegistry.epochsList()); + assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList()); } @Test @@ -850,22 +866,28 @@ public class CoordinatorRuntimeTest { // Verify the initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Override the coordinator with a coordinator that throws // an exception when replay is called. - ctx.coordinator = new MockCoordinatorShard(ctx.snapshotRegistry, ctx.timer) { - @Override - public void replay( - long producerId, - short producerEpoch, - String record - ) throws RuntimeException { - throw new IllegalArgumentException("error"); - } - }; + SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry(); + ctx.coordinator = new SnapshottableCoordinator<>( + new LogContext(), + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, ctx.timer) { + @Override + public void replay( + long producerId, + short producerEpoch, + String record + ) throws RuntimeException { + throw new IllegalArgumentException("error"); + } + }, + TP + ); // Write. It should fail. 
CompletableFuture write = runtime.scheduleWriteOperation("write", TP, @@ -873,9 +895,9 @@ public class CoordinatorRuntimeTest { assertFutureThrows(write, IllegalArgumentException.class); // Verify that the state has not changed. - assertEquals(0L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); } @Test @@ -901,19 +923,19 @@ public class CoordinatorRuntimeTest { // Verify the initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0, ctx.lastWrittenOffset); - assertEquals(0, ctx.lastCommittedOffset); - assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #1. It should succeed and be applied to the coordinator. runtime.scheduleWriteOperation("write#1", TP, state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); // Verify that the state has been updated. - assertEquals(2L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Arrays.asList(0L, 2L), ctx.snapshotRegistry.epochsList()); - assertEquals(mkSet("record1", "record2"), ctx.coordinator.records()); + assertEquals(2L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); // Write #2. It should fail because the writer is configured to only // accept 2 records per batch. @@ -922,10 +944,10 @@ public class CoordinatorRuntimeTest { assertFutureThrows(write2, KafkaException.class); // Verify that the state has not changed. - assertEquals(2L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Arrays.asList(0L, 2L), ctx.snapshotRegistry.epochsList()); - assertEquals(mkSet("record1", "record2"), ctx.coordinator.records()); + assertEquals(2L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } @Test @@ -1019,8 +1041,8 @@ public class CoordinatorRuntimeTest { // Verify the initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0, ctx.lastWrittenOffset); - assertEquals(0, ctx.lastCommittedOffset); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); // Write #1. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, @@ -1040,13 +1062,13 @@ public class CoordinatorRuntimeTest { assertFalse(write2.isDone()); // The last written and committed offsets are updated. - assertEquals(4, ctx.lastWrittenOffset); - assertEquals(2, ctx.lastCommittedOffset); + assertEquals(4, ctx.coordinator.lastWrittenOffset()); + assertEquals(2, ctx.coordinator.lastCommittedOffset()); // Read. 
CompletableFuture read = runtime.scheduleReadOperation("read", TP, (state, offset) -> { // The read operation should be given the last committed offset. - assertEquals(ctx.lastCommittedOffset, offset); + assertEquals(ctx.coordinator.lastCommittedOffset(), offset); return "read-response"; }); @@ -1098,8 +1120,8 @@ public class CoordinatorRuntimeTest { // Verify the initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0, ctx.lastWrittenOffset); - assertEquals(0, ctx.lastCommittedOffset); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); // Write #1. runtime.scheduleWriteOperation("write#1", TP, @@ -1114,7 +1136,7 @@ public class CoordinatorRuntimeTest { // Read. It fails with an exception that is used to complete the future. CompletableFuture read = runtime.scheduleReadOperation("read", TP, (state, offset) -> { - assertEquals(ctx.lastCommittedOffset, offset); + assertEquals(ctx.coordinator.lastCommittedOffset(), offset); throw new IllegalArgumentException("error"); }); assertFutureThrows(read, IllegalArgumentException.class); @@ -1141,8 +1163,8 @@ public class CoordinatorRuntimeTest { // Check initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0, ctx.lastWrittenOffset); - assertEquals(0, ctx.lastCommittedOffset); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); // Write #1. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, @@ -1160,7 +1182,7 @@ public class CoordinatorRuntimeTest { assertEquals(0, ctx.timer.size()); // Timer #1. This is never executed. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.SECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.SECONDS, true, () -> new CoordinatorResult<>(Arrays.asList("record5", "record6"), null)); // The coordinator timer should have one pending task. @@ -1219,14 +1241,17 @@ public class CoordinatorRuntimeTest { .thenReturn(coordinator1); CompletableFuture future0 = new CompletableFuture<>(); - when(loader.load(tp0, coordinator0)).thenReturn(future0); + when(loader.load(eq(tp0), argThat(coordinatorMatcher(runtime, tp0)))).thenReturn(future0); CompletableFuture future1 = new CompletableFuture<>(); - when(loader.load(tp1, coordinator1)).thenReturn(future1); + when(loader.load(eq(tp1), argThat(coordinatorMatcher(runtime, tp1)))).thenReturn(future1); runtime.scheduleLoadOperation(tp0, 0); runtime.scheduleLoadOperation(tp1, 0); + assertEquals(coordinator0, runtime.contextOrThrow(tp0).coordinator.coordinator()); + assertEquals(coordinator1, runtime.contextOrThrow(tp1).coordinator.coordinator()); + // Coordinator 0 is loaded. It should get the current image // that is the empty one. future0.complete(null); @@ -1266,18 +1291,18 @@ public class CoordinatorRuntimeTest { // Check initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0, ctx.lastWrittenOffset); - assertEquals(0, ctx.lastCommittedOffset); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); // The coordinator timer should be empty. assertEquals(0, ctx.timer.size()); // Timer #1. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), null)); // Timer #2. 
- ctx.coordinator.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), null)); // The coordinator timer should have two pending tasks. @@ -1287,14 +1312,14 @@ public class CoordinatorRuntimeTest { timer.advanceClock(10 + 1); // Verify that the operation was executed. - assertEquals(mkSet("record1", "record2"), ctx.coordinator.records()); + assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); assertEquals(1, ctx.timer.size()); // Advance time to fire timer #2, timer.advanceClock(10 + 1); // Verify that the operation was executed. - assertEquals(mkSet("record1", "record2", "record3", "record4"), ctx.coordinator.records()); + assertEquals(mkSet("record1", "record2", "record3", "record4"), ctx.coordinator.coordinator().records()); assertEquals(0, ctx.timer.size()); } @@ -1329,7 +1354,7 @@ public class CoordinatorRuntimeTest { assertEquals(0, processor.size()); // Timer #1. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Collections.singletonList("record1"), null)); // The coordinator timer should have one pending task. @@ -1342,14 +1367,14 @@ public class CoordinatorRuntimeTest { assertEquals(1, processor.size()); // Schedule a second timer with the same key. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Collections.singletonList("record2"), null)); // The coordinator timer should still have one pending task. assertEquals(1, ctx.timer.size()); // Schedule a third timer with the same key. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Collections.singletonList("record3"), null)); // The coordinator timer should still have one pending task. @@ -1367,7 +1392,7 @@ public class CoordinatorRuntimeTest { // Verify that the correct operation was executed. Only the third // instance should have been executed here. - assertEquals(mkSet("record3"), ctx.coordinator.records()); + assertEquals(mkSet("record3"), ctx.coordinator.coordinator().records()); assertEquals(0, ctx.timer.size()); } @@ -1402,7 +1427,7 @@ public class CoordinatorRuntimeTest { assertEquals(0, processor.size()); // Timer #1. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Collections.singletonList("record1"), null)); // The coordinator timer should have one pending task. @@ -1415,14 +1440,14 @@ public class CoordinatorRuntimeTest { assertEquals(1, processor.size()); // Schedule a second timer with the same key. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Collections.singletonList("record2"), null)); // The coordinator timer should still have one pending task. assertEquals(1, ctx.timer.size()); // Cancel the timer. - ctx.coordinator.timer.cancel("timer-1"); + ctx.timer.cancel("timer-1"); // The coordinator timer have no pending timers. 
assertEquals(0, ctx.timer.size()); @@ -1438,7 +1463,7 @@ public class CoordinatorRuntimeTest { assertTrue(processor.poll()); // Verify that no operation was executed. - assertEquals(Collections.emptySet(), ctx.coordinator.records()); + assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); assertEquals(0, ctx.timer.size()); } @@ -1466,7 +1491,7 @@ public class CoordinatorRuntimeTest { // Timer #1. AtomicInteger cnt = new AtomicInteger(0); - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> { + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> { cnt.incrementAndGet(); throw new KafkaException("error"); }); @@ -1496,7 +1521,7 @@ public class CoordinatorRuntimeTest { assertEquals(1, ctx.timer.size()); // Cancel Timer #1. - ctx.coordinator.timer.cancel("timer-1"); + ctx.timer.cancel("timer-1"); assertEquals(0, ctx.timer.size()); } @@ -1524,7 +1549,7 @@ public class CoordinatorRuntimeTest { // Timer #1. AtomicInteger cnt = new AtomicInteger(0); - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, false, () -> { + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, false, () -> { cnt.incrementAndGet(); throw new KafkaException("error"); }); @@ -1572,7 +1597,7 @@ public class CoordinatorRuntimeTest { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); // Schedule the loading. runtime.scheduleLoadOperation(TP, 0); @@ -1590,11 +1615,12 @@ public class CoordinatorRuntimeTest { // Start loading a new topic partition. TopicPartition tp = new TopicPartition("__consumer_offsets", 1); future = new CompletableFuture<>(); - when(loader.load(tp, coordinator)).thenReturn(future); + when(loader.load(eq(tp), argThat(coordinatorMatcher(runtime, tp)))).thenReturn(future); // Schedule the loading. runtime.scheduleLoadOperation(tp, 0); // Getting the context succeeds and the coordinator should be in loading. 
ctx = runtime.contextOrThrow(tp); + assertEquals(coordinator, ctx.coordinator.coordinator()); assertEquals(LOADING, ctx.state); verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING); @@ -1627,7 +1653,9 @@ public class CoordinatorRuntimeTest { startTimeMs, startTimeMs + 1000, 30, - 3000))) + 3000), + Collections.emptyList(), + Collections.emptyList())) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) @@ -1659,4 +1687,116 @@ public class CoordinatorRuntimeTest { verify(runtimeMetrics, times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000); } + + @Test + public void testPartitionLoadGeneratesSnapshotAtHighWatermark() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(Time.SYSTEM) + .withTimer(timer) + .withLoader(new MockCoordinatorLoader( + new CoordinatorLoader.LoadSummary( + 1000, + 2000, + 30, + 3000), + Arrays.asList(5L, 15L, 27L), + Arrays.asList(5L, 15L))) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(runtimeMetrics) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.withTime(any())).thenReturn(builder); + when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); + when(builder.withTopicPartition(any())).thenReturn(builder); + when(builder.build()).thenReturn(coordinator); + when(supplier.get()).thenReturn(builder); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 0); + + // Getting the coordinator context succeeds now. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + + // When the loading completes, the coordinator transitions to active. 
+ assertEquals(ACTIVE, ctx.state); + + assertEquals(27L, ctx.coordinator.lastWrittenOffset()); + assertEquals(15L, ctx.coordinator.lastCommittedOffset()); + assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(0L)); + assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(5L)); + assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(15L)); + assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(27L)); + } + + @Test + public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(Time.SYSTEM) + .withTimer(timer) + .withLoader(new MockCoordinatorLoader( + new CoordinatorLoader.LoadSummary( + 1000, + 2000, + 30, + 3000), + Collections.emptyList(), + Collections.emptyList())) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(runtimeMetrics) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.withTime(any())).thenReturn(builder); + when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); + when(builder.withTopicPartition(any())).thenReturn(builder); + when(builder.build()).thenReturn(coordinator); + when(supplier.get()).thenReturn(builder); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 0); + + // Getting the coordinator context succeeds now. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + + // When the loading completes, the coordinator transitions to active. + assertEquals(ACTIVE, ctx.state); + + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(0L)); + } + + private static , U> ArgumentMatcher> coordinatorMatcher( + CoordinatorRuntime runtime, + TopicPartition tp + ) { + return c -> c.equals(runtime.contextOrThrow(tp).coordinator); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinatorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinatorTest.java new file mode 100644 index 00000000000..220442c057d --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinatorTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.coordinator.group.MockCoordinatorTimer; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.MockCoordinatorShard; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SnapshottableCoordinatorTest { + + @Test + public void testUpdateLastWrittenOffset() { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L)); + coordinator.updateLastWrittenOffset(100L); + assertEquals(100L, coordinator.lastWrittenOffset()); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L)); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L)); + } + + @Test + public void testUpdateLastWrittenOffsetFailed() { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + assertEquals(0L, coordinator.lastWrittenOffset()); + assertThrows(IllegalStateException.class, () -> coordinator.updateLastWrittenOffset(0L)); + assertEquals(0L, coordinator.lastWrittenOffset()); + } + + @Test + public void testRevertWrittenOffset() { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + coordinator.updateLastWrittenOffset(100L); + coordinator.updateLastWrittenOffset(200L); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L)); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L)); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(200L)); + + coordinator.revertLastWrittenOffset(100L); + assertEquals(100L, coordinator.lastWrittenOffset()); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L)); + assertFalse(coordinator.snapshotRegistry().hasSnapshot(200L)); + } + + @Test + public void testRevertLastWrittenOffsetFailed() { + LogContext logContext = new LogContext(); + SnapshotRegistry 
snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + assertEquals(0, coordinator.lastWrittenOffset()); + assertThrows(IllegalStateException.class, () -> coordinator.revertLastWrittenOffset(1L)); + assertEquals(0, coordinator.lastWrittenOffset()); + } + + @Test + public void testUpdateLastCommittedOffset() { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + coordinator.updateLastWrittenOffset(100L); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L)); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L)); + + coordinator.updateLastCommittedOffset(100L); + assertEquals(100L, coordinator.lastCommittedOffset()); + assertFalse(coordinator.snapshotRegistry().hasSnapshot(0L)); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L)); + } + + @Test + public void testUpdateLastCommittedOffsetFailed() { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + coordinator.updateLastWrittenOffset(100L); + coordinator.updateLastCommittedOffset(100L); + assertEquals(100L, coordinator.lastCommittedOffset()); + assertThrows(IllegalStateException.class, () -> coordinator.updateLastCommittedOffset(99L)); + assertEquals(100L, coordinator.lastCommittedOffset()); + } +}
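Taken together, the loader change at the top of this patch and the tests above pin down a simple per-batch invariant. The following sketch restates that bookkeeping from CoordinatorLoaderImpl in Java; Batch, Log, LoaderBookkeepingSketch, and applyBatches are stand-in names invented for illustration, while the two conditionals mirror the Scala code verbatim.

import java.util.List;

import org.apache.kafka.coordinator.group.runtime.CoordinatorPlayback;

final class LoaderBookkeepingSketch {
    interface Batch { long nextOffset(); }
    interface Log { long highWatermark(); }

    static void applyBatches(List<Batch> batches, Log log, CoordinatorPlayback<String> coordinator) {
        long previousHighWatermark = -1L;
        for (Batch batch : batches) {
            // ... the records of the batch are deserialized and replayed here ...
            long currentOffset = batch.nextOffset();
            long currentHighWatermark = log.highWatermark();
            // The high watermark may be ahead of the loader; only snapshot once the
            // replayed offset has caught up with it.
            if (currentOffset >= currentHighWatermark) {
                coordinator.updateLastWrittenOffset(currentOffset);
            }
            // The high watermark can keep advancing during the load; propagate each
            // advancement once so older snapshots can be pruned as loading progresses.
            if (currentHighWatermark > previousHighWatermark) {
                coordinator.updateLastCommittedOffset(currentHighWatermark);
                previousHighWatermark = currentHighWatermark;
            }
        }
    }
}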