mirror of https://github.com/apache/kafka.git
KAFKA-18063: SnapshotRegistry should not leak memory (#17898)
SnapshotRegistry needs to have a reference to all snapshot data structures. However, this should not be a strong reference, but a weak reference, so that these data structures can be garbage collected as needed. This PR also adds a scrub mechanism so that we can eventually reclaim the slots used by GC'ed Revertable objects in the SnapshotRegistry.revertables array. Reviewers: David Jacot <david.jacot@gmail.com>
This commit is contained in:
parent
aac2c53837
commit
542bb9d4aa
|
@ -21,17 +21,18 @@ import org.apache.kafka.common.utils.LogContext;
|
|||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
/**
|
||||
* A registry containing snapshots of timeline data structures.
|
||||
* We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
|
||||
* Therefore, we use ArrayLists here rather than a data structure with higher overhead.
|
||||
* A registry containing snapshots of timeline data structures. All timeline data structures must
|
||||
* be registered here, so that they can be reverted to the expected state when desired.
|
||||
* Because the registry only keeps a weak reference to each timeline data structure, it does not
|
||||
* prevent them from being garbage collected.
|
||||
*/
|
||||
public class SnapshotRegistry {
|
||||
public static final long LATEST_EPOCH = Long.MAX_VALUE;
|
||||
|
@ -107,12 +108,39 @@ public class SnapshotRegistry {
|
|||
private final Snapshot head = new Snapshot(Long.MIN_VALUE);
|
||||
|
||||
/**
|
||||
* Collection of all Revertable registered with this registry
|
||||
* A collection of all Revertable objects registered here. Since we store only weak
|
||||
* references, every time we access a revertable through this list, we must check to
|
||||
* see if it has been garbage collected. If so, WeakReference.get will return null.
|
||||
*
|
||||
* Although the garbage collector handles freeing the underlying Revertables, over
|
||||
* time slots in the ArrayList will fill up with expired references. Therefore, after
|
||||
* enough registrations, we scrub the ArrayList of the expired references by creating
|
||||
* a new ArrayList.
|
||||
*/
|
||||
private final List<Revertable> revertables = new ArrayList<>();
|
||||
private List<WeakReference<Revertable>> revertables = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* The maximum number of registrations to allow before we compact the revertable list.
|
||||
*/
|
||||
private final int maxRegistrationsSinceScrub;
|
||||
|
||||
/**
|
||||
* The number of registrations we have done since removing all expired weak references.
|
||||
*/
|
||||
private int numRegistrationsSinceScrub = 0;
|
||||
|
||||
/**
|
||||
* The number of scrubs that we have done.
|
||||
*/
|
||||
private long numScrubs = 0;
|
||||
|
||||
public SnapshotRegistry(LogContext logContext) {
|
||||
this(logContext, 10_000);
|
||||
}
|
||||
|
||||
/**
 * Create a registry with an explicit scrub threshold.
 *
 * @param logContext                    The log context used to create this registry's logger.
 * @param maxRegistrationsSinceScrub    The maximum number of registrations to allow before
 *                                      the revertable list is compacted.
 */
public SnapshotRegistry(LogContext logContext, int maxRegistrationsSinceScrub) {
    this.maxRegistrationsSinceScrub = maxRegistrationsSinceScrub;
    this.log = logContext.logger(SnapshotRegistry.class);
}
|
||||
|
||||
/**
|
||||
|
@ -272,20 +300,59 @@ public class SnapshotRegistry {
|
|||
}
|
||||
|
||||
/**
|
||||
* Associate a revertable with this registry.
|
||||
* Return the number of scrub operations that we have done.
|
||||
*/
|
||||
public void register(Revertable revertable) {
|
||||
revertables.add(revertable);
|
||||
public long numScrubs() {
|
||||
return numScrubs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete all snapshots and resets all of the Revertable object registered.
|
||||
* Associate a revertable with this registry.
|
||||
*/
|
||||
public void register(Revertable revertable) {
|
||||
numRegistrationsSinceScrub++;
|
||||
if (numRegistrationsSinceScrub > maxRegistrationsSinceScrub) {
|
||||
scrub();
|
||||
}
|
||||
revertables.add(new WeakReference<>(revertable));
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all expired weak references from the revertable list.
|
||||
*/
|
||||
void scrub() {
|
||||
ArrayList<WeakReference<Revertable>> newRevertables =
|
||||
new ArrayList<>(revertables.size() / 2);
|
||||
for (WeakReference<Revertable> ref : revertables) {
|
||||
if (ref.get() != null) {
|
||||
newRevertables.add(ref);
|
||||
}
|
||||
}
|
||||
numScrubs++;
|
||||
this.revertables = newRevertables;
|
||||
numRegistrationsSinceScrub = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete all snapshots and reset all of the Revertable objects.
|
||||
*/
|
||||
public void reset() {
|
||||
// Drop every snapshot first, then reset each Revertable that is still reachable.
deleteSnapshotsUpTo(LATEST_EPOCH);
|
||||
|
||||
// NOTE(review): the loop header and reset() call immediately below iterate
// Revertable values directly, which does not match the WeakReference list built
// afterwards; they look like the pre-change lines shown by the diff — confirm.
for (Revertable revertable : revertables) {
|
||||
revertable.reset();
|
||||
// Rebuild the list while resetting, keeping only references whose referent is non-null.
ArrayList<WeakReference<Revertable>> newRevertables = new ArrayList<>();
|
||||
for (WeakReference<Revertable> ref : revertables) {
|
||||
Revertable revertable = ref.get();
|
||||
if (revertable != null) {
|
||||
// An exception from one Revertable is logged and does not stop the loop;
// its reference is still carried over to the new list.
try {
|
||||
revertable.reset();
|
||||
} catch (Exception e) {
|
||||
log.error("Error reverting {}", revertable, e);
|
||||
}
|
||||
newRevertables.add(ref);
|
||||
}
|
||||
}
|
||||
// A reset is also counted as a scrub, and the registration counter starts over.
numScrubs++;
|
||||
this.revertables = newRevertables;
|
||||
numRegistrationsSinceScrub = 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -94,4 +94,28 @@ public class SnapshotRegistryTest {
|
|||
|
||||
assertEquals(latest, duplicate);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScrub() {
|
||||
SnapshotRegistry registry = new SnapshotRegistry(new LogContext(), 2);
|
||||
new TimelineInteger(registry).set(123);
|
||||
new TimelineInteger(registry).set(123);
|
||||
assertEquals(0, registry.numScrubs());
|
||||
new TimelineInteger(registry).set(123);
|
||||
assertEquals(1, registry.numScrubs());
|
||||
new TimelineInteger(registry).set(123);
|
||||
new TimelineInteger(registry).set(123);
|
||||
new TimelineInteger(registry).set(123);
|
||||
assertEquals(2, registry.numScrubs());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReset() {
|
||||
SnapshotRegistry registry = new SnapshotRegistry(new LogContext(), 2);
|
||||
TimelineInteger integer = new TimelineInteger(registry);
|
||||
integer.set(123);
|
||||
registry.reset();
|
||||
assertEquals(0, integer.get());
|
||||
assertEquals(1, registry.numScrubs());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue