mirror of https://github.com/apache/kafka.git
KAFKA-18484 [1/N]; Handle exceptions from deferred events in coordinator (#18661)
Guard against the coordinator getting stuck due to deferred events throwing exceptions.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
9da516b1a9
commit
5a57473a52
|
@ -460,7 +460,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
|
||||||
* A simple container class to hold all the attributes
|
* A simple container class to hold all the attributes
|
||||||
* related to a pending batch.
|
* related to a pending batch.
|
||||||
*/
|
*/
|
||||||
private static class CoordinatorBatch {
|
private class CoordinatorBatch {
|
||||||
/**
|
/**
|
||||||
* The base (or first) offset of the batch. If the batch fails
|
* The base (or first) offset of the batch. If the batch fails
|
||||||
* for any reason, the state machine is rolled back to it.
|
* for any reason, the state machine is rolled back to it.
|
||||||
|
@ -500,9 +500,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
|
||||||
final Optional<TimerTask> lingerTimeoutTask;
|
final Optional<TimerTask> lingerTimeoutTask;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The list of deferred events associated with the batch.
|
* The deferred events associated with the batch.
|
||||||
*/
|
*/
|
||||||
final List<DeferredEvent> deferredEvents;
|
final DeferredEventCollection deferredEvents;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The next offset. This is updated when records
|
* The next offset. This is updated when records
|
||||||
|
@ -527,7 +527,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
|
||||||
this.buffer = buffer;
|
this.buffer = buffer;
|
||||||
this.builder = builder;
|
this.builder = builder;
|
||||||
this.lingerTimeoutTask = lingerTimeoutTask;
|
this.lingerTimeoutTask = lingerTimeoutTask;
|
||||||
this.deferredEvents = new ArrayList<>();
|
this.deferredEvents = new DeferredEventCollection();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -806,9 +806,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add all the pending deferred events to the deferred event queue.
|
// Add all the pending deferred events to the deferred event queue.
|
||||||
for (DeferredEvent event : currentBatch.deferredEvents) {
|
deferredEventQueue.add(offset, currentBatch.deferredEvents);
|
||||||
deferredEventQueue.add(offset, event);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Free up the current batch.
|
// Free up the current batch.
|
||||||
freeCurrentBatch();
|
freeCurrentBatch();
|
||||||
|
@ -839,9 +837,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
|
||||||
private void failCurrentBatch(Throwable t) {
|
private void failCurrentBatch(Throwable t) {
|
||||||
if (currentBatch != null) {
|
if (currentBatch != null) {
|
||||||
coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
|
coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
|
||||||
for (DeferredEvent event : currentBatch.deferredEvents) {
|
currentBatch.deferredEvents.complete(t);
|
||||||
event.complete(t);
|
|
||||||
}
|
|
||||||
freeCurrentBatch();
|
freeCurrentBatch();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1157,6 +1153,38 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A collection of {@link DeferredEvent}. When completed, completes all the events in the collection
|
||||||
|
* and logs any exceptions thrown.
|
||||||
|
*/
|
||||||
|
class DeferredEventCollection implements DeferredEvent {
|
||||||
|
private final List<DeferredEvent> events = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete(Throwable t) {
|
||||||
|
for (DeferredEvent event : events) {
|
||||||
|
try {
|
||||||
|
event.complete(t);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
log.error("Completion of event {} failed due to {}.", event, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean add(DeferredEvent event) {
|
||||||
|
return events.add(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int size() {
|
||||||
|
return events.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "DeferredEventCollection(events=" + events + ")";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A coordinator write operation.
|
* A coordinator write operation.
|
||||||
*
|
*
|
||||||
|
|
|
@ -94,8 +94,10 @@ import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
import static org.mockito.ArgumentMatchers.argThat;
|
import static org.mockito.ArgumentMatchers.argThat;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
|
@ -1116,6 +1118,105 @@ public class CoordinatorRuntimeTest {
|
||||||
assertEquals(10, ctx.epoch);
|
assertEquals(10, ctx.epoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
MockTimer timer = new MockTimer();
|
||||||
|
MockPartitionWriter writer = new MockPartitionWriter();
|
||||||
|
MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
|
||||||
|
MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
|
||||||
|
MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
|
||||||
|
CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class);
|
||||||
|
|
||||||
|
// All operations will throw an exception when completed.
|
||||||
|
doThrow(new KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong());
|
||||||
|
|
||||||
|
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
|
||||||
|
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
|
||||||
|
.withTime(timer.time())
|
||||||
|
.withTimer(timer)
|
||||||
|
.withDefaultWriteTimeOut(Duration.ofMillis(20))
|
||||||
|
.withLoader(new MockCoordinatorLoader())
|
||||||
|
.withEventProcessor(new DirectEventProcessor())
|
||||||
|
.withPartitionWriter(writer)
|
||||||
|
.withCoordinatorShardBuilderSupplier(supplier)
|
||||||
|
.withCoordinatorRuntimeMetrics(metrics)
|
||||||
|
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
|
||||||
|
.withSerializer(new StringSerializer())
|
||||||
|
.withAppendLingerMs(10)
|
||||||
|
.withExecutorService(mock(ExecutorService.class))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
|
||||||
|
when(builder.withLogContext(any())).thenReturn(builder);
|
||||||
|
when(builder.withTime(any())).thenReturn(builder);
|
||||||
|
when(builder.withTimer(any())).thenReturn(builder);
|
||||||
|
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
|
||||||
|
when(builder.withTopicPartition(any())).thenReturn(builder);
|
||||||
|
when(builder.withExecutor(any())).thenReturn(builder);
|
||||||
|
when(builder.build()).thenReturn(coordinator);
|
||||||
|
when(supplier.get()).thenReturn(builder);
|
||||||
|
|
||||||
|
// Load the coordinator.
|
||||||
|
runtime.scheduleLoadOperation(TP, 10);
|
||||||
|
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
|
||||||
|
|
||||||
|
// Get the max batch size.
|
||||||
|
int maxBatchSize = writer.config(TP).maxMessageSize();
|
||||||
|
|
||||||
|
// Create records with three quarters of the max batch size each, so that it is not
|
||||||
|
// possible to have more than one record in a single batch.
|
||||||
|
List<String> records = Stream.of('1', '2', '3').map(c -> {
|
||||||
|
char[] payload = new char[maxBatchSize * 3 / 4];
|
||||||
|
Arrays.fill(payload, c);
|
||||||
|
return new String(payload);
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
|
||||||
|
// Write #1.
|
||||||
|
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
|
||||||
|
state -> new CoordinatorResult<>(List.of(records.get(0)), "response1")
|
||||||
|
);
|
||||||
|
|
||||||
|
// Write #2.
|
||||||
|
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
|
||||||
|
state -> new CoordinatorResult<>(List.of(records.get(1)), "response2")
|
||||||
|
);
|
||||||
|
|
||||||
|
// Write #3, to force the flush of write #2.
|
||||||
|
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
|
||||||
|
state -> new CoordinatorResult<>(List.of(records.get(1)), "response3")
|
||||||
|
);
|
||||||
|
|
||||||
|
// Records have been written to the log.
|
||||||
|
assertEquals(List.of(
|
||||||
|
records(timer.time().milliseconds(), records.get(0)),
|
||||||
|
records(timer.time().milliseconds(), records.get(1))
|
||||||
|
), writer.entries(TP));
|
||||||
|
|
||||||
|
// Verify that no writes are committed yet.
|
||||||
|
assertFalse(write1.isDone());
|
||||||
|
assertFalse(write2.isDone());
|
||||||
|
assertFalse(write3.isDone());
|
||||||
|
|
||||||
|
// Schedule the unloading.
|
||||||
|
runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
|
||||||
|
assertEquals(CLOSED, ctx.state);
|
||||||
|
|
||||||
|
// All write completions throw exceptions after completing their futures.
|
||||||
|
// Despite the exceptions, the unload should still complete.
|
||||||
|
assertTrue(write1.isDone());
|
||||||
|
assertTrue(write2.isDone());
|
||||||
|
assertTrue(write3.isDone());
|
||||||
|
assertFutureThrows(write1, NotCoordinatorException.class);
|
||||||
|
assertFutureThrows(write2, NotCoordinatorException.class);
|
||||||
|
assertFutureThrows(write3, NotCoordinatorException.class);
|
||||||
|
|
||||||
|
// Verify that onUnloaded is called.
|
||||||
|
verify(coordinator, times(1)).onUnloaded();
|
||||||
|
|
||||||
|
// Getting the coordinator context fails because it no longer exists.
|
||||||
|
assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
|
public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
MockTimer timer = new MockTimer();
|
MockTimer timer = new MockTimer();
|
||||||
|
@ -3080,6 +3181,83 @@ public class CoordinatorRuntimeTest {
|
||||||
assertTrue(write2.isDone());
|
assertTrue(write2.isDone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHighWatermarkUpdateWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
MockTimer timer = new MockTimer();
|
||||||
|
MockPartitionWriter writer = new MockPartitionWriter();
|
||||||
|
CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class);
|
||||||
|
|
||||||
|
// All operations will throw an exception when completed.
|
||||||
|
doThrow(new KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong());
|
||||||
|
|
||||||
|
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
|
||||||
|
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
|
||||||
|
.withTime(timer.time())
|
||||||
|
.withTimer(timer)
|
||||||
|
.withDefaultWriteTimeOut(Duration.ofMillis(20))
|
||||||
|
.withLoader(new MockCoordinatorLoader())
|
||||||
|
.withEventProcessor(new DirectEventProcessor())
|
||||||
|
.withPartitionWriter(writer)
|
||||||
|
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
|
||||||
|
.withCoordinatorRuntimeMetrics(metrics)
|
||||||
|
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
|
||||||
|
.withSerializer(new StringSerializer())
|
||||||
|
.withAppendLingerMs(10)
|
||||||
|
.withExecutorService(mock(ExecutorService.class))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Load the coordinator.
|
||||||
|
runtime.scheduleLoadOperation(TP, 10);
|
||||||
|
|
||||||
|
// Get the max batch size.
|
||||||
|
int maxBatchSize = writer.config(TP).maxMessageSize();
|
||||||
|
|
||||||
|
// Create records with three quarters of the max batch size each, so that it is not
|
||||||
|
// possible to have more than one record in a single batch.
|
||||||
|
List<String> records = Stream.of('1', '2', '3').map(c -> {
|
||||||
|
char[] payload = new char[maxBatchSize * 3 / 4];
|
||||||
|
Arrays.fill(payload, c);
|
||||||
|
return new String(payload);
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
|
||||||
|
// Write #1.
|
||||||
|
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
|
||||||
|
state -> new CoordinatorResult<>(List.of(records.get(0)), "response1")
|
||||||
|
);
|
||||||
|
|
||||||
|
// Write #2.
|
||||||
|
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
|
||||||
|
state -> new CoordinatorResult<>(List.of(records.get(1)), "response2")
|
||||||
|
);
|
||||||
|
|
||||||
|
// Write #3, to force the flush of write #2.
|
||||||
|
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
|
||||||
|
state -> new CoordinatorResult<>(List.of(records.get(1)), "response3")
|
||||||
|
);
|
||||||
|
|
||||||
|
// Records have been written to the log.
|
||||||
|
assertEquals(List.of(
|
||||||
|
records(timer.time().milliseconds(), records.get(0)),
|
||||||
|
records(timer.time().milliseconds(), records.get(1))
|
||||||
|
), writer.entries(TP));
|
||||||
|
|
||||||
|
// Verify that no writes are committed yet.
|
||||||
|
assertFalse(write1.isDone());
|
||||||
|
assertFalse(write2.isDone());
|
||||||
|
assertFalse(write3.isDone());
|
||||||
|
|
||||||
|
// Commit the first and second record.
|
||||||
|
writer.commit(TP, 2);
|
||||||
|
|
||||||
|
// Write #1 and write #2's completions throw exceptions after completing their futures.
|
||||||
|
// Despite the exception from write #1, write #2 should still be completed.
|
||||||
|
assertTrue(write1.isDone());
|
||||||
|
assertTrue(write2.isDone());
|
||||||
|
assertFalse(write3.isDone());
|
||||||
|
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
|
||||||
|
assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
|
public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
|
||||||
MockTimer timer = new MockTimer();
|
MockTimer timer = new MockTimer();
|
||||||
|
|
Loading…
Reference in New Issue