KAFKA-14401: Fail kafka log read end requests if underneath work thread fails (#14372)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
vamossagar12 2024-07-16 06:25:52 +05:30 committed by GitHub
parent 79ab5074e7
commit 808498e939
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 62 additions and 9 deletions

View File

@ -568,13 +568,12 @@ public class KafkaBasedLog<K, V> {
public WorkThread() {
super("KafkaBasedLog Work Thread - " + topic);
}
@Override
public void run() {
try {
log.trace("{} started execution", this);
while (true) {
int numCallbacks;
log.trace("{} started execution", this);
while (true) {
int numCallbacks = 0;
try {
synchronized (KafkaBasedLog.this) {
if (stopRequested)
break;
@ -587,11 +586,11 @@ public class KafkaBasedLog<K, V> {
log.trace("Finished read to end log for topic {}", topic);
} catch (TimeoutException e) {
log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
"This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
"This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
continue;
} catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
log.warn("Retriable error while reading log to end for topic '{}'. Retrying automatically. " +
"Reason: {}", topic, e.getMessage());
"Reason: {}", topic, e.getMessage());
continue;
} catch (WakeupException e) {
// Either received another get() call and need to retry reading to end of log or stop() was
@ -615,9 +614,18 @@ public class KafkaBasedLog<K, V> {
// See previous comment, both possible causes of this wakeup are handled by starting this loop again
continue;
}
} catch (Throwable t) {
log.error("Unexpected exception in {}", this, t);
synchronized (KafkaBasedLog.this) {
// Only fail exactly the number of callbacks we found before triggering the read to log end
// since it is possible for another write + readToEnd to sneak in the meantime which we don't
// want to fail.
for (int i = 0; i < numCallbacks; i++) {
Callback<Void> cb = readLogEndOffsetCallbacks.poll();
cb.onCompletion(t, null);
}
}
}
} catch (Throwable t) {
log.error("Unexpected exception in {}", this, t);
}
}
}

View File

@ -57,6 +57,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -404,6 +405,50 @@ public class KafkaBasedLogTest {
verifyStartAndStop();
}
@Test
public void testOffsetReadFailureWhenWorkThreadFails() throws Exception {
RuntimeException exception = new RuntimeException();
Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
endOffsets.put(TP1, 0L);
admin = mock(TopicAdmin.class);
when(admin.endOffsets(eq(tps)))
.thenReturn(endOffsets)
.thenThrow(exception)
.thenReturn(endOffsets);
store.start();
AtomicInteger numSuccesses = new AtomicInteger();
AtomicInteger numFailures = new AtomicInteger();
AtomicReference<FutureCallback<Void>> finalSuccessCallbackRef = new AtomicReference<>();
final FutureCallback<Void> successCallback = new FutureCallback<>((error, result) -> numSuccesses.getAndIncrement());
store.readToEnd(successCallback);
// First log end read should succeed.
successCallback.get(1000, TimeUnit.MILLISECONDS);
// Second log end read fails.
final FutureCallback<Void> firstFailedCallback = new FutureCallback<>((error, result) -> {
numFailures.getAndIncrement();
// We issue another readToEnd call here to simulate the case that more read requests can come in while
// the failure is being handled in the WorkThread. This read request should not be impacted by the outcome of
// the current read request's failure.
final FutureCallback<Void> finalSuccessCallback = new FutureCallback<>((e, r) -> numSuccesses.getAndIncrement());
finalSuccessCallbackRef.set(finalSuccessCallback);
store.readToEnd(finalSuccessCallback);
});
store.readToEnd(firstFailedCallback);
ExecutionException e1 = assertThrows(ExecutionException.class, () -> firstFailedCallback.get(1000, TimeUnit.MILLISECONDS));
assertEquals(exception, e1.getCause());
// Last log read end should succeed.
finalSuccessCallbackRef.get().get(1000, TimeUnit.MILLISECONDS);
assertEquals(2, numSuccesses.get());
assertEquals(1, numFailures.get());
}
@Test
public void testProducerError() {
TestFuture<RecordMetadata> tp0Future = new TestFuture<>();