diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
index b7f74748c45..467ed84da01 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
@@ -74,7 +74,7 @@ class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implements Coor
         this.tp = tp;
         this.lastWrittenOffset = 0;
         this.lastCommittedOffset = 0;
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
     }

     /**
@@ -145,7 +145,7 @@ class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implements Coor
         }

         lastWrittenOffset = offset;
-        snapshotRegistry.getOrCreateSnapshot(offset);
+        snapshotRegistry.idempotentCreateSnapshot(offset);
         log.debug("Updated last written offset of {} to {}.", tp, offset);
     }

diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index cef907da2c1..d9add89eb50 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -532,7 +532,7 @@ public class GroupMetadataManagerTestContext {
         this.groupMetadataManager = groupMetadataManager;
         this.classicGroupInitialRebalanceDelayMs = classicGroupInitialRebalanceDelayMs;
         this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs;
-        snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+        snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
     }

     public void commit() {
@@ -1512,7 +1512,7 @@ public class GroupMetadataManagerTestContext {
         }

         lastWrittenOffset++;
-        snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+        snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
     }

     void onUnloaded() {
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index f4334dc6e54..a8d019258f4 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -481,7 +481,7 @@ public class OffsetMetadataManagerTest {
         long producerId,
         CoordinatorRecord record
     ) {
-        snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+        snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
         ApiMessageAndVersion key = record.key();
         ApiMessageAndVersion value = record.value();

@@ -512,7 +512,7 @@ public class OffsetMetadataManagerTest {
         long producerId,
         TransactionResult result
     ) {
-        snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+        snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
         offsetMetadataManager.replayEndTransactionMarker(producerId, result);
         lastWrittenOffset++;
     }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
index a68cd4a4ab2..950c359294a 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
@@ -61,7 +61,7 @@ public class GroupCoordinatorMetricsShardTest {
         shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
         shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);

-        snapshotRegistry.getOrCreateSnapshot(1000);
+        snapshotRegistry.idempotentCreateSnapshot(1000);
         // The value should not be updated until the offset has been committed.
         assertEquals(0, shard.numOffsets());
         assertEquals(0, shard.numConsumerGroups());
@@ -87,7 +87,7 @@ public class GroupCoordinatorMetricsShardTest {
         shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
         shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);

-        snapshotRegistry.getOrCreateSnapshot(2000);
+        snapshotRegistry.idempotentCreateSnapshot(2000);
         shard.commitUpTo(2000);
         assertEquals(0, shard.numOffsets());
         assertEquals(0, shard.numConsumerGroups());
@@ -184,7 +184,7 @@ public class GroupCoordinatorMetricsShardTest {
         IntStream.range(0, 4).forEach(__ ->
             shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));

-        snapshotRegistry.getOrCreateSnapshot(1000);
+        snapshotRegistry.idempotentCreateSnapshot(1000);
         shard.commitUpTo(1000);
         assertEquals(4, shard.numConsumerGroups());
         assertEquals(4, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
@@ -198,7 +198,7 @@ public class GroupCoordinatorMetricsShardTest {
         group2.updateMember(member2);
         group3.updateMember(member3);

-        snapshotRegistry.getOrCreateSnapshot(2000);
+        snapshotRegistry.idempotentCreateSnapshot(2000);
         shard.commitUpTo(2000);
         assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
         assertEquals(4, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
@@ -206,7 +206,7 @@ public class GroupCoordinatorMetricsShardTest {
         group2.setGroupEpoch(1);
         group3.setGroupEpoch(1);

-        snapshotRegistry.getOrCreateSnapshot(3000);
+        snapshotRegistry.idempotentCreateSnapshot(3000);
         shard.commitUpTo(3000);
         assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
         assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
@@ -219,7 +219,7 @@ public class GroupCoordinatorMetricsShardTest {
             .setPartitionsPendingRevocation(Collections.singletonMap(Uuid.ZERO_UUID, Collections.singleton(0)))
             .build();

-        snapshotRegistry.getOrCreateSnapshot(4000);
+        snapshotRegistry.idempotentCreateSnapshot(4000);
         shard.commitUpTo(4000);
         assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
         assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index 3631a581cd3..31995a71757 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -154,8 +154,8 @@ public class GroupCoordinatorMetricsTest {
             9
         );

-        snapshotRegistry0.getOrCreateSnapshot(1000);
-        snapshotRegistry1.getOrCreateSnapshot(1500);
+        snapshotRegistry0.idempotentCreateSnapshot(1000);
+        snapshotRegistry1.idempotentCreateSnapshot(1500);
         shard0.commitUpTo(1000);
         shard1.commitUpTo(1500);

diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index c5cd2ab194c..e82128ab594 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -1110,12 +1110,12 @@ public class ConsumerGroupTest {
             new TopicPartition("__consumer_offsets", 0)
         );
         ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
         group.updateMember(new ConsumerGroupMember.Builder("member1")
             .setSubscribedTopicNames(Collections.singletonList("foo"))
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
         assertEquals(ConsumerGroup.ConsumerGroupState.STABLE.toString(), group.stateAsString(1));
     }
@@ -1137,7 +1137,7 @@ public class ConsumerGroupTest {
             group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));

         // Create a member.
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         group.updateMember(new ConsumerGroupMember.Builder("member-id").build());

         // The member does not exist at last committed offset 0.
@@ -1244,7 +1244,7 @@ public class ConsumerGroupTest {
     public void testAsDescribedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));

         group.updateMember(new ConsumerGroupMember.Builder("member1")
@@ -1253,7 +1253,7 @@ public class ConsumerGroupTest {
             .build());
         group.updateMember(new ConsumerGroupMember.Builder("member2")
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);

         ConsumerGroupDescribeResponseData.DescribedGroup expected = new ConsumerGroupDescribeResponseData.DescribedGroup()
             .setGroupId("group-id-1")
@@ -1324,14 +1324,14 @@ public class ConsumerGroupTest {
             new TopicPartition("__consumer_offsets", 0)
         );
         ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertTrue(group.isInStates(Collections.singleton("empty"), 0));
         assertFalse(group.isInStates(Collections.singleton("Empty"), 0));

         group.updateMember(new ConsumerGroupMember.Builder("member1")
             .setSubscribedTopicNames(Collections.singletonList("foo"))
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);
         assertTrue(group.isInStates(Collections.singleton("empty"), 0));
         assertTrue(group.isInStates(Collections.singleton("stable"), 1));
         assertFalse(group.isInStates(Collections.singleton("empty"), 1));
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index 3310e080744..45bded5bac7 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -678,13 +678,13 @@ public class ShareGroupTest {
     public void testAsListedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo");
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ShareGroupState.EMPTY, shareGroup.state(0));
         assertEquals("Empty", shareGroup.stateAsString(0));
         shareGroup.updateMember(new ShareGroupMember.Builder("member1")
             .setSubscribedTopicNames(Collections.singletonList("foo"))
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);
         assertEquals(ShareGroupState.EMPTY, shareGroup.state(0));
         assertEquals("Empty", shareGroup.stateAsString(0));
         assertEquals(ShareGroupState.STABLE, shareGroup.state(1));
@@ -786,7 +786,7 @@ public class ShareGroupTest {
     public void testAsDescribedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-id-1");
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ShareGroupState.EMPTY.toString(), shareGroup.stateAsString(0));

         shareGroup.updateMember(new ShareGroupMember.Builder("member1")
@@ -794,7 +794,7 @@ public class ShareGroupTest {
             .build());
         shareGroup.updateMember(new ShareGroupMember.Builder("member2")
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);

         ShareGroupDescribeResponseData.DescribedGroup expected = new ShareGroupDescribeResponseData.DescribedGroup()
             .setGroupId("group-id-1")
@@ -818,14 +818,14 @@ public class ShareGroupTest {
     public void testIsInStatesCaseInsensitive() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo");
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0));
         assertFalse(shareGroup.isInStates(Collections.singleton("Empty"), 0));

         shareGroup.updateMember(new ShareGroupMember.Builder("member1")
             .setSubscribedTopicNames(Collections.singletonList("foo"))
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);
         assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0));
         assertTrue(shareGroup.isInStates(Collections.singleton("stable"), 1));
         assertFalse(shareGroup.isInStates(Collections.singleton("empty"), 1));
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
index 280215541c0..cc9ae039592 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
@@ -78,7 +78,7 @@ public class TimelineHashMapBenchmark {
             int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 3037000493L));
             if (j > 10 && key % 3 == 0) {
                 snapshotRegistry.deleteSnapshotsUpTo(epoch - 1000);
-                snapshotRegistry.getOrCreateSnapshot(epoch);
+                snapshotRegistry.idempotentCreateSnapshot(epoch);
                 j = 0;
             } else {
                 j++;
             }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
index 4094c34e580..b11072648e0 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 /**
  * Manages read and write offsets, and in-memory snapshots.
- *
+ * <p>
  * Also manages the following metrics:
  * kafka.controller:type=KafkaController,name=ActiveControllerCount
  * kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs
@@ -45,7 +45,7 @@ import java.util.Optional;
  * kafka.controller:type=KafkaController,name=LastCommittedRecordOffset
  */
 class OffsetControlManager {
-    public static class Builder {
+    static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
         private QuorumControllerMetrics metrics = null;
@@ -71,7 +71,7 @@
             return this;
         }

-        public OffsetControlManager build() {
+        OffsetControlManager build() {
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
             if (metrics == null) {
@@ -156,7 +156,7 @@
         this.lastStableOffset = -1L;
         this.transactionStartOffset = -1L;
         this.nextWriteOffset = -1L;
-        snapshotRegistry.getOrCreateSnapshot(-1L);
+        snapshotRegistry.idempotentCreateSnapshot(-1L);
         metrics.setActive(false);
         metrics.setLastCommittedRecordOffset(-1L);
         metrics.setLastAppliedRecordOffset(-1L);
@@ -249,7 +249,7 @@
         // Before switching to active, create an in-memory snapshot at the last committed
         // offset. This is required because the active controller assumes that there is always
         // an in-memory snapshot at the last committed offset.
-        snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
+        snapshotRegistry.idempotentCreateSnapshot(lastStableOffset);
         this.nextWriteOffset = newNextWriteOffset;
         metrics.setActive(true);
     }
@@ -298,7 +298,7 @@
     void handleScheduleAtomicAppend(long endOffset) {
         this.nextWriteOffset = endOffset + 1;

-        snapshotRegistry.getOrCreateSnapshot(endOffset);
+        snapshotRegistry.idempotentCreateSnapshot(endOffset);

         metrics.setLastAppliedRecordOffset(endOffset);

@@ -323,7 +323,7 @@
             lastStableOffset = newLastStableOffset;
             snapshotRegistry.deleteSnapshotsUpTo(lastStableOffset);
             if (!active()) {
-                snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
+                snapshotRegistry.idempotentCreateSnapshot(lastStableOffset);
             }
         }
     }
@@ -362,7 +362,7 @@
                 "current snapshot.");
         }
         log.info("Successfully loaded snapshot {}.", currentSnapshotName);
-        this.snapshotRegistry.getOrCreateSnapshot(currentSnapshotId.offset());
+        this.snapshotRegistry.idempotentCreateSnapshot(currentSnapshotId.offset());
         this.lastCommittedOffset = currentSnapshotId.offset();
         this.lastCommittedEpoch = currentSnapshotId.epoch();
         this.lastStableOffset = currentSnapshotId.offset();
@@ -383,7 +383,7 @@
             throw new RuntimeException("Can't replay a BeginTransactionRecord at " + offset +
                 " because the transaction at " + transactionStartOffset + " was never closed.");
         }
-        snapshotRegistry.getOrCreateSnapshot(offset - 1);
+        snapshotRegistry.idempotentCreateSnapshot(offset - 1);
         transactionStartOffset = offset;
         log.info("Replayed {} at offset {}.", message, offset);
     }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
index 8ad6ce2633e..84143c8b3e1 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
@@ -203,7 +203,7 @@ public class AclControlManagerTest {
     @Test
     public void testLoadSnapshot() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         AclControlManager manager = new AclControlManager.Builder().
             setSnapshotRegistry(snapshotRegistry).
             build();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 32401d27447..4a19f0eb11b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -54,7 +54,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 @Timeout(value = 40)
 public class FeatureControlManagerTest {

-    @SuppressWarnings("unchecked")
     private static Map<String, VersionRange> rangeMap(Object... args) {
         Map<String, VersionRange> result = new HashMap<>();
         for (int i = 0; i < args.length; i += 3) {
@@ -100,7 +99,7 @@
             setSnapshotRegistry(snapshotRegistry).
             setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
             build();
-        snapshotRegistry.getOrCreateSnapshot(-1);
+        snapshotRegistry.idempotentCreateSnapshot(-1);
         assertEquals(new FinalizedControllerFeatures(Collections.singletonMap("metadata.version", (short) 4), -1),
             manager.finalizedFeatures(-1));
         assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
@@ -131,7 +130,7 @@
         FeatureLevelRecord record = new FeatureLevelRecord().
             setName("foo").setFeatureLevel((short) 2);

-        snapshotRegistry.getOrCreateSnapshot(-1);
+        snapshotRegistry.idempotentCreateSnapshot(-1);
         FeatureControlManager manager = new FeatureControlManager.Builder().
             setLogContext(logContext).
             setQuorumFeatures(features("foo", 1, 2)).
@@ -139,7 +138,7 @@
             setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
             build();
         manager.replay(record);
-        snapshotRegistry.getOrCreateSnapshot(123);
+        snapshotRegistry.idempotentCreateSnapshot(123);
         assertEquals(new FinalizedControllerFeatures(versionMap("metadata.version", 4, "foo", 2), 123),
             manager.finalizedFeatures(123));
     }
@@ -185,7 +184,7 @@
             updateMap("bar", 3), Collections.emptyMap(), false);
         assertEquals(Collections.singletonMap("bar", ApiError.NONE), result.response());
         manager.replay((FeatureLevelRecord) result.records().get(0).message());
-        snapshotRegistry.getOrCreateSnapshot(3);
+        snapshotRegistry.idempotentCreateSnapshot(3);
         assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
                 singletonMap("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
@@ -213,7 +212,7 @@
     }

     @Test
-    public void testReplayRecords() throws Exception {
+    public void testReplayRecords() {
         LogContext logContext = new LogContext();
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
         FeatureControlManager manager = new FeatureControlManager.Builder().
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index b35670a0bcc..52ab96ecbcd 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -119,7 +119,7 @@ public class SnapshotRegistry {
      * Returns a snapshot iterator that iterates from the snapshots with the
      * lowest epoch to those with the highest.
      */
-    public Iterator<Snapshot> iterator() {
+    Iterator<Snapshot> iterator() {
         return new SnapshotIterator(head.next());
     }

@@ -128,7 +128,7 @@ public class SnapshotRegistry {
      * lowest epoch to those with the highest, starting at the snapshot with the
      * given epoch.
      */
-    public Iterator<Snapshot> iterator(long epoch) {
+    Iterator<Snapshot> iterator(long epoch) {
         return iterator(getSnapshot(epoch));
     }

@@ -136,7 +136,7 @@
      * Returns a snapshot iterator that iterates from the snapshots with the
      * lowest epoch to those with the highest, starting at the given snapshot.
      */
-    public Iterator<Snapshot> iterator(Snapshot snapshot) {
+    Iterator<Snapshot> iterator(Snapshot snapshot) {
         return new SnapshotIterator(snapshot);
     }

@@ -144,7 +144,7 @@
      * Returns a reverse snapshot iterator that iterates from the snapshots with the
      * highest epoch to those with the lowest.
      */
-    public Iterator<Snapshot> reverseIterator() {
+    Iterator<Snapshot> reverseIterator() {
         return new ReverseSnapshotIterator();
     }

@@ -172,7 +172,7 @@
     /**
      * Gets the snapshot for a specific epoch.
     */
-    public Snapshot getSnapshot(long epoch) {
+    Snapshot getSnapshot(long epoch) {
        Snapshot snapshot = snapshots.get(epoch);
        if (snapshot == null) {
            throw new RuntimeException("No in-memory snapshot for epoch " + epoch + ". Snapshot " +
@@ -183,13 +183,13 @@
     /**
      * Creates a new snapshot at the given epoch.
-     *
-     * If {@code epoch} already exists and it is the last snapshot then just return that snapshot.
+     * <p>
+     * If {@code epoch} already exists, and it is the last snapshot then just return that snapshot.
      *
-     * @param epoch    The epoch to create the snapshot at. The current epoch
-     *                 will be advanced to one past this epoch.
+     * @param epoch The epoch to create the snapshot at. The current epoch
+     *              will be advanced to one past this epoch.
      */
-    public Snapshot getOrCreateSnapshot(long epoch) {
+    Snapshot getOrCreateSnapshot(long epoch) {
         Snapshot last = head.prev();
         if (last.epoch() > epoch) {
             throw new RuntimeException("Can't create a new in-memory snapshot at epoch " + epoch +
@@ -205,6 +205,18 @@
         return snapshot;
     }

+    /**
+     * Creates a new snapshot at the given epoch.
+     * <p>
+     * If {@code epoch} already exists, and it is the last snapshot then this operation will do nothing.
+     *
+     * @param epoch The epoch to create the snapshot at. The current epoch
+     *              will be advanced to one past this epoch.
+     */
+    public void idempotentCreateSnapshot(long epoch) {
+        getOrCreateSnapshot(epoch);
+    }
+
     /**
      * Reverts the state of all data structures to the state at the given epoch.
      *
@@ -238,7 +250,7 @@
      *
      * @param snapshot The snapshot to delete.
      */
-    public void deleteSnapshot(Snapshot snapshot) {
+    void deleteSnapshot(Snapshot snapshot) {
         Snapshot prev = snapshot.prev();
         if (prev != head) {
             prev.mergeFrom(snapshot);
@@ -274,7 +286,7 @@
     /**
      * Associate a revertable with this registry.
     */
-    public void register(Revertable revertable) {
+    void register(Revertable revertable) {
         revertables.add(revertable);
     }
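Not part of the patch: a minimal sketch of the calling pattern the new public method supports, using the existing SnapshotRegistry and TimelineHashMap APIs from org.apache.kafka.timeline. The example class name, keys and epochs below are invented for illustration.

    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.timeline.SnapshotRegistry;
    import org.apache.kafka.timeline.TimelineHashMap;

    // Hypothetical example class, not part of the Kafka code base.
    public class IdempotentSnapshotExample {
        public static void main(String[] args) {
            SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
            TimelineHashMap<String, Integer> committedOffsets = new TimelineHashMap<>(registry, 16);

            // Replay a record, then snapshot at the offset that was just applied.
            committedOffsets.put("group-a", 10);
            registry.idempotentCreateSnapshot(0);

            committedOffsets.put("group-a", 20);
            registry.idempotentCreateSnapshot(1);

            // Calling it again at the latest epoch is a no-op rather than an error,
            // so callers can snapshot unconditionally at each written offset.
            registry.idempotentCreateSnapshot(1);

            Integer atEpochZero = committedOffsets.get("group-a", 0); // 10, from the snapshot at epoch 0
            Integer current = committedOffsets.get("group-a");        // 20, current state
        }
    }

The new method deliberately returns nothing and simply delegates to getOrCreateSnapshot, so callers outside the package, such as SnapshottableCoordinator and the tests above, depend only on the idempotent create semantics rather than on the returned Snapshot object.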
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index 8fb3dbd50d3..c080113c9a5 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -29,33 +29,33 @@ import java.util.NoSuchElementException;
  * We handle divergences between the current state and historical state by copying a
  * reference to elements that have been deleted or overwritten into the most recent
  * snapshot tier.
- *
+ * <p>
  * Note that there are no keys in SnapshottableHashTable, only values. So it more similar
  * to a hash set than a hash map. The subclasses implement full-featured maps and sets
  * using this class as a building block.
- *
+ * <p>
  * Each snapshot tier contains a size and a hash table. The size reflects the size at
  * the time the snapshot was taken. Note that, as an optimization, snapshot tiers will
  * be null if they don't contain anything. So for example, if snapshot 20 of Object O
  * contains the same entries as snapshot 10 of that object, the snapshot 20 tier for
  * object O will be null.
- *
+ * <p>
  * The current tier's data is stored in the fields inherited from BaseHashTable. It
  * would be conceptually simpler to have a separate BaseHashTable object, but since Java
  * doesn't have value types, subclassing is the only way to avoid another pointer
  * indirection and the associated extra memory cost.
- *
+ * <p>
  * Note that each element in the hash table contains a start epoch, and a value. The
  * start epoch is there to identify when the object was first inserted. This in turn
  * determines which snapshots it is a member of.
- *
+ * <p>
  * In order to retrieve an object from snapshot E, we start by checking to see if the
 * object exists in the "current" hash tier. If it does, and its startEpoch extends back
 * to E, we return that object. Otherwise, we check all the snapshot tiers, starting
 * with E, and ending with the most recent snapshot, to see if the object is there.
 * As an optimization, if we encounter the object in a snapshot tier but its epoch is too
 * new, we know that its value at epoch E must be null, so we can return that immediately.
- *
+ * <p>
 * The class hierarchy looks like this:
 * <pre>
 *        Revertable       BaseHashTable
@@ -66,11 +66,11 @@ import java.util.NoSuchElementException;
 * </pre>
 * BaseHashTable is a simple hash table that uses separate chaining. The interface is
 * pretty bare-bones since this class is not intended to be used directly by end-users.
- *
+ * <p>
 * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
 * snapshots. This is the core of the snapshotted hash table code and handles the
 * tiering.
- *
+ * <p>
 * TimelineHashSet and TimelineHashMap are mostly wrappers around this
 * SnapshottableHashTable class. They implement standard Java APIs for Set and Map,
 * respectively. There's a fair amount of boilerplate for this, but it's necessary so
@@ -78,11 +78,11 @@ import java.util.NoSuchElementException;
 * The accessor APIs have two versions -- one that looks at the current state, and one
 * that looks at a historical snapshotted state. Mutation APIs only ever mutate the
 * current state.
- *
+ * <p>
 * One very important feature of SnapshottableHashTable is that we support iterating
 * over a snapshot even while changes are being made to the current state. See the
 * Javadoc for the iterator for more information about how this is accomplished.
- *
+ * <p>
 * All of these classes require external synchronization, and don't support null keys or
 * values.
 */
@@ -135,7 +135,7 @@ class SnapshottableHashTable
-     *
+     * <p>
      * You can use this iterator even if you are making changes to the map.
      * The changes may or may not be visible while you are iterating.
      */
@@ -185,7 +185,7 @@ class SnapshottableHashTable
-     *
+     * <p>
      * You can use this iterator even if you are making changes to the map.
      * The snapshot is immutable and will always show up the same.
      */
@@ -408,37 +408,6 @@ class SnapshottableHashTable
-        for (Iterator<Snapshot> iter = snapshotRegistry.iterator(); iter.hasNext(); ) {
-            Snapshot snapshot = iter.next();
-            bld.append(prefix);
-            bld.append("epoch ").append(snapshot.epoch()).append(": ");
-            HashTier tier = snapshot.getDelta(this);
-            if (tier == null) {
-                bld.append("null");
-            } else {
-                bld.append("HashTier{");
-                bld.append("size=").append(tier.size);
-                bld.append(", deltaTable=");
-                if (tier.deltaTable == null) {
-                    bld.append("null");
-                } else {
-                    bld.append(tier.deltaTable.baseToDebugString());
-                }
-                bld.append("}");
-            }
-            bld.append(String.format("%n"));
-        }
-        bld.append(String.format("]}%n"));
-        return bld.toString();
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     public void executeRevert(long targetEpoch, Delta delta) {
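Also not part of the patch: a short sketch of the behaviour the class comment above describes. Mutations only touch the current tier, reads that pass an epoch are served from the snapshot tiers, and revertToSnapshot and deleteSnapshotsUpTo manage the snapshot lifecycle (the latter is the same call used by TimelineHashMapBenchmark earlier in this diff). The class and variable names are invented for illustration.

    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.timeline.SnapshotRegistry;
    import org.apache.kafka.timeline.TimelineHashMap;

    // Hypothetical example class, not part of the Kafka code base.
    public class TimelineReadExample {
        public static void main(String[] args) {
            SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
            TimelineHashMap<String, String> state = new TimelineHashMap<>(registry, 16);

            state.put("k", "v1");
            registry.idempotentCreateSnapshot(100);  // snapshot the current state at epoch 100

            // Later writes get a newer start epoch; the overwritten value is copied
            // into the snapshot tier, so reads at epoch 100 still see it.
            state.put("k", "v2");
            state.put("other", "x");

            String atSnapshot = state.get("k", 100); // "v1", served from the snapshot tier
            String current = state.get("k");         // "v2", current tier

            // Roll every registered timeline structure back to the snapshot...
            registry.revertToSnapshot(100);
            String afterRevert = state.get("k");     // "v1" again; "other" is gone

            // ...or drop snapshots that are no longer needed.
            registry.deleteSnapshotsUpTo(100);
        }
    }

This is the pattern the renamed test call sites rely on throughout the patch: create a snapshot at an offset, keep mutating the current state, and then assert against the older epoch.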