mirror of https://github.com/apache/kafka.git
Minor updates for CompletableEventReaper logging
This commit is contained in:
parent
aaefbeff32
commit
40f6754810
|
@ -39,12 +39,32 @@ import java.util.concurrent.CompletableFuture;
|
|||
*/
|
||||
public class CompletableEventReaper {
|
||||
|
||||
private enum State {
|
||||
ACTIVE,
|
||||
PAUSED
|
||||
}
|
||||
|
||||
private enum Modification {
|
||||
|
||||
ADD(State.ACTIVE, "added"),
|
||||
PAUSE(State.PAUSED, "paused"),
|
||||
RESUME(State.ACTIVE, "resumed");
|
||||
|
||||
private final State state;
|
||||
private final String verb;
|
||||
|
||||
Modification(State state, String verb) {
|
||||
this.state = state;
|
||||
this.verb = verb;
|
||||
}
|
||||
}
|
||||
|
||||
private final Logger log;
|
||||
|
||||
/**
|
||||
* Tracked events that are candidates for expiration. The key is the event and the value is a flag for paused.
|
||||
* Tracked events that are candidates for expiration.
|
||||
*/
|
||||
private final Map<CompletableEvent<?>, Boolean> tracked;
|
||||
private final Map<CompletableEvent<?>, State> tracked;
|
||||
|
||||
public CompletableEventReaper(LogContext logContext) {
|
||||
this.log = logContext.logger(CompletableEventReaper.class);
|
||||
|
@ -57,7 +77,7 @@ public class CompletableEventReaper {
|
|||
* @param event Event to track
|
||||
*/
|
||||
public void add(CompletableEvent<?> event) {
|
||||
put(event, false, false, "add");
|
||||
put(event, Modification.ADD);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -66,7 +86,7 @@ public class CompletableEventReaper {
|
|||
* @param event Event to pause
|
||||
*/
|
||||
public void pause(CompletableEvent<?> event) {
|
||||
put(event, true, true, "pause");
|
||||
put(event, Modification.PAUSE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -75,18 +95,21 @@ public class CompletableEventReaper {
|
|||
* @param event Event to resume
|
||||
*/
|
||||
public void resume(CompletableEvent<?> event) {
|
||||
put(event, false, true, "resume");
|
||||
put(event, Modification.RESUME);
|
||||
}
|
||||
|
||||
private void put(CompletableEvent<?> event, boolean paused, boolean checkExists, String verb) {
|
||||
Objects.requireNonNull(event, "Event to " + verb + " must be non-null");
|
||||
private void put(CompletableEvent<?> event, Modification modification) {
|
||||
Objects.requireNonNull(event, "Event must be non-null");
|
||||
|
||||
if (checkExists && !tracked.containsKey(event))
|
||||
throw new IllegalArgumentException("The event " + event + " was not previously added; cannot " + verb);
|
||||
if (modification == Modification.ADD) {
|
||||
if (tracked.containsKey(event))
|
||||
throw new IllegalArgumentException("The event " + event + " was previously added, so it cannot be " + modification.verb + " again");
|
||||
} else if (!tracked.containsKey(event)) {
|
||||
throw new IllegalArgumentException("The event " + event + " was not previously added, so it cannot be " + modification.verb);
|
||||
}
|
||||
|
||||
tracked.put(event, paused);
|
||||
|
||||
log.debug("Event {} was {}-d with paused set to {}", event, verb, paused);
|
||||
tracked.put(event, modification.state);
|
||||
log.trace("Event {} was {} and is now in state {}", event, modification.verb, modification.state);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -117,10 +140,10 @@ public class CompletableEventReaper {
|
|||
int count = 0;
|
||||
List<CompletableEvent<?>> unpausedEvents = new ArrayList<>(tracked.size());
|
||||
|
||||
for (Map.Entry<CompletableEvent<?>, Boolean> entry : tracked.entrySet()) {
|
||||
boolean isPaused = entry.getValue();
|
||||
for (Map.Entry<CompletableEvent<?>, State> entry : tracked.entrySet()) {
|
||||
State state = entry.getValue();
|
||||
|
||||
if (isPaused) {
|
||||
if (state == State.PAUSED) {
|
||||
// Don't reap "paused" events
|
||||
continue;
|
||||
}
|
||||
|
@ -203,7 +226,7 @@ public class CompletableEventReaper {
|
|||
// path of the ConsumerNetworkThread loop.
|
||||
List<CompletableEvent<?>> events = new ArrayList<>();
|
||||
|
||||
for (Map.Entry<CompletableEvent<?>, Boolean> entry : tracked.entrySet()) {
|
||||
for (Map.Entry<CompletableEvent<?>, State> entry : tracked.entrySet()) {
|
||||
CompletableEvent<?> event = entry.getKey();
|
||||
|
||||
if (!event.future().isDone())
|
||||
|
|
Loading…
Reference in New Issue