MINOR: Add reset to SnapshotRegistry and Revertable (#10891)

Add reset functionality to SnapshotRegitry and Revertable, so that we can
clear the current state before loading a snapshot.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
José Armando García Sancio 2021-06-18 17:08:54 -07:00 committed by GitHub
parent 299eea88a5
commit c333bfd417
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 118 additions and 20 deletions

View File

@ -37,7 +37,7 @@ public class ProducerIdControlManager {
ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) { ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
this.clusterControlManager = clusterControlManager; this.clusterControlManager = clusterControlManager;
this.lastProducerId = new TimelineLong(snapshotRegistry, 0L); this.lastProducerId = new TimelineLong(snapshotRegistry);
} }
ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) { ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {

View File

@ -29,4 +29,9 @@ interface Revertable {
* @param delta The delta associated with this epoch for this object. * @param delta The delta associated with this epoch for this object.
*/ */
void executeRevert(long targetEpoch, Delta delta); void executeRevert(long targetEpoch, Delta delta);
/**
* Reverts to the initial value.
*/
void reset();
} }

View File

@ -105,6 +105,11 @@ public class SnapshotRegistry {
*/ */
private final Snapshot head = new Snapshot(Long.MIN_VALUE); private final Snapshot head = new Snapshot(Long.MIN_VALUE);
/**
* Collection of all Revertable registered with this registry
*/
private final List<Revertable> revertables = new ArrayList<>();
public SnapshotRegistry(LogContext logContext) { public SnapshotRegistry(LogContext logContext) {
this.log = logContext.logger(SnapshotRegistry.class); this.log = logContext.logger(SnapshotRegistry.class);
} }
@ -254,4 +259,22 @@ public class SnapshotRegistry {
public long latestEpoch() { public long latestEpoch() {
return head.prev().epoch(); return head.prev().epoch();
} }
/**
* Associate with this registry.
*/
public void register(Revertable revertable) {
revertables.add(revertable);
}
/**
* Delete all snapshots and resets all of the Revertable object registered.
*/
public void reset() {
deleteSnapshotsUpTo(LATEST_EPOCH);
for (Revertable revertable : revertables) {
revertable.reset();
}
}
} }

View File

@ -280,6 +280,7 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) { SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) {
super(expectedSize); super(expectedSize);
this.snapshotRegistry = snapshotRegistry; this.snapshotRegistry = snapshotRegistry;
snapshotRegistry.register(this);
} }
int snapshottableSize(long epoch) { int snapshottableSize(long epoch) {
@ -452,4 +453,13 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
} }
} }
} }
@Override
public void reset() {
Iterator<T> iter = snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH);
while (iter.hasNext()) {
iter.next();
iter.remove();
}
}
} }

View File

@ -179,11 +179,7 @@ public class TimelineHashMap<K, V>
@Override @Override
public void clear() { public void clear() {
Iterator<TimelineHashMapEntry<K, V>> iter = snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH); reset();
while (iter.hasNext()) {
iter.next();
iter.remove();
}
} }
final class KeySet extends AbstractSet<K> { final class KeySet extends AbstractSet<K> {

View File

@ -225,12 +225,7 @@ public class TimelineHashSet<T>
@Override @Override
public void clear() { public void clear() {
Iterator<TimelineHashSetEntry<T>> iter = reset();
snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH);
while (iter.hasNext()) {
iter.next();
iter.remove();
}
} }
@Override @Override

View File

@ -26,8 +26,10 @@ import java.util.Iterator;
* This class requires external synchronization. * This class requires external synchronization.
*/ */
public class TimelineInteger implements Revertable { public class TimelineInteger implements Revertable {
public static final int INIT = 0;
static class IntegerContainer implements Delta { static class IntegerContainer implements Delta {
private int value = 0; private int value = INIT;
int value() { int value() {
return value; return value;
@ -48,7 +50,9 @@ public class TimelineInteger implements Revertable {
public TimelineInteger(SnapshotRegistry snapshotRegistry) { public TimelineInteger(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry; this.snapshotRegistry = snapshotRegistry;
this.value = 0; this.value = INIT;
snapshotRegistry.register(this);
} }
public int get() { public int get() {
@ -95,6 +99,11 @@ public class TimelineInteger implements Revertable {
this.value = container.value; this.value = container.value;
} }
@Override
public void reset() {
set(INIT);
}
@Override @Override
public int hashCode() { public int hashCode() {
return value; return value;

View File

@ -26,8 +26,10 @@ import java.util.Iterator;
* This class requires external synchronization. * This class requires external synchronization.
*/ */
public class TimelineLong implements Revertable { public class TimelineLong implements Revertable {
public static final long INIT = 0;
static class LongContainer implements Delta { static class LongContainer implements Delta {
private long value = 0; private long value = INIT;
long value() { long value() {
return value; return value;
@ -47,12 +49,10 @@ public class TimelineLong implements Revertable {
private long value; private long value;
public TimelineLong(SnapshotRegistry snapshotRegistry) { public TimelineLong(SnapshotRegistry snapshotRegistry) {
this(snapshotRegistry, 0L);
}
public TimelineLong(SnapshotRegistry snapshotRegistry, long value) {
this.snapshotRegistry = snapshotRegistry; this.snapshotRegistry = snapshotRegistry;
this.value = value; this.value = INIT;
snapshotRegistry.register(this);
} }
public long get() { public long get() {
@ -99,6 +99,11 @@ public class TimelineLong implements Revertable {
this.value = container.value(); this.value = container.value();
} }
@Override
public void reset() {
set(INIT);
}
@Override @Override
public int hashCode() { public int hashCode() {
return ((int) value) ^ (int) (value >>> 32); return ((int) value) ^ (int) (value >>> 32);

View File

@ -18,6 +18,7 @@
package org.apache.kafka.timeline; package org.apache.kafka.timeline;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -207,6 +208,26 @@ public class SnapshottableHashTableTest {
assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE), E_1A, E_2A, E_3A); assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE), E_1A, E_2A, E_3A);
} }
@Test
public void testReset() {
SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
SnapshottableHashTable<TestElement> table =
new SnapshottableHashTable<>(registry, 1);
assertEquals(null, table.snapshottableAddOrReplace(E_1A));
assertEquals(null, table.snapshottableAddOrReplace(E_2A));
assertEquals(null, table.snapshottableAddOrReplace(E_3A));
registry.createSnapshot(0);
assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B));
assertEquals(E_3A, table.snapshottableAddOrReplace(E_3B));
registry.createSnapshot(1);
registry.reset();
assertEquals(Collections.emptyList(), registry.epochsList());
// Check that the table is empty
assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE));
}
/** /**
* Assert that the given iterator contains the given elements, in any order. * Assert that the given iterator contains the given elements, in any order.
* We compare using reference equality here, rather than object equality. * We compare using reference equality here, rather than object equality.

View File

@ -17,6 +17,8 @@
package org.apache.kafka.timeline; package org.apache.kafka.timeline;
import java.util.Collections;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
@ -70,4 +72,19 @@ public class TimelineIntegerTest {
registry.revertToSnapshot(2); registry.revertToSnapshot(2);
assertEquals(0, integer.get()); assertEquals(0, integer.get());
} }
@Test
public void testReset() {
SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
TimelineInteger value = new TimelineInteger(registry);
registry.createSnapshot(2);
value.set(1);
registry.createSnapshot(3);
value.set(2);
registry.reset();
assertEquals(Collections.emptyList(), registry.epochsList());
assertEquals(TimelineInteger.INIT, value.get());
}
} }

View File

@ -17,6 +17,8 @@
package org.apache.kafka.timeline; package org.apache.kafka.timeline;
import java.util.Collections;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
@ -70,4 +72,19 @@ public class TimelineLongTest {
registry.revertToSnapshot(2); registry.revertToSnapshot(2);
assertEquals(0L, value.get()); assertEquals(0L, value.get());
} }
@Test
public void testReset() {
SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
TimelineLong value = new TimelineLong(registry);
registry.createSnapshot(2);
value.set(1L);
registry.createSnapshot(3);
value.set(2L);
registry.reset();
assertEquals(Collections.emptyList(), registry.epochsList());
assertEquals(TimelineLong.INIT, value.get());
}
} }