KAFKA-16664; Re-add EventAccumulator.poll(long, TimeUnit) (#16144)

We have revamped the thread idle ratio metric in https://github.com/apache/kafka/pull/15835. https://github.com/apache/kafka/pull/15835#discussion_r1588068337 describes a case where the metric loses accuracy and in order to set a lower bound to the accuracy, this patch re-adds a poll with a timeout that was removed as part of https://github.com/apache/kafka/pull/15430.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Jeff Kim 2024-06-04 02:27:35 -04:00 committed by GitHub
parent 64c50a274b
commit d7bc43ed06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 33 additions and 32 deletions

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@ -162,43 +163,31 @@ public class EventAccumulator<K, T extends EventAccumulator.Event<K>> implements
}
/**
* Returns the next {{@link Event}} available or null if no event is
* available.
* Immediately returns the next {{@link Event}} available or null
* if the accumulator is empty.
*
* @return The next event available or null.
*/
public T poll() {
lock.lock();
try {
K key = randomKey();
if (key == null) return null;
Deque<T> queue = queues.get(key);
T event = queue.poll();
if (queue.isEmpty()) queues.remove(key);
inflightKeys.add(key);
size--;
return event;
} finally {
lock.unlock();
}
return poll(0, TimeUnit.MILLISECONDS);
}
/**
* Returns the next {{@link Event}} available. This method blocks until an
* event is available or accumulator is closed.
* Returns the next {{@link Event}} available. This method blocks for the provided
* time and returns null of no event is available.
*
* @param timeout The timeout.
* @param unit The timeout unit.
* @return The next event available or null.
*/
public T take() {
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
K key = randomKey();
while (key == null && !closed) {
long nanos = unit.toNanos(timeout);
while (key == null && !closed && nanos > 0) {
try {
condition.await();
nanos = condition.awaitNanos(nanos);
} catch (InterruptedException e) {
// Ignore.
}

View File

@ -25,6 +25,7 @@ import org.slf4j.Logger;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -34,6 +35,11 @@ import java.util.stream.IntStream;
*/
public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
/**
* The poll timeout to wait for an event by the EventProcessorThread.
*/
private static final long POLL_TIMEOUT_MS = 300L;
/**
* The logger.
*/
@ -129,7 +135,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
// time should be discounted by # threads.
long idleStartTimeMs = time.milliseconds();
CoordinatorEvent event = accumulator.take();
CoordinatorEvent event = accumulator.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
long idleEndTimeMs = time.milliseconds();
long idleTimeMs = idleEndTimeMs - idleStartTimeMs;
metrics.recordThreadIdleTime(idleTimeMs / threads.size());

View File

@ -190,9 +190,12 @@ public class EventAccumulatorTest {
MockEvent event1 = new MockEvent(1, 1);
MockEvent event2 = new MockEvent(1, 2);
CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::take);
CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::take);
CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::take);
CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(() ->
accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(() ->
accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(() ->
accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
List<CompletableFuture<MockEvent>> futures = Arrays.asList(future0, future1, future2);
assertFalse(future0.isDone());
@ -245,9 +248,12 @@ public class EventAccumulatorTest {
public void testCloseUnblockWaitingThreads() throws ExecutionException, InterruptedException, TimeoutException {
EventAccumulator<Integer, MockEvent> accumulator = new EventAccumulator<>();
CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::take);
CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::take);
CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::take);
CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(() ->
accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(() ->
accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(() ->
accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
assertFalse(future0.isDone());
assertFalse(future1.isDone());

View File

@ -60,8 +60,8 @@ public class MultiThreadedEventProcessorTest {
}
@Override
public CoordinatorEvent take() {
CoordinatorEvent event = super.take();
public CoordinatorEvent poll(long timeout, TimeUnit unit) {
CoordinatorEvent event = super.poll(timeout, unit);
time.sleep(takeDelayMs);
return event;
}