diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java index 924605c6d91..7291f932666 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java @@ -37,7 +37,7 @@ public class ProducerIdControlManager { ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) { this.clusterControlManager = clusterControlManager; - this.lastProducerId = new TimelineLong(snapshotRegistry, 0L); + this.lastProducerId = new TimelineLong(snapshotRegistry); } ControllerResult generateNextProducerId(int brokerId, long brokerEpoch) { diff --git a/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java b/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java index f7ead35775a..43eb117d6c3 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/Revertable.java @@ -29,4 +29,9 @@ interface Revertable { * @param delta The delta associated with this epoch for this object. */ void executeRevert(long targetEpoch, Delta delta); + + /** + * Reverts to the initial value. + */ + void reset(); } diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java index 245014fd3ab..b34aceeadbe 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java @@ -105,6 +105,11 @@ public class SnapshotRegistry { */ private final Snapshot head = new Snapshot(Long.MIN_VALUE); + /** + * Collection of all Revertable registered with this registry + */ + private final List revertables = new ArrayList<>(); + public SnapshotRegistry(LogContext logContext) { this.log = logContext.logger(SnapshotRegistry.class); } @@ -254,4 +259,22 @@ public class SnapshotRegistry { public long latestEpoch() { return head.prev().epoch(); } + + /** + * Associate with this registry. + */ + public void register(Revertable revertable) { + revertables.add(revertable); + } + + /** + * Delete all snapshots and resets all of the Revertable object registered. + */ + public void reset() { + deleteSnapshotsUpTo(LATEST_EPOCH); + + for (Revertable revertable : revertables) { + revertable.reset(); + } + } } diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java index 2f5d7be0c93..cbd0a280fc1 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java @@ -280,6 +280,7 @@ class SnapshottableHashTable iter = snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH); + while (iter.hasNext()) { + iter.next(); + iter.remove(); + } + } } diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java index 6e0251708b5..855e7ed71d3 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java @@ -179,11 +179,7 @@ public class TimelineHashMap @Override public void clear() { - Iterator> iter = snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH); - while (iter.hasNext()) { - iter.next(); - iter.remove(); - } + reset(); } final class KeySet extends AbstractSet { diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java index 73ac0e45f71..34efb10fdf9 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java @@ -225,12 +225,7 @@ public class TimelineHashSet @Override public void clear() { - Iterator> iter = - snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH); - while (iter.hasNext()) { - iter.next(); - iter.remove(); - } + reset(); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java index d28db4991d2..d158890bc4f 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineInteger.java @@ -26,8 +26,10 @@ import java.util.Iterator; * This class requires external synchronization. */ public class TimelineInteger implements Revertable { + public static final int INIT = 0; + static class IntegerContainer implements Delta { - private int value = 0; + private int value = INIT; int value() { return value; @@ -48,7 +50,9 @@ public class TimelineInteger implements Revertable { public TimelineInteger(SnapshotRegistry snapshotRegistry) { this.snapshotRegistry = snapshotRegistry; - this.value = 0; + this.value = INIT; + + snapshotRegistry.register(this); } public int get() { @@ -95,6 +99,11 @@ public class TimelineInteger implements Revertable { this.value = container.value; } + @Override + public void reset() { + set(INIT); + } + @Override public int hashCode() { return value; diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java index 36a300ff949..9b401db5b7d 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java @@ -26,8 +26,10 @@ import java.util.Iterator; * This class requires external synchronization. */ public class TimelineLong implements Revertable { + public static final long INIT = 0; + static class LongContainer implements Delta { - private long value = 0; + private long value = INIT; long value() { return value; @@ -47,12 +49,10 @@ public class TimelineLong implements Revertable { private long value; public TimelineLong(SnapshotRegistry snapshotRegistry) { - this(snapshotRegistry, 0L); - } - - public TimelineLong(SnapshotRegistry snapshotRegistry, long value) { this.snapshotRegistry = snapshotRegistry; - this.value = value; + this.value = INIT; + + snapshotRegistry.register(this); } public long get() { @@ -99,6 +99,11 @@ public class TimelineLong implements Revertable { this.value = container.value(); } + @Override + public void reset() { + set(INIT); + } + @Override public int hashCode() { return ((int) value) ^ (int) (value >>> 32); diff --git a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java index ef9405ccffb..972ff587490 100644 --- a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java +++ b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.timeline; import java.util.ArrayList; +import java.util.Collections; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; @@ -207,6 +208,26 @@ public class SnapshottableHashTableTest { assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE), E_1A, E_2A, E_3A); } + @Test + public void testReset() { + SnapshotRegistry registry = new SnapshotRegistry(new LogContext()); + SnapshottableHashTable table = + new SnapshottableHashTable<>(registry, 1); + assertEquals(null, table.snapshottableAddOrReplace(E_1A)); + assertEquals(null, table.snapshottableAddOrReplace(E_2A)); + assertEquals(null, table.snapshottableAddOrReplace(E_3A)); + registry.createSnapshot(0); + assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B)); + assertEquals(E_3A, table.snapshottableAddOrReplace(E_3B)); + registry.createSnapshot(1); + + registry.reset(); + + assertEquals(Collections.emptyList(), registry.epochsList()); + // Check that the table is empty + assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE)); + } + /** * Assert that the given iterator contains the given elements, in any order. * We compare using reference equality here, rather than object equality. diff --git a/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java b/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java index c2a84c60b6a..13a5d358c84 100644 --- a/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java +++ b/metadata/src/test/java/org/apache/kafka/timeline/TimelineIntegerTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.timeline; +import java.util.Collections; + import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -70,4 +72,19 @@ public class TimelineIntegerTest { registry.revertToSnapshot(2); assertEquals(0, integer.get()); } + + @Test + public void testReset() { + SnapshotRegistry registry = new SnapshotRegistry(new LogContext()); + TimelineInteger value = new TimelineInteger(registry); + registry.createSnapshot(2); + value.set(1); + registry.createSnapshot(3); + value.set(2); + + registry.reset(); + + assertEquals(Collections.emptyList(), registry.epochsList()); + assertEquals(TimelineInteger.INIT, value.get()); + } } diff --git a/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java b/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java index 378c6c63966..10ce56671a2 100644 --- a/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java +++ b/metadata/src/test/java/org/apache/kafka/timeline/TimelineLongTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.timeline; +import java.util.Collections; + import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -70,4 +72,19 @@ public class TimelineLongTest { registry.revertToSnapshot(2); assertEquals(0L, value.get()); } + + @Test + public void testReset() { + SnapshotRegistry registry = new SnapshotRegistry(new LogContext()); + TimelineLong value = new TimelineLong(registry); + registry.createSnapshot(2); + value.set(1L); + registry.createSnapshot(3); + value.set(2L); + + registry.reset(); + + assertEquals(Collections.emptyList(), registry.epochsList()); + assertEquals(TimelineLong.INIT, value.get()); + } }