From b888fa1ec9d543ee798840509eda3326d6fa0a0d Mon Sep 17 00:00:00 2001
From: Jeff Kim
Date: Wed, 6 Dec 2023 11:38:05 -0500
Subject: [PATCH] KAFKA-15910: New group coordinator needs to generate
 snapshots while loading (#14849)

After the new coordinator loads a __consumer_offsets partition, it logs the
following exception when handling a read operation (fetch/list groups, etc.):

```
java.lang.RuntimeException: No in-memory snapshot for epoch 740745. Snapshot epochs are:
	at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:178)
	at org.apache.kafka.timeline.SnapshottableHashTable.snapshottableIterator(SnapshottableHashTable.java:407)
	at org.apache.kafka.timeline.TimelineHashMap$ValueIterator.<init>(TimelineHashMap.java:283)
	at org.apache.kafka.timeline.TimelineHashMap$Values.iterator(TimelineHashMap.java:271)
```

This happens because we don't have a snapshot at the last updated high
watermark after loading. We cannot generate a single snapshot at the high
watermark once all batches are loaded, because the loaded state may already
contain records above the high watermark that have not yet been committed.
We also don't know how far the high watermark will advance, so we generate a
snapshot at every offset the loader observes that is at or above the current
high watermark. Then, once the high watermark listener is registered and the
high watermark advances, we can delete all of the older snapshots.

Reviewers: David Jacot
---
 .../group/CoordinatorLoaderImpl.scala         |  13 +
 .../group/CoordinatorLoaderImplTest.scala     | 108 +++++-
 .../group/runtime/CoordinatorPlayback.java    |  14 +
 .../group/runtime/CoordinatorRuntime.java     | 136 ++------
 .../group/runtime/CoordinatorShard.java       |  13 +-
 .../runtime/SnapshottableCoordinator.java     | 211 +++++++++++
 .../group/runtime/CoordinatorRuntimeTest.java | 330 +++++++++++++-----
 .../runtime/SnapshottableCoordinatorTest.java | 145 ++++++++
 8 files changed, 771 insertions(+), 199 deletions(-)
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
 create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinatorTest.java

diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index a9a50b77583..27e555fad5a 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -96,6 +96,7 @@ class CoordinatorLoaderImpl[T](
         // the log end offset but the log is empty. This could happen with compacted topics.
         var readAtLeastOneRecord = true
 
+        var previousHighWatermark = -1L
         var numRecords = 0
         var numBytes = 0
         while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
@@ -152,7 +153,19 @@
              }
            }
 
+            // Note that the high watermark can be greater than the current offset, but as we load more records
+            // the current offset will eventually surpass the high watermark. Also note that the high watermark
+            // will continue to advance while loading.
currentOffset = batch.nextOffset + val currentHighWatermark = log.highWatermark + if (currentOffset >= currentHighWatermark) { + coordinator.updateLastWrittenOffset(currentOffset) + } + + if (currentHighWatermark > previousHighWatermark) { + coordinator.updateLastCommittedOffset(currentHighWatermark) + previousHighWatermark = currentHighWatermark + } } numBytes = numBytes + memoryRecords.sizeInBytes() } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala index 5bb4eb5b756..1475e715c08 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala @@ -29,8 +29,9 @@ import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, Lo import org.apache.kafka.test.TestUtils.assertFutureThrows import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull} import org.junit.jupiter.api.{Test, Timeout} +import org.mockito.ArgumentMatchers.anyLong import org.mockito.{ArgumentCaptor, ArgumentMatchers} -import org.mockito.Mockito.{mock, verify, when} +import org.mockito.Mockito.{mock, times, verify, when} import org.mockito.invocation.InvocationOnMock import java.nio.ByteBuffer @@ -105,6 +106,7 @@ class CoordinatorLoaderImplTest { when(replicaManager.getLog(tp)).thenReturn(Some(log)) when(log.logStartOffset).thenReturn(0L) when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L)) + when(log.highWatermark).thenReturn(0L) val readResult1 = logReadResult(startOffset = 0, records = Seq( new SimpleRecord("k1".getBytes, "v1".getBytes), @@ -152,6 +154,9 @@ class CoordinatorLoaderImplTest { verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5")) verify(coordinator).replay(100L, 5.toShort, ("k6", "v6")) verify(coordinator).replay(100L, 5.toShort, ("k7", "v7")) + verify(coordinator).updateLastWrittenOffset(2) + verify(coordinator).updateLastWrittenOffset(5) + verify(coordinator).updateLastCommittedOffset(0) } } @@ -366,6 +371,107 @@ class CoordinatorLoaderImplTest { } } + @Test + def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val serde = new StringKeyValueDeserializer + val log = mock(classOf[UnifiedLog]) + val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + + TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 + )) { loader => + when(replicaManager.getLog(tp)).thenReturn(Some(log)) + when(log.logStartOffset).thenReturn(0L) + when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L) + when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L)) + + val readResult1 = logReadResult(startOffset = 0, records = Seq( + new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes) + )) + + when(log.read( + startOffset = 0L, + maxLength = 1000, + isolation = FetchIsolation.LOG_END, + minOneMessage = true + )).thenReturn(readResult1) + + val readResult2 = logReadResult(startOffset = 2, records = Seq( + new SimpleRecord("k3".getBytes, "v3".getBytes), + new SimpleRecord("k4".getBytes, "v4".getBytes), + new SimpleRecord("k5".getBytes, "v5".getBytes) + )) + + when(log.read( + startOffset = 2L, + maxLength = 1000, + isolation = 
FetchIsolation.LOG_END, + minOneMessage = true + )).thenReturn(readResult2) + + val readResult3 = logReadResult(startOffset = 5, records = Seq( + new SimpleRecord("k6".getBytes, "v6".getBytes), + new SimpleRecord("k7".getBytes, "v7".getBytes) + )) + + when(log.read( + startOffset = 5L, + maxLength = 1000, + isolation = FetchIsolation.LOG_END, + minOneMessage = true + )).thenReturn(readResult3) + + assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)) + + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7")) + verify(coordinator, times(0)).updateLastWrittenOffset(0) + verify(coordinator, times(1)).updateLastWrittenOffset(2) + verify(coordinator, times(1)).updateLastWrittenOffset(5) + verify(coordinator, times(1)).updateLastWrittenOffset(7) + verify(coordinator, times(1)).updateLastCommittedOffset(0) + verify(coordinator, times(1)).updateLastCommittedOffset(2) + verify(coordinator, times(0)).updateLastCommittedOffset(5) + } + } + + @Test + def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val serde = new StringKeyValueDeserializer + val log = mock(classOf[UnifiedLog]) + val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + + TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 + )) { loader => + when(replicaManager.getLog(tp)).thenReturn(Some(log)) + when(log.logStartOffset).thenReturn(0L) + when(log.highWatermark).thenReturn(0L) + when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(0L)) + + assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)) + + verify(coordinator, times(0)).updateLastWrittenOffset(anyLong()) + verify(coordinator, times(0)).updateLastCommittedOffset(anyLong()) + } + } + private def logReadResult( startOffset: Long, producerId: Long = RecordBatch.NO_PRODUCER_ID, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java index b89ef6160ef..1fd8c9712a6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java @@ -38,4 +38,18 @@ public interface CoordinatorPlayback { short producerEpoch, U record ) throws RuntimeException; + + /** + * Invoke operations when a batch has been successfully loaded. + * + * @param offset the offset of the last record in the batch plus one. + */ + void updateLastWrittenOffset(Long offset); + + /** + * Called when the high watermark advances. 
+ * + * @param offset The offset of the new high watermark. + */ + void updateLastCommittedOffset(Long offset); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index 07663521086..cc1b41018b6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -421,25 +421,10 @@ public class CoordinatorRuntime, U> implements Aut int epoch; /** - * The snapshot registry backing the coordinator. + * The state machine and the metadata that can be accessed by + * other threads. */ - SnapshotRegistry snapshotRegistry; - - /** - * The actual state machine. - */ - S coordinator; - - /** - * The last offset written to the partition. - */ - long lastWrittenOffset; - - /** - * The last offset committed. This represents the high - * watermark of the partition. - */ - long lastCommittedOffset; + SnapshottableCoordinator coordinator; /** * Constructor. @@ -462,67 +447,6 @@ public class CoordinatorRuntime, U> implements Aut this.timer = new EventBasedCoordinatorTimer(tp, logContext); } - /** - * Updates the last written offset. This also create a new snapshot - * in the snapshot registry. - * - * @param offset The new last written offset. - */ - private void updateLastWrittenOffset( - long offset - ) { - if (offset <= lastWrittenOffset) { - throw new IllegalStateException("New last written offset " + offset + " of " + tp + - " must be larger than " + lastWrittenOffset + "."); - } - - log.debug("Update last written offset of {} to {}.", tp, offset); - lastWrittenOffset = offset; - snapshotRegistry.getOrCreateSnapshot(offset); - } - - /** - * Reverts the last written offset. This also reverts the snapshot - * registry to this offset. All the changes applied after the offset - * are lost. - * - * @param offset The offset to revert to. - */ - private void revertLastWrittenOffset( - long offset - ) { - if (offset > lastWrittenOffset) { - throw new IllegalStateException("New offset " + offset + " of " + tp + - " must be smaller than " + lastWrittenOffset + "."); - } - - log.debug("Revert last written offset of {} to {}.", tp, offset); - lastWrittenOffset = offset; - snapshotRegistry.revertToSnapshot(offset); - } - - /** - * Updates the last committed offset. This completes all the deferred - * events waiting on this offset. This also cleanups all the snapshots - * prior to this offset. - * - * @param offset The new last committed offset. - */ - private void updateLastCommittedOffset( - long offset - ) { - if (offset <= lastCommittedOffset) { - throw new IllegalStateException("New committed offset " + offset + " of " + tp + - " must be larger than " + lastCommittedOffset + "."); - } - - log.debug("Update committed offset of {} to {}.", tp, offset); - lastCommittedOffset = offset; - deferredEventQueue.completeUpTo(offset); - snapshotRegistry.deleteSnapshotsUpTo(offset); - coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset); - } - /** * Transitions to the new state. 
* @@ -540,23 +464,25 @@ public class CoordinatorRuntime, U> implements Aut switch (newState) { case LOADING: state = CoordinatorState.LOADING; - snapshotRegistry = new SnapshotRegistry(logContext); - lastWrittenOffset = 0L; - lastCommittedOffset = 0L; - coordinator = coordinatorShardBuilderSupplier - .get() - .withLogContext(logContext) - .withSnapshotRegistry(snapshotRegistry) - .withTime(time) - .withTimer(timer) - .withCoordinatorMetrics(coordinatorMetrics) - .withTopicPartition(tp) - .build(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + coordinatorShardBuilderSupplier + .get() + .withLogContext(logContext) + .withSnapshotRegistry(snapshotRegistry) + .withTime(time) + .withTimer(timer) + .withCoordinatorMetrics(coordinatorMetrics) + .withTopicPartition(tp) + .build(), + tp + ); break; case ACTIVE: state = CoordinatorState.ACTIVE; - snapshotRegistry.getOrCreateSnapshot(0); partitionWriter.registerListener(tp, highWatermarklistener); coordinator.onLoaded(metadataImage); break; @@ -589,7 +515,6 @@ public class CoordinatorRuntime, U> implements Aut coordinator.onUnloaded(); } coordinator = null; - snapshotRegistry = null; } } @@ -734,10 +659,10 @@ public class CoordinatorRuntime, U> implements Aut try { // Get the context of the coordinator or fail if the coordinator is not in active state. withActiveContextOrThrow(tp, context -> { - long prevLastWrittenOffset = context.lastWrittenOffset; + long prevLastWrittenOffset = context.coordinator.lastWrittenOffset(); // Execute the operation. - result = op.generateRecordsAndResult(context.coordinator); + result = op.generateRecordsAndResult(context.coordinator.coordinator()); if (result.records().isEmpty()) { // If the records are empty, it was a read operation after all. In this case, @@ -773,7 +698,7 @@ public class CoordinatorRuntime, U> implements Aut producerEpoch, result.records() ); - context.updateLastWrittenOffset(offset); + context.coordinator.updateLastWrittenOffset(offset); // Add the response to the deferred queue. if (!future.isDone()) { @@ -782,7 +707,7 @@ public class CoordinatorRuntime, U> implements Aut complete(null); } } catch (Throwable t) { - context.revertLastWrittenOffset(prevLastWrittenOffset); + context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset); complete(t); } } @@ -918,8 +843,8 @@ public class CoordinatorRuntime, U> implements Aut withActiveContextOrThrow(tp, context -> { // Execute the read operation. response = op.generateResponse( - context.coordinator, - context.lastCommittedOffset + context.coordinator.coordinator(), + context.coordinator.lastCommittedOffset() ); // The response can be completed immediately. @@ -1061,7 +986,9 @@ public class CoordinatorRuntime, U> implements Aut log.debug("High watermark of {} incremented to {}.", tp, offset); scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> { withActiveContextOrThrow(tp, context -> { - context.updateLastCommittedOffset(offset); + context.coordinator.updateLastCommittedOffset(offset); + context.deferredEventQueue.completeUpTo(offset); + coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset); }); }); } @@ -1211,8 +1138,10 @@ public class CoordinatorRuntime, U> implements Aut * Creates the context if it does not exist. * * @param tp The topic partition. + * + * Visible for testing. 
*/ - private void maybeCreateContext(TopicPartition tp) { + void maybeCreateContext(TopicPartition tp) { coordinators.computeIfAbsent(tp, CoordinatorContext::new); } @@ -1422,7 +1351,10 @@ public class CoordinatorRuntime, U> implements Aut case FAILED: case INITIAL: context.transitionTo(CoordinatorState.LOADING); - loader.load(tp, context.coordinator).whenComplete((summary, exception) -> { + loader.load( + tp, + context.coordinator + ).whenComplete((summary, exception) -> { scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> { withContextOrThrow(tp, ctx -> { if (ctx.state != CoordinatorState.LOADING) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java index 1dca9835098..cd5f4e258b4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java @@ -23,7 +23,7 @@ import org.apache.kafka.image.MetadataImage; * CoordinatorShard is basically a replicated state machine managed by the * {@link CoordinatorRuntime}. */ -public interface CoordinatorShard extends CoordinatorPlayback { +public interface CoordinatorShard { /** * The coordinator has been loaded. This is used to apply any @@ -47,4 +47,15 @@ public interface CoordinatorShard extends CoordinatorPlayback { * any post unloading operations. */ default void onUnloaded() {} + + /** + * Replay a record to update the state machine. + * + * @param record The record to replay. + */ + void replay( + long producerId, + short producerEpoch, + U record + ) throws RuntimeException; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java new file mode 100644 index 00000000000..fdc1aa47a09 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +/** + * SnapshottableCoordinator is a wrapper on top of the coordinator state machine. 
This object is not accessed concurrently
+ * during normal operation, but multiple threads access it while the coordinator partition is being loaded, so all
+ * of its methods are synchronized.
+ */
+class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implements CoordinatorPlayback<U> {
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The snapshot registry backing the coordinator.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The actual state machine.
+     */
+    private final S coordinator;
+
+    /**
+     * The topic partition.
+     */
+    private final TopicPartition tp;
+
+    /**
+     * The last offset written to the partition.
+     */
+    private long lastWrittenOffset;
+
+    /**
+     * The last offset committed. This represents the high
+     * watermark of the partition.
+     */
+    private long lastCommittedOffset;
+
+    SnapshottableCoordinator(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        S coordinator,
+        TopicPartition tp
+    ) {
+        this.log = logContext.logger(SnapshottableCoordinator.class);
+        this.coordinator = coordinator;
+        this.snapshotRegistry = snapshotRegistry;
+        this.tp = tp;
+        this.lastWrittenOffset = 0;
+        this.lastCommittedOffset = 0;
+        snapshotRegistry.getOrCreateSnapshot(0);
+    }
+
+    /**
+     * Reverts the last written offset. This also reverts the snapshot
+     * registry to this offset. All the changes applied after the offset
+     * are lost.
+     *
+     * @param offset The offset to revert to.
+     */
+    synchronized void revertLastWrittenOffset(
+        long offset
+    ) {
+        if (offset > lastWrittenOffset) {
+            throw new IllegalStateException("New offset " + offset + " of " + tp +
+                " must be smaller than " + lastWrittenOffset + ".");
+        }
+
+        log.debug("Revert last written offset of {} to {}.", tp, offset);
+        lastWrittenOffset = offset;
+        snapshotRegistry.revertToSnapshot(offset);
+    }
+
+    /**
+     * Replays the record onto the state machine.
+     *
+     * @param producerId    The producer id.
+     * @param producerEpoch The producer epoch.
+     * @param record        A record.
+     */
+    @Override
+    public synchronized void replay(
+        long producerId,
+        short producerEpoch,
+        U record
+    ) {
+        coordinator.replay(producerId, producerEpoch, record);
+    }
+
+    /**
+     * Updates the last written offset. This also creates a new snapshot
+     * in the snapshot registry.
+     *
+     * @param offset The new last written offset.
+     */
+    @Override
+    public synchronized void updateLastWrittenOffset(Long offset) {
+        if (offset <= lastWrittenOffset) {
+            throw new IllegalStateException("New last written offset " + offset + " of " + tp +
+                " must be greater than " + lastWrittenOffset + ".");
+        }
+
+        lastWrittenOffset = offset;
+        snapshotRegistry.getOrCreateSnapshot(offset);
+        log.debug("Updated last written offset of {} to {}.", tp, offset);
+    }
+
+    /**
+     * Updates the last committed offset. This also cleans up all the
+     * snapshots prior to this offset.
+     *
+     * @param offset The new last committed offset.
+     */
+    @Override
+    public synchronized void updateLastCommittedOffset(Long offset) {
+        if (offset < lastCommittedOffset) {
+            throw new IllegalStateException("New committed offset " + offset + " of " + tp +
+                " must be greater than or equal to " + lastCommittedOffset + ".");
+        }
+
+        lastCommittedOffset = offset;
+        snapshotRegistry.deleteSnapshotsUpTo(offset);
+        log.debug("Updated committed offset of {} to {}.", tp, offset);
+    }
+
+    /**
+     * The coordinator has been loaded. This is used to apply any
+     * post loading operations.
+     *
+     * @param newImage The metadata image.
+ */ + synchronized void onLoaded(MetadataImage newImage) { + this.coordinator.onLoaded(newImage); + } + + /** + * The coordinator has been unloaded. This is used to apply + * any post unloading operations. + */ + synchronized void onUnloaded() { + if (this.coordinator != null) { + this.coordinator.onUnloaded(); + } + } + + /** + * @return The last written offset. + */ + synchronized long lastWrittenOffset() { + return this.lastWrittenOffset; + } + + /** + * A new metadata image is available. This is only called after {@link SnapshottableCoordinator#onLoaded(MetadataImage)} + * is called to signal that the coordinator has been fully loaded. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ + synchronized void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { + this.coordinator.onNewMetadataImage(newImage, delta); + } + + /** + * @return The last committed offset. + */ + synchronized long lastCommittedOffset() { + return this.lastCommittedOffset; + } + + /** + * @return The coordinator. + */ + synchronized S coordinator() { + return this.coordinator; + } + + /** + * @return The snapshot registry. + * + * Only used for testing. + */ + synchronized SnapshotRegistry snapshotRegistry() { + return this.snapshotRegistry; + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index 2c589a7feba..cba97e6edf6 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.server.util.timer.MockTimer; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashSet; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatcher; import java.util.Arrays; import java.util.Collections; @@ -59,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -127,17 +129,30 @@ public class CoordinatorRuntimeTest { */ private static class MockCoordinatorLoader implements CoordinatorLoader { private final LoadSummary summary; + private final List lastWrittenOffsets; + private final List lastCommittedOffsets; - public MockCoordinatorLoader(LoadSummary summary) { + public MockCoordinatorLoader( + LoadSummary summary, + List lastWrittenOffsets, + List lastCommittedOffsets + ) { this.summary = summary; + this.lastWrittenOffsets = lastWrittenOffsets; + this.lastCommittedOffsets = lastCommittedOffsets; } public MockCoordinatorLoader() { - this(null); + this(null, Collections.emptyList(), Collections.emptyList()); } @Override - public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) { + public CompletableFuture load( + TopicPartition tp, + CoordinatorPlayback replayable + ) { + lastWrittenOffsets.forEach(replayable::updateLastWrittenOffset); + lastCommittedOffsets.forEach(replayable::updateLastCommittedOffset); return CompletableFuture.completedFuture(summary); } @@ -194,7 
+209,7 @@ public class CoordinatorRuntimeTest { /** * A simple Coordinator implementation that stores the records into a set. */ - private static class MockCoordinatorShard implements CoordinatorShard { + static class MockCoordinatorShard implements CoordinatorShard { private final TimelineHashSet records; private final CoordinatorTimer timer; @@ -322,7 +337,7 @@ public class CoordinatorRuntimeTest { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); // Getting the coordinator context fails because the coordinator // does not exist until scheduleLoadOperation is called. @@ -333,11 +348,12 @@ public class CoordinatorRuntimeTest { // Getting the coordinator context succeeds now. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(coordinator, ctx.coordinator.coordinator()); // The coordinator is loading. assertEquals(LOADING, ctx.state); assertEquals(0, ctx.epoch); - assertEquals(coordinator, ctx.coordinator); + assertEquals(coordinator, ctx.coordinator.coordinator()); // When the loading completes, the coordinator transitions to active. future.complete(null); @@ -353,7 +369,7 @@ public class CoordinatorRuntimeTest { ); // Verify that the builder got all the expected objects. - verify(builder, times(1)).withSnapshotRegistry(eq(ctx.snapshotRegistry)); + verify(builder, times(1)).withSnapshotRegistry(eq(ctx.coordinator.snapshotRegistry())); verify(builder, times(1)).withLogContext(eq(ctx.logContext)); verify(builder, times(1)).withTime(eq(timer.time())); verify(builder, times(1)).withTimer(eq(ctx.timer)); @@ -389,7 +405,7 @@ public class CoordinatorRuntimeTest { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); // Schedule the loading. runtime.scheduleLoadOperation(TP, 0); @@ -398,7 +414,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(LOADING, ctx.state); assertEquals(0, ctx.epoch); - assertEquals(coordinator, ctx.coordinator); + assertEquals(coordinator, ctx.coordinator.coordinator()); // When the loading fails, the coordinator transitions to failed. future.completeExceptionally(new Exception("failure")); @@ -443,7 +459,7 @@ public class CoordinatorRuntimeTest { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); // Schedule the loading. runtime.scheduleLoadOperation(TP, 10); @@ -452,7 +468,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(LOADING, ctx.state); assertEquals(10, ctx.epoch); - assertEquals(coordinator, ctx.coordinator); + assertEquals(coordinator, ctx.coordinator.coordinator()); // When the loading completes, the coordinator transitions to active. 
future.complete(null); @@ -495,7 +511,7 @@ public class CoordinatorRuntimeTest { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); // Schedule the loading. runtime.scheduleLoadOperation(TP, 10); @@ -504,7 +520,7 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); assertEquals(LOADING, ctx.state); assertEquals(10, ctx.epoch); - assertEquals(coordinator, ctx.coordinator); + assertEquals(coordinator, ctx.coordinator.coordinator()); // When the loading fails, the coordinator transitions to failed. future.completeExceptionally(new Exception("failure")); @@ -519,14 +535,14 @@ public class CoordinatorRuntimeTest { // Schedule the reloading. future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); runtime.scheduleLoadOperation(TP, 11); // Getting the context succeeds and the coordinator should be in loading. ctx = runtime.contextOrThrow(TP); assertEquals(LOADING, ctx.state); assertEquals(11, ctx.epoch); - assertEquals(coordinator, ctx.coordinator); + assertEquals(coordinator, ctx.coordinator.coordinator()); // Complete the loading. future.complete(null); @@ -692,9 +708,9 @@ public class CoordinatorRuntimeTest { // Verify the initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #1. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, @@ -704,13 +720,13 @@ public class CoordinatorRuntimeTest { assertFalse(write1.isDone()); // The last written offset is updated. - assertEquals(2L, ctx.lastWrittenOffset); + assertEquals(2L, ctx.coordinator.lastWrittenOffset()); // The last committed offset does not change. - assertEquals(0L, ctx.lastCommittedOffset); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); // A new snapshot is created. - assertEquals(Arrays.asList(0L, 2L), ctx.snapshotRegistry.epochsList()); + assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); // Records have been replayed to the coordinator. - assertEquals(mkSet("record1", "record2"), ctx.coordinator.records()); + assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); // Records have been written to the log. assertEquals(Arrays.asList("record1", "record2"), writer.records(TP)); @@ -722,13 +738,13 @@ public class CoordinatorRuntimeTest { assertFalse(write2.isDone()); // The last written offset is updated. - assertEquals(3L, ctx.lastWrittenOffset); + assertEquals(3L, ctx.coordinator.lastWrittenOffset()); // The last committed offset does not change. - assertEquals(0L, ctx.lastCommittedOffset); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); // A new snapshot is created. 
- assertEquals(Arrays.asList(0L, 2L, 3L), ctx.snapshotRegistry.epochsList()); + assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); // Records have been replayed to the coordinator. - assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.records()); + assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.coordinator().records()); // Records have been written to the log. assertEquals(Arrays.asList("record1", "record2", "record3"), writer.records(TP)); @@ -740,10 +756,10 @@ public class CoordinatorRuntimeTest { assertFalse(write3.isDone()); // The state does not change. - assertEquals(3L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Arrays.asList(0L, 2L, 3L), ctx.snapshotRegistry.epochsList()); - assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.records()); + assertEquals(3L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(mkSet("record1", "record2", "record3"), ctx.coordinator.coordinator().records()); assertEquals(Arrays.asList("record1", "record2", "record3"), writer.records(TP)); // Commit write #1. @@ -754,9 +770,9 @@ public class CoordinatorRuntimeTest { assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); // The last committed offset is updated. - assertEquals(2L, ctx.lastCommittedOffset); + assertEquals(2L, ctx.coordinator.lastCommittedOffset()); // The snapshot is cleaned up. - assertEquals(Arrays.asList(2L, 3L), ctx.snapshotRegistry.epochsList()); + assertEquals(Arrays.asList(2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); // Commit write #2. writer.commit(TP, 3); @@ -768,9 +784,9 @@ public class CoordinatorRuntimeTest { assertEquals("response3", write3.get(5, TimeUnit.SECONDS)); // The last committed offset is updated. - assertEquals(3L, ctx.lastCommittedOffset); + assertEquals(3L, ctx.coordinator.lastCommittedOffset()); // The snapshot is cleaned up. - assertEquals(Collections.singletonList(3L), ctx.snapshotRegistry.epochsList()); + assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #4 but without records. CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, @@ -779,7 +795,7 @@ public class CoordinatorRuntimeTest { // It is completed immediately because the state is fully committed. assertTrue(write4.isDone()); assertEquals("response4", write4.get(5, TimeUnit.SECONDS)); - assertEquals(Collections.singletonList(3L), ctx.snapshotRegistry.epochsList()); + assertEquals(Collections.singletonList(3L), ctx.coordinator.snapshotRegistry().epochsList()); } @Test @@ -850,22 +866,28 @@ public class CoordinatorRuntimeTest { // Verify the initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Override the coordinator with a coordinator that throws // an exception when replay is called. 
- ctx.coordinator = new MockCoordinatorShard(ctx.snapshotRegistry, ctx.timer) { - @Override - public void replay( - long producerId, - short producerEpoch, - String record - ) throws RuntimeException { - throw new IllegalArgumentException("error"); - } - }; + SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry(); + ctx.coordinator = new SnapshottableCoordinator<>( + new LogContext(), + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, ctx.timer) { + @Override + public void replay( + long producerId, + short producerEpoch, + String record + ) throws RuntimeException { + throw new IllegalArgumentException("error"); + } + }, + TP + ); // Write. It should fail. CompletableFuture write = runtime.scheduleWriteOperation("write", TP, @@ -873,9 +895,9 @@ public class CoordinatorRuntimeTest { assertFutureThrows(write, IllegalArgumentException.class); // Verify that the state has not changed. - assertEquals(0L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); } @Test @@ -901,19 +923,19 @@ public class CoordinatorRuntimeTest { // Verify the initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0, ctx.lastWrittenOffset); - assertEquals(0, ctx.lastCommittedOffset); - assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList()); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); // Write #1. It should succeed and be applied to the coordinator. runtime.scheduleWriteOperation("write#1", TP, state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); // Verify that the state has been updated. - assertEquals(2L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Arrays.asList(0L, 2L), ctx.snapshotRegistry.epochsList()); - assertEquals(mkSet("record1", "record2"), ctx.coordinator.records()); + assertEquals(2L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); // Write #2. It should fail because the writer is configured to only // accept 2 records per batch. @@ -922,10 +944,10 @@ public class CoordinatorRuntimeTest { assertFutureThrows(write2, KafkaException.class); // Verify that the state has not changed. - assertEquals(2L, ctx.lastWrittenOffset); - assertEquals(0L, ctx.lastCommittedOffset); - assertEquals(Arrays.asList(0L, 2L), ctx.snapshotRegistry.epochsList()); - assertEquals(mkSet("record1", "record2"), ctx.coordinator.records()); + assertEquals(2L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } @Test @@ -1019,8 +1041,8 @@ public class CoordinatorRuntimeTest { // Verify the initial state. 
CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0, ctx.lastWrittenOffset); - assertEquals(0, ctx.lastCommittedOffset); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); // Write #1. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, @@ -1040,13 +1062,13 @@ public class CoordinatorRuntimeTest { assertFalse(write2.isDone()); // The last written and committed offsets are updated. - assertEquals(4, ctx.lastWrittenOffset); - assertEquals(2, ctx.lastCommittedOffset); + assertEquals(4, ctx.coordinator.lastWrittenOffset()); + assertEquals(2, ctx.coordinator.lastCommittedOffset()); // Read. CompletableFuture read = runtime.scheduleReadOperation("read", TP, (state, offset) -> { // The read operation should be given the last committed offset. - assertEquals(ctx.lastCommittedOffset, offset); + assertEquals(ctx.coordinator.lastCommittedOffset(), offset); return "read-response"; }); @@ -1098,8 +1120,8 @@ public class CoordinatorRuntimeTest { // Verify the initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0, ctx.lastWrittenOffset); - assertEquals(0, ctx.lastCommittedOffset); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); // Write #1. runtime.scheduleWriteOperation("write#1", TP, @@ -1114,7 +1136,7 @@ public class CoordinatorRuntimeTest { // Read. It fails with an exception that is used to complete the future. CompletableFuture read = runtime.scheduleReadOperation("read", TP, (state, offset) -> { - assertEquals(ctx.lastCommittedOffset, offset); + assertEquals(ctx.coordinator.lastCommittedOffset(), offset); throw new IllegalArgumentException("error"); }); assertFutureThrows(read, IllegalArgumentException.class); @@ -1141,8 +1163,8 @@ public class CoordinatorRuntimeTest { // Check initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0, ctx.lastWrittenOffset); - assertEquals(0, ctx.lastCommittedOffset); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); // Write #1. CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, @@ -1160,7 +1182,7 @@ public class CoordinatorRuntimeTest { assertEquals(0, ctx.timer.size()); // Timer #1. This is never executed. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.SECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.SECONDS, true, () -> new CoordinatorResult<>(Arrays.asList("record5", "record6"), null)); // The coordinator timer should have one pending task. @@ -1219,14 +1241,17 @@ public class CoordinatorRuntimeTest { .thenReturn(coordinator1); CompletableFuture future0 = new CompletableFuture<>(); - when(loader.load(tp0, coordinator0)).thenReturn(future0); + when(loader.load(eq(tp0), argThat(coordinatorMatcher(runtime, tp0)))).thenReturn(future0); CompletableFuture future1 = new CompletableFuture<>(); - when(loader.load(tp1, coordinator1)).thenReturn(future1); + when(loader.load(eq(tp1), argThat(coordinatorMatcher(runtime, tp1)))).thenReturn(future1); runtime.scheduleLoadOperation(tp0, 0); runtime.scheduleLoadOperation(tp1, 0); + assertEquals(coordinator0, runtime.contextOrThrow(tp0).coordinator.coordinator()); + assertEquals(coordinator1, runtime.contextOrThrow(tp1).coordinator.coordinator()); + // Coordinator 0 is loaded. It should get the current image // that is the empty one. 
future0.complete(null); @@ -1266,18 +1291,18 @@ public class CoordinatorRuntimeTest { // Check initial state. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(0, ctx.lastWrittenOffset); - assertEquals(0, ctx.lastCommittedOffset); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); // The coordinator timer should be empty. assertEquals(0, ctx.timer.size()); // Timer #1. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), null)); // Timer #2. - ctx.coordinator.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), null)); // The coordinator timer should have two pending tasks. @@ -1287,14 +1312,14 @@ public class CoordinatorRuntimeTest { timer.advanceClock(10 + 1); // Verify that the operation was executed. - assertEquals(mkSet("record1", "record2"), ctx.coordinator.records()); + assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); assertEquals(1, ctx.timer.size()); // Advance time to fire timer #2, timer.advanceClock(10 + 1); // Verify that the operation was executed. - assertEquals(mkSet("record1", "record2", "record3", "record4"), ctx.coordinator.records()); + assertEquals(mkSet("record1", "record2", "record3", "record4"), ctx.coordinator.coordinator().records()); assertEquals(0, ctx.timer.size()); } @@ -1329,7 +1354,7 @@ public class CoordinatorRuntimeTest { assertEquals(0, processor.size()); // Timer #1. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Collections.singletonList("record1"), null)); // The coordinator timer should have one pending task. @@ -1342,14 +1367,14 @@ public class CoordinatorRuntimeTest { assertEquals(1, processor.size()); // Schedule a second timer with the same key. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Collections.singletonList("record2"), null)); // The coordinator timer should still have one pending task. assertEquals(1, ctx.timer.size()); // Schedule a third timer with the same key. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Collections.singletonList("record3"), null)); // The coordinator timer should still have one pending task. @@ -1367,7 +1392,7 @@ public class CoordinatorRuntimeTest { // Verify that the correct operation was executed. Only the third // instance should have been executed here. - assertEquals(mkSet("record3"), ctx.coordinator.records()); + assertEquals(mkSet("record3"), ctx.coordinator.coordinator().records()); assertEquals(0, ctx.timer.size()); } @@ -1402,7 +1427,7 @@ public class CoordinatorRuntimeTest { assertEquals(0, processor.size()); // Timer #1. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Collections.singletonList("record1"), null)); // The coordinator timer should have one pending task. 
@@ -1415,14 +1440,14 @@ public class CoordinatorRuntimeTest { assertEquals(1, processor.size()); // Schedule a second timer with the same key. - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> new CoordinatorResult<>(Collections.singletonList("record2"), null)); // The coordinator timer should still have one pending task. assertEquals(1, ctx.timer.size()); // Cancel the timer. - ctx.coordinator.timer.cancel("timer-1"); + ctx.timer.cancel("timer-1"); // The coordinator timer have no pending timers. assertEquals(0, ctx.timer.size()); @@ -1438,7 +1463,7 @@ public class CoordinatorRuntimeTest { assertTrue(processor.poll()); // Verify that no operation was executed. - assertEquals(Collections.emptySet(), ctx.coordinator.records()); + assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); assertEquals(0, ctx.timer.size()); } @@ -1466,7 +1491,7 @@ public class CoordinatorRuntimeTest { // Timer #1. AtomicInteger cnt = new AtomicInteger(0); - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> { + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true, () -> { cnt.incrementAndGet(); throw new KafkaException("error"); }); @@ -1496,7 +1521,7 @@ public class CoordinatorRuntimeTest { assertEquals(1, ctx.timer.size()); // Cancel Timer #1. - ctx.coordinator.timer.cancel("timer-1"); + ctx.timer.cancel("timer-1"); assertEquals(0, ctx.timer.size()); } @@ -1524,7 +1549,7 @@ public class CoordinatorRuntimeTest { // Timer #1. AtomicInteger cnt = new AtomicInteger(0); - ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, false, () -> { + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, false, () -> { cnt.incrementAndGet(); throw new KafkaException("error"); }); @@ -1572,7 +1597,7 @@ public class CoordinatorRuntimeTest { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); - when(loader.load(TP, coordinator)).thenReturn(future); + when(loader.load(eq(TP), argThat(coordinatorMatcher(runtime, TP)))).thenReturn(future); // Schedule the loading. runtime.scheduleLoadOperation(TP, 0); @@ -1590,11 +1615,12 @@ public class CoordinatorRuntimeTest { // Start loading a new topic partition. TopicPartition tp = new TopicPartition("__consumer_offsets", 1); future = new CompletableFuture<>(); - when(loader.load(tp, coordinator)).thenReturn(future); + when(loader.load(eq(tp), argThat(coordinatorMatcher(runtime, tp)))).thenReturn(future); // Schedule the loading. runtime.scheduleLoadOperation(tp, 0); // Getting the context succeeds and the coordinator should be in loading. 
ctx = runtime.contextOrThrow(tp); + assertEquals(coordinator, ctx.coordinator.coordinator()); assertEquals(LOADING, ctx.state); verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING); @@ -1627,7 +1653,9 @@ public class CoordinatorRuntimeTest { startTimeMs, startTimeMs + 1000, 30, - 3000))) + 3000), + Collections.emptyList(), + Collections.emptyList())) .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) @@ -1659,4 +1687,116 @@ public class CoordinatorRuntimeTest { verify(runtimeMetrics, times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000); } + + @Test + public void testPartitionLoadGeneratesSnapshotAtHighWatermark() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(Time.SYSTEM) + .withTimer(timer) + .withLoader(new MockCoordinatorLoader( + new CoordinatorLoader.LoadSummary( + 1000, + 2000, + 30, + 3000), + Arrays.asList(5L, 15L, 27L), + Arrays.asList(5L, 15L))) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(runtimeMetrics) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.withTime(any())).thenReturn(builder); + when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); + when(builder.withTopicPartition(any())).thenReturn(builder); + when(builder.build()).thenReturn(coordinator); + when(supplier.get()).thenReturn(builder); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 0); + + // Getting the coordinator context succeeds now. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + + // When the loading completes, the coordinator transitions to active. 
+ assertEquals(ACTIVE, ctx.state); + + assertEquals(27L, ctx.coordinator.lastWrittenOffset()); + assertEquals(15L, ctx.coordinator.lastCommittedOffset()); + assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(0L)); + assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(5L)); + assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(15L)); + assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(27L)); + } + + @Test + public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(Time.SYSTEM) + .withTimer(timer) + .withLoader(new MockCoordinatorLoader( + new CoordinatorLoader.LoadSummary( + 1000, + 2000, + 30, + 3000), + Collections.emptyList(), + Collections.emptyList())) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(runtimeMetrics) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.withTime(any())).thenReturn(builder); + when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); + when(builder.withTopicPartition(any())).thenReturn(builder); + when(builder.build()).thenReturn(coordinator); + when(supplier.get()).thenReturn(builder); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 0); + + // Getting the coordinator context succeeds now. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + + // When the loading completes, the coordinator transitions to active. + assertEquals(ACTIVE, ctx.state); + + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(0L)); + } + + private static , U> ArgumentMatcher> coordinatorMatcher( + CoordinatorRuntime runtime, + TopicPartition tp + ) { + return c -> c.equals(runtime.contextOrThrow(tp).coordinator); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinatorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinatorTest.java new file mode 100644 index 00000000000..220442c057d --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinatorTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.coordinator.group.MockCoordinatorTimer; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntimeTest.MockCoordinatorShard; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SnapshottableCoordinatorTest { + + @Test + public void testUpdateLastWrittenOffset() { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L)); + coordinator.updateLastWrittenOffset(100L); + assertEquals(100L, coordinator.lastWrittenOffset()); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L)); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L)); + } + + @Test + public void testUpdateLastWrittenOffsetFailed() { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + assertEquals(0L, coordinator.lastWrittenOffset()); + assertThrows(IllegalStateException.class, () -> coordinator.updateLastWrittenOffset(0L)); + assertEquals(0L, coordinator.lastWrittenOffset()); + } + + @Test + public void testRevertWrittenOffset() { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + coordinator.updateLastWrittenOffset(100L); + coordinator.updateLastWrittenOffset(200L); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L)); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L)); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(200L)); + + coordinator.revertLastWrittenOffset(100L); + assertEquals(100L, coordinator.lastWrittenOffset()); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L)); + assertFalse(coordinator.snapshotRegistry().hasSnapshot(200L)); + } + + @Test + public void testRevertLastWrittenOffsetFailed() { + LogContext logContext = new LogContext(); + SnapshotRegistry 
snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + assertEquals(0, coordinator.lastWrittenOffset()); + assertThrows(IllegalStateException.class, () -> coordinator.revertLastWrittenOffset(1L)); + assertEquals(0, coordinator.lastWrittenOffset()); + } + + @Test + public void testUpdateLastCommittedOffset() { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + coordinator.updateLastWrittenOffset(100L); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(0L)); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L)); + + coordinator.updateLastCommittedOffset(100L); + assertEquals(100L, coordinator.lastCommittedOffset()); + assertFalse(coordinator.snapshotRegistry().hasSnapshot(0L)); + assertTrue(coordinator.snapshotRegistry().hasSnapshot(100L)); + } + + @Test + public void testUpdateLastCommittedOffsetFailed() { + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + SnapshottableCoordinator coordinator = new SnapshottableCoordinator<>( + logContext, + snapshotRegistry, + new MockCoordinatorShard(snapshotRegistry, new MockCoordinatorTimer<>(new MockTime())), + new TopicPartition("test-topic", 0) + ); + + coordinator.updateLastWrittenOffset(100L); + coordinator.updateLastCommittedOffset(100L); + assertEquals(100L, coordinator.lastCommittedOffset()); + assertThrows(IllegalStateException.class, () -> coordinator.updateLastCommittedOffset(99L)); + assertEquals(100L, coordinator.lastCommittedOffset()); + } +}
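
For readers who want the gist of the fix without tracing the full diff: while loading, the loader snapshots the state at every batch end offset that is at or above the observed high watermark, and once the high watermark advances the older snapshots are pruned. The sketch below is a minimal illustration of that bookkeeping only; the class name SnapshotBookkeeping and its TreeSet-based "registry" are stand-ins invented for this example, not part of the patch (the real code uses SnapshottableCoordinator and SnapshotRegistry).

```
import java.util.TreeSet;

// Illustrative sketch only: SnapshotBookkeeping and its TreeSet "registry" are stand-ins
// for SnapshottableCoordinator and SnapshotRegistry, trimmed down to the offset accounting.
public class SnapshotBookkeeping {
    private final TreeSet<Long> snapshots = new TreeSet<>();
    private long lastWrittenOffset = 0L;
    private long lastCommittedOffset = 0L;

    public SnapshotBookkeeping() {
        // Like the SnapshottableCoordinator constructor, start with a snapshot at offset 0.
        snapshots.add(0L);
    }

    // Loader path: called with a batch's next offset when it is at or above the observed high watermark.
    public void updateLastWrittenOffset(long offset) {
        if (offset <= lastWrittenOffset) {
            throw new IllegalStateException("Last written offset must increase");
        }
        lastWrittenOffset = offset;
        // Keep a snapshot here so a future high watermark landing on this offset can be read.
        snapshots.add(offset);
    }

    // High watermark path: called when the observed high watermark advances.
    public void updateLastCommittedOffset(long offset) {
        if (offset < lastCommittedOffset) {
            throw new IllegalStateException("High watermark cannot move backwards");
        }
        lastCommittedOffset = offset;
        // Snapshots strictly below the high watermark can never be read again; drop them.
        snapshots.headSet(offset, false).clear();
    }

    public static void main(String[] args) {
        // Same offsets as testPartitionLoadGeneratesSnapshotAtHighWatermark:
        // written offsets 5, 15, 27 and committed offsets 5, 15 observed while loading.
        SnapshotBookkeeping state = new SnapshotBookkeeping();
        state.updateLastWrittenOffset(5L);
        state.updateLastWrittenOffset(15L);
        state.updateLastWrittenOffset(27L);
        state.updateLastCommittedOffset(5L);
        state.updateLastCommittedOffset(15L);
        // Prints [15, 27]: a snapshot exists at the last committed offset.
        System.out.println(state.snapshots);
    }
}
```

This models the invariant the new tests assert: after loading completes there is always a snapshot at the last committed offset, so reads at the high watermark no longer fail with "No in-memory snapshot for epoch ...".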