mirror of https://github.com/apache/kafka.git
KAFKA-15875: Stops leak Snapshot in public methods (#16807)
* KAFKA-15875: Stops leak Snapshot in public methods The Snapshot class is package protected but it's returned in several public methods in SnapshotRegistry. To prevent this accidental leakage, these methods are made package protected as well. For getOrCreateSnapshot a new method called IdempotentCreateSnapshot is created that returns void. * Make builer package protected, replace <br> with <p> Reviewers: Greg Harris <greg.harris@aiven.io>
This commit is contained in:
parent
130af38481
commit
4e862c0903
|
@ -74,7 +74,7 @@ class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implements Coor
|
|||
this.tp = tp;
|
||||
this.lastWrittenOffset = 0;
|
||||
this.lastCommittedOffset = 0;
|
||||
snapshotRegistry.getOrCreateSnapshot(0);
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -145,7 +145,7 @@ class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implements Coor
|
|||
}
|
||||
|
||||
lastWrittenOffset = offset;
|
||||
snapshotRegistry.getOrCreateSnapshot(offset);
|
||||
snapshotRegistry.idempotentCreateSnapshot(offset);
|
||||
log.debug("Updated last written offset of {} to {}.", tp, offset);
|
||||
}
|
||||
|
||||
|
|
|
@ -532,7 +532,7 @@ public class GroupMetadataManagerTestContext {
|
|||
this.groupMetadataManager = groupMetadataManager;
|
||||
this.classicGroupInitialRebalanceDelayMs = classicGroupInitialRebalanceDelayMs;
|
||||
this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs;
|
||||
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
|
||||
snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
|
||||
}
|
||||
|
||||
public void commit() {
|
||||
|
@ -1512,7 +1512,7 @@ public class GroupMetadataManagerTestContext {
|
|||
}
|
||||
|
||||
lastWrittenOffset++;
|
||||
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
|
||||
snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
|
||||
}
|
||||
|
||||
void onUnloaded() {
|
||||
|
|
|
@ -481,7 +481,7 @@ public class OffsetMetadataManagerTest {
|
|||
long producerId,
|
||||
CoordinatorRecord record
|
||||
) {
|
||||
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
|
||||
snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
|
||||
|
||||
ApiMessageAndVersion key = record.key();
|
||||
ApiMessageAndVersion value = record.value();
|
||||
|
@ -512,7 +512,7 @@ public class OffsetMetadataManagerTest {
|
|||
long producerId,
|
||||
TransactionResult result
|
||||
) {
|
||||
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
|
||||
snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
|
||||
offsetMetadataManager.replayEndTransactionMarker(producerId, result);
|
||||
lastWrittenOffset++;
|
||||
}
|
||||
|
|
|
@ -61,7 +61,7 @@ public class GroupCoordinatorMetricsShardTest {
|
|||
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
|
||||
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);
|
||||
|
||||
snapshotRegistry.getOrCreateSnapshot(1000);
|
||||
snapshotRegistry.idempotentCreateSnapshot(1000);
|
||||
// The value should not be updated until the offset has been committed.
|
||||
assertEquals(0, shard.numOffsets());
|
||||
assertEquals(0, shard.numConsumerGroups());
|
||||
|
@ -87,7 +87,7 @@ public class GroupCoordinatorMetricsShardTest {
|
|||
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
|
||||
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);
|
||||
|
||||
snapshotRegistry.getOrCreateSnapshot(2000);
|
||||
snapshotRegistry.idempotentCreateSnapshot(2000);
|
||||
shard.commitUpTo(2000);
|
||||
assertEquals(0, shard.numOffsets());
|
||||
assertEquals(0, shard.numConsumerGroups());
|
||||
|
@ -184,7 +184,7 @@ public class GroupCoordinatorMetricsShardTest {
|
|||
|
||||
IntStream.range(0, 4).forEach(__ -> shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
|
||||
|
||||
snapshotRegistry.getOrCreateSnapshot(1000);
|
||||
snapshotRegistry.idempotentCreateSnapshot(1000);
|
||||
shard.commitUpTo(1000);
|
||||
assertEquals(4, shard.numConsumerGroups());
|
||||
assertEquals(4, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
|
||||
|
@ -198,7 +198,7 @@ public class GroupCoordinatorMetricsShardTest {
|
|||
group2.updateMember(member2);
|
||||
group3.updateMember(member3);
|
||||
|
||||
snapshotRegistry.getOrCreateSnapshot(2000);
|
||||
snapshotRegistry.idempotentCreateSnapshot(2000);
|
||||
shard.commitUpTo(2000);
|
||||
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
|
||||
assertEquals(4, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
|
||||
|
@ -206,7 +206,7 @@ public class GroupCoordinatorMetricsShardTest {
|
|||
group2.setGroupEpoch(1);
|
||||
group3.setGroupEpoch(1);
|
||||
|
||||
snapshotRegistry.getOrCreateSnapshot(3000);
|
||||
snapshotRegistry.idempotentCreateSnapshot(3000);
|
||||
shard.commitUpTo(3000);
|
||||
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
|
||||
assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
|
||||
|
@ -219,7 +219,7 @@ public class GroupCoordinatorMetricsShardTest {
|
|||
.setPartitionsPendingRevocation(Collections.singletonMap(Uuid.ZERO_UUID, Collections.singleton(0)))
|
||||
.build();
|
||||
|
||||
snapshotRegistry.getOrCreateSnapshot(4000);
|
||||
snapshotRegistry.idempotentCreateSnapshot(4000);
|
||||
shard.commitUpTo(4000);
|
||||
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
|
||||
assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
|
||||
|
|
|
@ -154,8 +154,8 @@ public class GroupCoordinatorMetricsTest {
|
|||
9
|
||||
);
|
||||
|
||||
snapshotRegistry0.getOrCreateSnapshot(1000);
|
||||
snapshotRegistry1.getOrCreateSnapshot(1500);
|
||||
snapshotRegistry0.idempotentCreateSnapshot(1000);
|
||||
snapshotRegistry1.idempotentCreateSnapshot(1500);
|
||||
shard0.commitUpTo(1000);
|
||||
shard1.commitUpTo(1500);
|
||||
|
||||
|
|
|
@ -1110,12 +1110,12 @@ public class ConsumerGroupTest {
|
|||
new TopicPartition("__consumer_offsets", 0)
|
||||
);
|
||||
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
|
||||
snapshotRegistry.getOrCreateSnapshot(0);
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
|
||||
group.updateMember(new ConsumerGroupMember.Builder("member1")
|
||||
.setSubscribedTopicNames(Collections.singletonList("foo"))
|
||||
.build());
|
||||
snapshotRegistry.getOrCreateSnapshot(1);
|
||||
snapshotRegistry.idempotentCreateSnapshot(1);
|
||||
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
|
||||
assertEquals(ConsumerGroup.ConsumerGroupState.STABLE.toString(), group.stateAsString(1));
|
||||
}
|
||||
|
@ -1137,7 +1137,7 @@ public class ConsumerGroupTest {
|
|||
group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));
|
||||
|
||||
// Create a member.
|
||||
snapshotRegistry.getOrCreateSnapshot(0);
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
group.updateMember(new ConsumerGroupMember.Builder("member-id").build());
|
||||
|
||||
// The member does not exist at last committed offset 0.
|
||||
|
@ -1244,7 +1244,7 @@ public class ConsumerGroupTest {
|
|||
public void testAsDescribedGroup() {
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
|
||||
snapshotRegistry.getOrCreateSnapshot(0);
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
|
||||
|
||||
group.updateMember(new ConsumerGroupMember.Builder("member1")
|
||||
|
@ -1253,7 +1253,7 @@ public class ConsumerGroupTest {
|
|||
.build());
|
||||
group.updateMember(new ConsumerGroupMember.Builder("member2")
|
||||
.build());
|
||||
snapshotRegistry.getOrCreateSnapshot(1);
|
||||
snapshotRegistry.idempotentCreateSnapshot(1);
|
||||
|
||||
ConsumerGroupDescribeResponseData.DescribedGroup expected = new ConsumerGroupDescribeResponseData.DescribedGroup()
|
||||
.setGroupId("group-id-1")
|
||||
|
@ -1324,14 +1324,14 @@ public class ConsumerGroupTest {
|
|||
new TopicPartition("__consumer_offsets", 0)
|
||||
);
|
||||
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
|
||||
snapshotRegistry.getOrCreateSnapshot(0);
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
assertTrue(group.isInStates(Collections.singleton("empty"), 0));
|
||||
assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
|
||||
|
||||
group.updateMember(new ConsumerGroupMember.Builder("member1")
|
||||
.setSubscribedTopicNames(Collections.singletonList("foo"))
|
||||
.build());
|
||||
snapshotRegistry.getOrCreateSnapshot(1);
|
||||
snapshotRegistry.idempotentCreateSnapshot(1);
|
||||
assertTrue(group.isInStates(Collections.singleton("empty"), 0));
|
||||
assertTrue(group.isInStates(Collections.singleton("stable"), 1));
|
||||
assertFalse(group.isInStates(Collections.singleton("empty"), 1));
|
||||
|
|
|
@ -678,13 +678,13 @@ public class ShareGroupTest {
|
|||
public void testAsListedGroup() {
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo");
|
||||
snapshotRegistry.getOrCreateSnapshot(0);
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
assertEquals(ShareGroupState.EMPTY, shareGroup.state(0));
|
||||
assertEquals("Empty", shareGroup.stateAsString(0));
|
||||
shareGroup.updateMember(new ShareGroupMember.Builder("member1")
|
||||
.setSubscribedTopicNames(Collections.singletonList("foo"))
|
||||
.build());
|
||||
snapshotRegistry.getOrCreateSnapshot(1);
|
||||
snapshotRegistry.idempotentCreateSnapshot(1);
|
||||
assertEquals(ShareGroupState.EMPTY, shareGroup.state(0));
|
||||
assertEquals("Empty", shareGroup.stateAsString(0));
|
||||
assertEquals(ShareGroupState.STABLE, shareGroup.state(1));
|
||||
|
@ -786,7 +786,7 @@ public class ShareGroupTest {
|
|||
public void testAsDescribedGroup() {
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-id-1");
|
||||
snapshotRegistry.getOrCreateSnapshot(0);
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
assertEquals(ShareGroupState.EMPTY.toString(), shareGroup.stateAsString(0));
|
||||
|
||||
shareGroup.updateMember(new ShareGroupMember.Builder("member1")
|
||||
|
@ -794,7 +794,7 @@ public class ShareGroupTest {
|
|||
.build());
|
||||
shareGroup.updateMember(new ShareGroupMember.Builder("member2")
|
||||
.build());
|
||||
snapshotRegistry.getOrCreateSnapshot(1);
|
||||
snapshotRegistry.idempotentCreateSnapshot(1);
|
||||
|
||||
ShareGroupDescribeResponseData.DescribedGroup expected = new ShareGroupDescribeResponseData.DescribedGroup()
|
||||
.setGroupId("group-id-1")
|
||||
|
@ -818,14 +818,14 @@ public class ShareGroupTest {
|
|||
public void testIsInStatesCaseInsensitive() {
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo");
|
||||
snapshotRegistry.getOrCreateSnapshot(0);
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0));
|
||||
assertFalse(shareGroup.isInStates(Collections.singleton("Empty"), 0));
|
||||
|
||||
shareGroup.updateMember(new ShareGroupMember.Builder("member1")
|
||||
.setSubscribedTopicNames(Collections.singletonList("foo"))
|
||||
.build());
|
||||
snapshotRegistry.getOrCreateSnapshot(1);
|
||||
snapshotRegistry.idempotentCreateSnapshot(1);
|
||||
assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0));
|
||||
assertTrue(shareGroup.isInStates(Collections.singleton("stable"), 1));
|
||||
assertFalse(shareGroup.isInStates(Collections.singleton("empty"), 1));
|
||||
|
|
|
@ -78,7 +78,7 @@ public class TimelineHashMapBenchmark {
|
|||
int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 3037000493L));
|
||||
if (j > 10 && key % 3 == 0) {
|
||||
snapshotRegistry.deleteSnapshotsUpTo(epoch - 1000);
|
||||
snapshotRegistry.getOrCreateSnapshot(epoch);
|
||||
snapshotRegistry.idempotentCreateSnapshot(epoch);
|
||||
j = 0;
|
||||
} else {
|
||||
j++;
|
||||
|
|
|
@ -36,7 +36,7 @@ import java.util.Optional;
|
|||
|
||||
/**
|
||||
* Manages read and write offsets, and in-memory snapshots.
|
||||
*
|
||||
* <p>
|
||||
* Also manages the following metrics:
|
||||
* kafka.controller:type=KafkaController,name=ActiveControllerCount
|
||||
* kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs
|
||||
|
@ -45,7 +45,7 @@ import java.util.Optional;
|
|||
* kafka.controller:type=KafkaController,name=LastCommittedRecordOffset
|
||||
*/
|
||||
class OffsetControlManager {
|
||||
public static class Builder {
|
||||
static class Builder {
|
||||
private LogContext logContext = null;
|
||||
private SnapshotRegistry snapshotRegistry = null;
|
||||
private QuorumControllerMetrics metrics = null;
|
||||
|
@ -71,7 +71,7 @@ class OffsetControlManager {
|
|||
return this;
|
||||
}
|
||||
|
||||
public OffsetControlManager build() {
|
||||
OffsetControlManager build() {
|
||||
if (logContext == null) logContext = new LogContext();
|
||||
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
|
||||
if (metrics == null) {
|
||||
|
@ -156,7 +156,7 @@ class OffsetControlManager {
|
|||
this.lastStableOffset = -1L;
|
||||
this.transactionStartOffset = -1L;
|
||||
this.nextWriteOffset = -1L;
|
||||
snapshotRegistry.getOrCreateSnapshot(-1L);
|
||||
snapshotRegistry.idempotentCreateSnapshot(-1L);
|
||||
metrics.setActive(false);
|
||||
metrics.setLastCommittedRecordOffset(-1L);
|
||||
metrics.setLastAppliedRecordOffset(-1L);
|
||||
|
@ -249,7 +249,7 @@ class OffsetControlManager {
|
|||
// Before switching to active, create an in-memory snapshot at the last committed
|
||||
// offset. This is required because the active controller assumes that there is always
|
||||
// an in-memory snapshot at the last committed offset.
|
||||
snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
|
||||
snapshotRegistry.idempotentCreateSnapshot(lastStableOffset);
|
||||
this.nextWriteOffset = newNextWriteOffset;
|
||||
metrics.setActive(true);
|
||||
}
|
||||
|
@ -298,7 +298,7 @@ class OffsetControlManager {
|
|||
void handleScheduleAtomicAppend(long endOffset) {
|
||||
this.nextWriteOffset = endOffset + 1;
|
||||
|
||||
snapshotRegistry.getOrCreateSnapshot(endOffset);
|
||||
snapshotRegistry.idempotentCreateSnapshot(endOffset);
|
||||
|
||||
metrics.setLastAppliedRecordOffset(endOffset);
|
||||
|
||||
|
@ -323,7 +323,7 @@ class OffsetControlManager {
|
|||
lastStableOffset = newLastStableOffset;
|
||||
snapshotRegistry.deleteSnapshotsUpTo(lastStableOffset);
|
||||
if (!active()) {
|
||||
snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
|
||||
snapshotRegistry.idempotentCreateSnapshot(lastStableOffset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -362,7 +362,7 @@ class OffsetControlManager {
|
|||
"current snapshot.");
|
||||
}
|
||||
log.info("Successfully loaded snapshot {}.", currentSnapshotName);
|
||||
this.snapshotRegistry.getOrCreateSnapshot(currentSnapshotId.offset());
|
||||
this.snapshotRegistry.idempotentCreateSnapshot(currentSnapshotId.offset());
|
||||
this.lastCommittedOffset = currentSnapshotId.offset();
|
||||
this.lastCommittedEpoch = currentSnapshotId.epoch();
|
||||
this.lastStableOffset = currentSnapshotId.offset();
|
||||
|
@ -383,7 +383,7 @@ class OffsetControlManager {
|
|||
throw new RuntimeException("Can't replay a BeginTransactionRecord at " + offset +
|
||||
" because the transaction at " + transactionStartOffset + " was never closed.");
|
||||
}
|
||||
snapshotRegistry.getOrCreateSnapshot(offset - 1);
|
||||
snapshotRegistry.idempotentCreateSnapshot(offset - 1);
|
||||
transactionStartOffset = offset;
|
||||
log.info("Replayed {} at offset {}.", message, offset);
|
||||
}
|
||||
|
|
|
@ -203,7 +203,7 @@ public class AclControlManagerTest {
|
|||
@Test
|
||||
public void testLoadSnapshot() {
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
snapshotRegistry.getOrCreateSnapshot(0);
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
AclControlManager manager = new AclControlManager.Builder().
|
||||
setSnapshotRegistry(snapshotRegistry).
|
||||
build();
|
||||
|
|
|
@ -54,7 +54,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||
@Timeout(value = 40)
|
||||
public class FeatureControlManagerTest {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Map<String, VersionRange> rangeMap(Object... args) {
|
||||
Map<String, VersionRange> result = new HashMap<>();
|
||||
for (int i = 0; i < args.length; i += 3) {
|
||||
|
@ -100,7 +99,7 @@ public class FeatureControlManagerTest {
|
|||
setSnapshotRegistry(snapshotRegistry).
|
||||
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
|
||||
build();
|
||||
snapshotRegistry.getOrCreateSnapshot(-1);
|
||||
snapshotRegistry.idempotentCreateSnapshot(-1);
|
||||
assertEquals(new FinalizedControllerFeatures(Collections.singletonMap("metadata.version", (short) 4), -1),
|
||||
manager.finalizedFeatures(-1));
|
||||
assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
|
||||
|
@ -131,7 +130,7 @@ public class FeatureControlManagerTest {
|
|||
FeatureLevelRecord record = new FeatureLevelRecord().
|
||||
setName("foo").setFeatureLevel((short) 2);
|
||||
|
||||
snapshotRegistry.getOrCreateSnapshot(-1);
|
||||
snapshotRegistry.idempotentCreateSnapshot(-1);
|
||||
FeatureControlManager manager = new FeatureControlManager.Builder().
|
||||
setLogContext(logContext).
|
||||
setQuorumFeatures(features("foo", 1, 2)).
|
||||
|
@ -139,7 +138,7 @@ public class FeatureControlManagerTest {
|
|||
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
|
||||
build();
|
||||
manager.replay(record);
|
||||
snapshotRegistry.getOrCreateSnapshot(123);
|
||||
snapshotRegistry.idempotentCreateSnapshot(123);
|
||||
assertEquals(new FinalizedControllerFeatures(versionMap("metadata.version", 4, "foo", 2), 123),
|
||||
manager.finalizedFeatures(123));
|
||||
}
|
||||
|
@ -185,7 +184,7 @@ public class FeatureControlManagerTest {
|
|||
updateMap("bar", 3), Collections.emptyMap(), false);
|
||||
assertEquals(Collections.singletonMap("bar", ApiError.NONE), result.response());
|
||||
manager.replay((FeatureLevelRecord) result.records().get(0).message());
|
||||
snapshotRegistry.getOrCreateSnapshot(3);
|
||||
snapshotRegistry.idempotentCreateSnapshot(3);
|
||||
|
||||
assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
|
||||
singletonMap("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
|
@ -213,7 +212,7 @@ public class FeatureControlManagerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testReplayRecords() throws Exception {
|
||||
public void testReplayRecords() {
|
||||
LogContext logContext = new LogContext();
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
|
||||
FeatureControlManager manager = new FeatureControlManager.Builder().
|
||||
|
|
|
@ -119,7 +119,7 @@ public class SnapshotRegistry {
|
|||
* Returns a snapshot iterator that iterates from the snapshots with the
|
||||
* lowest epoch to those with the highest.
|
||||
*/
|
||||
public Iterator<Snapshot> iterator() {
|
||||
Iterator<Snapshot> iterator() {
|
||||
return new SnapshotIterator(head.next());
|
||||
}
|
||||
|
||||
|
@ -128,7 +128,7 @@ public class SnapshotRegistry {
|
|||
* lowest epoch to those with the highest, starting at the snapshot with the
|
||||
* given epoch.
|
||||
*/
|
||||
public Iterator<Snapshot> iterator(long epoch) {
|
||||
Iterator<Snapshot> iterator(long epoch) {
|
||||
return iterator(getSnapshot(epoch));
|
||||
}
|
||||
|
||||
|
@ -136,7 +136,7 @@ public class SnapshotRegistry {
|
|||
* Returns a snapshot iterator that iterates from the snapshots with the
|
||||
* lowest epoch to those with the highest, starting at the given snapshot.
|
||||
*/
|
||||
public Iterator<Snapshot> iterator(Snapshot snapshot) {
|
||||
Iterator<Snapshot> iterator(Snapshot snapshot) {
|
||||
return new SnapshotIterator(snapshot);
|
||||
}
|
||||
|
||||
|
@ -144,7 +144,7 @@ public class SnapshotRegistry {
|
|||
* Returns a reverse snapshot iterator that iterates from the snapshots with the
|
||||
* highest epoch to those with the lowest.
|
||||
*/
|
||||
public Iterator<Snapshot> reverseIterator() {
|
||||
Iterator<Snapshot> reverseIterator() {
|
||||
return new ReverseSnapshotIterator();
|
||||
}
|
||||
|
||||
|
@ -172,7 +172,7 @@ public class SnapshotRegistry {
|
|||
/**
|
||||
* Gets the snapshot for a specific epoch.
|
||||
*/
|
||||
public Snapshot getSnapshot(long epoch) {
|
||||
Snapshot getSnapshot(long epoch) {
|
||||
Snapshot snapshot = snapshots.get(epoch);
|
||||
if (snapshot == null) {
|
||||
throw new RuntimeException("No in-memory snapshot for epoch " + epoch + ". Snapshot " +
|
||||
|
@ -183,13 +183,13 @@ public class SnapshotRegistry {
|
|||
|
||||
/**
|
||||
* Creates a new snapshot at the given epoch.
|
||||
* <br>
|
||||
* If {@code epoch} already exists and it is the last snapshot then just return that snapshot.
|
||||
* <p>
|
||||
* If {@code epoch} already exists, and it is the last snapshot then just return that snapshot.
|
||||
*
|
||||
* @param epoch The epoch to create the snapshot at. The current epoch
|
||||
* @param epoch The epoch to create the snapshot at. The current epoch
|
||||
* will be advanced to one past this epoch.
|
||||
*/
|
||||
public Snapshot getOrCreateSnapshot(long epoch) {
|
||||
Snapshot getOrCreateSnapshot(long epoch) {
|
||||
Snapshot last = head.prev();
|
||||
if (last.epoch() > epoch) {
|
||||
throw new RuntimeException("Can't create a new in-memory snapshot at epoch " + epoch +
|
||||
|
@ -205,6 +205,18 @@ public class SnapshotRegistry {
|
|||
return snapshot;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new snapshot at the given epoch.
|
||||
* <p>
|
||||
* If {@code epoch} already exists, and it is the last snapshot then this operation will do nothing.
|
||||
*
|
||||
* @param epoch The epoch to create the snapshot at. The current epoch
|
||||
* will be advanced to one past this epoch.
|
||||
*/
|
||||
public void idempotentCreateSnapshot(long epoch) {
|
||||
getOrCreateSnapshot(epoch);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reverts the state of all data structures to the state at the given epoch.
|
||||
*
|
||||
|
@ -238,7 +250,7 @@ public class SnapshotRegistry {
|
|||
*
|
||||
* @param snapshot The snapshot to delete.
|
||||
*/
|
||||
public void deleteSnapshot(Snapshot snapshot) {
|
||||
void deleteSnapshot(Snapshot snapshot) {
|
||||
Snapshot prev = snapshot.prev();
|
||||
if (prev != head) {
|
||||
prev.mergeFrom(snapshot);
|
||||
|
@ -274,7 +286,7 @@ public class SnapshotRegistry {
|
|||
/**
|
||||
* Associate a revertable with this registry.
|
||||
*/
|
||||
public void register(Revertable revertable) {
|
||||
void register(Revertable revertable) {
|
||||
revertables.add(revertable);
|
||||
}
|
||||
|
||||
|
|
|
@ -29,33 +29,33 @@ import java.util.NoSuchElementException;
|
|||
* We handle divergences between the current state and historical state by copying a
|
||||
* reference to elements that have been deleted or overwritten into the most recent
|
||||
* snapshot tier.
|
||||
* <br>
|
||||
* <p>
|
||||
* Note that there are no keys in SnapshottableHashTable, only values. So it more similar
|
||||
* to a hash set than a hash map. The subclasses implement full-featured maps and sets
|
||||
* using this class as a building block.
|
||||
* <br>
|
||||
* <p>
|
||||
* Each snapshot tier contains a size and a hash table. The size reflects the size at
|
||||
* the time the snapshot was taken. Note that, as an optimization, snapshot tiers will
|
||||
* be null if they don't contain anything. So for example, if snapshot 20 of Object O
|
||||
* contains the same entries as snapshot 10 of that object, the snapshot 20 tier for
|
||||
* object O will be null.
|
||||
* <br>
|
||||
* <p>
|
||||
* The current tier's data is stored in the fields inherited from BaseHashTable. It
|
||||
* would be conceptually simpler to have a separate BaseHashTable object, but since Java
|
||||
* doesn't have value types, subclassing is the only way to avoid another pointer
|
||||
* indirection and the associated extra memory cost.
|
||||
* <br>
|
||||
* <p>
|
||||
* Note that each element in the hash table contains a start epoch, and a value. The
|
||||
* start epoch is there to identify when the object was first inserted. This in turn
|
||||
* determines which snapshots it is a member of.
|
||||
* <br>
|
||||
* <p>
|
||||
* In order to retrieve an object from snapshot E, we start by checking to see if the
|
||||
* object exists in the "current" hash tier. If it does, and its startEpoch extends back
|
||||
* to E, we return that object. Otherwise, we check all the snapshot tiers, starting
|
||||
* with E, and ending with the most recent snapshot, to see if the object is there.
|
||||
* As an optimization, if we encounter the object in a snapshot tier but its epoch is too
|
||||
* new, we know that its value at epoch E must be null, so we can return that immediately.
|
||||
* <br>
|
||||
* <p>
|
||||
* The class hierarchy looks like this:
|
||||
* <pre>
|
||||
* Revertable BaseHashTable
|
||||
|
@ -66,11 +66,11 @@ import java.util.NoSuchElementException;
|
|||
* </pre>
|
||||
* BaseHashTable is a simple hash table that uses separate chaining. The interface is
|
||||
* pretty bare-bones since this class is not intended to be used directly by end-users.
|
||||
* <br>
|
||||
* <p>
|
||||
* This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
|
||||
* snapshots. This is the core of the snapshotted hash table code and handles the
|
||||
* tiering.
|
||||
* <br>
|
||||
* <p>
|
||||
* TimelineHashSet and TimelineHashMap are mostly wrappers around this
|
||||
* SnapshottableHashTable class. They implement standard Java APIs for Set and Map,
|
||||
* respectively. There's a fair amount of boilerplate for this, but it's necessary so
|
||||
|
@ -78,11 +78,11 @@ import java.util.NoSuchElementException;
|
|||
* The accessor APIs have two versions -- one that looks at the current state, and one
|
||||
* that looks at a historical snapshotted state. Mutation APIs only ever mutate the
|
||||
* current state.
|
||||
* <br>
|
||||
* <p>
|
||||
* One very important feature of SnapshottableHashTable is that we support iterating
|
||||
* over a snapshot even while changes are being made to the current state. See the
|
||||
* Javadoc for the iterator for more information about how this is accomplished.
|
||||
* <br>
|
||||
* <p>
|
||||
* All of these classes require external synchronization, and don't support null keys or
|
||||
* values.
|
||||
*/
|
||||
|
@ -135,7 +135,7 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
|
|||
|
||||
/**
|
||||
* Iterate over the values that currently exist in the hash table.
|
||||
*
|
||||
* <p>
|
||||
* You can use this iterator even if you are making changes to the map.
|
||||
* The changes may or may not be visible while you are iterating.
|
||||
*/
|
||||
|
@ -185,7 +185,7 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
|
|||
|
||||
/**
|
||||
* Iterate over the values that existed in the hash table during a specific snapshot.
|
||||
*
|
||||
* <p>
|
||||
* You can use this iterator even if you are making changes to the map.
|
||||
* The snapshot is immutable and will always show up the same.
|
||||
*/
|
||||
|
@ -408,37 +408,6 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
|
|||
}
|
||||
}
|
||||
|
||||
String snapshottableToDebugString() {
|
||||
StringBuilder bld = new StringBuilder();
|
||||
bld.append(String.format("SnapshottableHashTable{%n"));
|
||||
bld.append("top tier: ");
|
||||
bld.append(baseToDebugString());
|
||||
bld.append(String.format(",%nsnapshot tiers: [%n"));
|
||||
String prefix = "";
|
||||
for (Iterator<Snapshot> iter = snapshotRegistry.iterator(); iter.hasNext(); ) {
|
||||
Snapshot snapshot = iter.next();
|
||||
bld.append(prefix);
|
||||
bld.append("epoch ").append(snapshot.epoch()).append(": ");
|
||||
HashTier<T> tier = snapshot.getDelta(this);
|
||||
if (tier == null) {
|
||||
bld.append("null");
|
||||
} else {
|
||||
bld.append("HashTier{");
|
||||
bld.append("size=").append(tier.size);
|
||||
bld.append(", deltaTable=");
|
||||
if (tier.deltaTable == null) {
|
||||
bld.append("null");
|
||||
} else {
|
||||
bld.append(tier.deltaTable.baseToDebugString());
|
||||
}
|
||||
bld.append("}");
|
||||
}
|
||||
bld.append(String.format("%n"));
|
||||
}
|
||||
bld.append(String.format("]}%n"));
|
||||
return bld.toString();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void executeRevert(long targetEpoch, Delta delta) {
|
||||
|
|
Loading…
Reference in New Issue