KAFKA-14462; [5/N] Add EventAccumulator (#13505)

This patch adds the `EventAccumulator` which will be used in the runtime of the new group coordinator. The aim of this accumulator is to basically have a queue per __consumer_group partitions and to ensure that events addressed to the same partitions are not processed concurrently. The accumulator is generic so we could reuse it in different context.

Reviewers: Alexandre Dupriez <alexandre.dupriez@gmail.com>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2023-04-13 08:33:40 +02:00 committed by GitHub
parent 571841fed3
commit 440a53099d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 489 additions and 0 deletions

View File

@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.runtime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* A concurrent event accumulator which group events per key and ensure that only one
* event with a given key can't be processed concurrently.
*
* This class is threadsafe.
*
* @param <K> The type of the key of the event.
* @param <T> The type of the event itself. It implements the {{@link Event}} interface.
*
* There are a few examples about how to use it in the unit tests.
*/
public class EventAccumulator<K, T extends EventAccumulator.Event<K>> implements AutoCloseable {
/**
* The interface which must be implemented by all events.
*
* @param <K> The type of the key of the event.
*/
public interface Event<K> {
K key();
}
/**
* The random generator used by this class.
*/
private final Random random;
/**
* The map of queues keyed by K.
*/
private final Map<K, Queue<T>> queues;
/**
* The list of available keys. Keys in this list can
* be delivered to pollers.
*/
private final List<K> availableKeys;
/**
* The set of keys that are being processed.
*/
private final Set<K> inflightKeys;
/**
* The lock for protecting access to the resources.
*/
private final ReentrantLock lock;
/**
* The condition variable for waking up poller threads.
*/
private final Condition condition;
/**
* The number of events in the accumulator.
*/
private int size;
/**
* A boolean indicated whether the accumulator is closed.
*/
private boolean closed;
public EventAccumulator() {
this(new Random());
}
public EventAccumulator(
Random random
) {
this.random = random;
this.queues = new HashMap<>();
this.availableKeys = new ArrayList<>();
this.inflightKeys = new HashSet<>();
this.closed = false;
this.lock = new ReentrantLock();
this.condition = lock.newCondition();
}
/**
* Adds an {{@link Event}} to the queue.
*
* @param event An {{@link Event}}.
*/
public void add(T event) {
lock.lock();
try {
if (closed) throw new IllegalStateException("Can't accept an event because the accumulator is closed.");
K key = event.key();
Queue<T> queue = queues.get(key);
if (queue == null) {
queue = new LinkedList<>();
queues.put(key, queue);
if (!inflightKeys.contains(key)) {
addAvailableKey(key);
}
}
queue.add(event);
size++;
} finally {
lock.unlock();
}
}
/**
* Returns the next {{@link Event}} available. This method block indefinitely until
* one event is ready or the accumulator is closed.
*
* @return The next event.
*/
public T poll() {
return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
}
/**
* Returns the next {{@link Event}} available. This method blocks for the provided
* time and returns null of not event is available.
*
* @param timeout The timeout.
* @param unit The timeout unit.
* @return The next event available or null.
*/
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
K key = randomKey();
long nanos = unit.toNanos(timeout);
while (key == null && !closed && nanos > 0) {
try {
nanos = condition.awaitNanos(nanos);
} catch (InterruptedException e) {
// Ignore.
}
key = randomKey();
}
if (key == null) return null;
Queue<T> queue = queues.get(key);
T event = queue.poll();
if (queue.isEmpty()) queues.remove(key);
inflightKeys.add(key);
size--;
return event;
} finally {
lock.unlock();
}
}
/**
* Marks the event as processed and releases the next event
* with the same key. This unblocks waiting threads.
*
* @param event The event that was processed.
*/
public void done(T event) {
lock.lock();
try {
K key = event.key();
inflightKeys.remove(key);
if (queues.containsKey(key)) {
addAvailableKey(key);
}
} finally {
lock.unlock();
}
}
/**
* Returns the size of the accumulator.
*/
public int size() {
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
/**
* Closes the accumulator. This unblocks all the waiting threads.
*/
@Override
public void close() {
lock.lock();
try {
closed = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
/**
* Adds the key to the available keys set.
*
* This method must be called while holding the lock.
*/
private void addAvailableKey(K key) {
availableKeys.add(key);
condition.signalAll();
}
/**
* Returns the next available key. The key is selected randomly
* from the available keys set.
*
* This method must be called while holding the lock.
*/
private K randomKey() {
if (availableKeys.isEmpty()) return null;
int lastIndex = availableKeys.size() - 1;
int randomIndex = random.nextInt(availableKeys.size());
K randomKey = availableKeys.get(randomIndex);
Collections.swap(availableKeys, randomIndex, lastIndex);
availableKeys.remove(lastIndex);
return randomKey;
}
}

View File

@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.runtime;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public class EventAccumulatorTest {
private class MockEvent implements EventAccumulator.Event<Integer> {
int key;
int value;
MockEvent(int key, int value) {
this.key = key;
this.value = value;
}
@Override
public Integer key() {
return key;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MockEvent mockEvent = (MockEvent) o;
if (key != mockEvent.key) return false;
return value == mockEvent.value;
}
@Override
public int hashCode() {
int result = key;
result = 31 * result + value;
return result;
}
@Override
public String toString() {
return "MockEvent(key=" + key + ", value=" + value + ')';
}
}
@Test
public void testBasicOperations() {
EventAccumulator<Integer, MockEvent> accumulator = new EventAccumulator<>();
assertEquals(0, accumulator.size());
assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
List<MockEvent> events = Arrays.asList(
new MockEvent(1, 0),
new MockEvent(1, 1),
new MockEvent(1, 2),
new MockEvent(2, 0),
new MockEvent(2, 1),
new MockEvent(2, 3),
new MockEvent(3, 0),
new MockEvent(3, 1),
new MockEvent(3, 2)
);
events.forEach(accumulator::add);
assertEquals(9, accumulator.size());
Set<MockEvent> polledEvents = new HashSet<>();
for (int i = 0; i < events.size(); i++) {
MockEvent event = accumulator.poll(0, TimeUnit.MICROSECONDS);
assertNotNull(event);
polledEvents.add(event);
assertEquals(events.size() - 1 - i, accumulator.size());
accumulator.done(event);
}
assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
assertEquals(new HashSet<>(events), polledEvents);
assertEquals(0, accumulator.size());
accumulator.close();
}
@Test
public void testKeyConcurrentProcessingAndOrdering() {
EventAccumulator<Integer, MockEvent> accumulator = new EventAccumulator<>();
MockEvent event0 = new MockEvent(1, 0);
MockEvent event1 = new MockEvent(1, 1);
MockEvent event2 = new MockEvent(1, 2);
accumulator.add(event0);
accumulator.add(event1);
accumulator.add(event2);
assertEquals(3, accumulator.size());
MockEvent event = null;
// Poll event0.
event = accumulator.poll(0, TimeUnit.MICROSECONDS);
assertEquals(event0, event);
// Poll returns null because key is inflight.
assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
accumulator.done(event);
// Poll event1.
event = accumulator.poll(0, TimeUnit.MICROSECONDS);
assertEquals(event1, event);
// Poll returns null because key is inflight.
assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
accumulator.done(event);
// Poll event2.
event = accumulator.poll(0, TimeUnit.MICROSECONDS);
assertEquals(event2, event);
// Poll returns null because key is inflight.
assertNull(accumulator.poll(0, TimeUnit.MICROSECONDS));
accumulator.done(event);
accumulator.close();
}
@Test
public void testDoneUnblockWaitingThreads() throws ExecutionException, InterruptedException, TimeoutException {
EventAccumulator<Integer, MockEvent> accumulator = new EventAccumulator<>();
MockEvent event0 = new MockEvent(1, 0);
MockEvent event1 = new MockEvent(1, 1);
MockEvent event2 = new MockEvent(1, 2);
CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::poll);
CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::poll);
CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::poll);
List<CompletableFuture<MockEvent>> futures = Arrays.asList(future0, future1, future2);
assertFalse(future0.isDone());
assertFalse(future1.isDone());
assertFalse(future2.isDone());
accumulator.add(event0);
accumulator.add(event1);
accumulator.add(event2);
// One future should be completed with event0.
assertEquals(event0, CompletableFuture
.anyOf(futures.toArray(new CompletableFuture[0]))
.get(5, TimeUnit.SECONDS));
futures = futures.stream().filter(future -> !future.isDone()).collect(Collectors.toList());
assertEquals(2, futures.size());
// Processing of event0 is done.
accumulator.done(event0);
// One future should be completed with event1.
assertEquals(event1, CompletableFuture
.anyOf(futures.toArray(new CompletableFuture[0]))
.get(5, TimeUnit.SECONDS));
futures = futures.stream().filter(future -> !future.isDone()).collect(Collectors.toList());
assertEquals(1, futures.size());
// Processing of event1 is done.
accumulator.done(event1);
// One future should be completed with event2.
assertEquals(event2, CompletableFuture
.anyOf(futures.toArray(new CompletableFuture[0]))
.get(5, TimeUnit.SECONDS));
futures = futures.stream().filter(future -> !future.isDone()).collect(Collectors.toList());
assertEquals(0, futures.size());
// Processing of event2 is done.
accumulator.done(event2);
assertEquals(0, accumulator.size());
accumulator.close();
}
@Test
public void testCloseUnblockWaitingThreads() throws ExecutionException, InterruptedException, TimeoutException {
EventAccumulator<Integer, MockEvent> accumulator = new EventAccumulator<>();
CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::poll);
CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::poll);
CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::poll);
assertFalse(future0.isDone());
assertFalse(future1.isDone());
assertFalse(future2.isDone());
// Closing should release all the pending futures.
accumulator.close();
assertNull(future0.get(5, TimeUnit.SECONDS));
assertNull(future1.get(5, TimeUnit.SECONDS));
assertNull(future2.get(5, TimeUnit.SECONDS));
}
}