KAFKA-7941: Catch TimeoutException in KafkaBasedLog worker thread (#6283)

When readToLogEnd() is called, the KafkaBasedLog worker thread should catch the TimeoutException that can occur when brokers are unavailable, log a warning, and retry; otherwise the uncaught exception terminates the worker thread.

Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix.
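
For illustration, here is a minimal standalone sketch of the new hook (the class name, topic, and printed messages are illustrative and not part of the commit). It shows that setOffsetsException() arms a one-shot failure for the next offsets lookup, after which a retry succeeds:

import java.util.Collections;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class MockConsumerOffsetsExceptionDemo {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("connect-offsets", 0);
        consumer.updateEndOffsets(Collections.singletonMap(tp, 0L));

        // Arm a one-shot exception for the next offsets query.
        consumer.setOffsetsException(new TimeoutException("Failed to get offsets by times"));

        try {
            consumer.endOffsets(Collections.singletonList(tp));
        } catch (TimeoutException e) {
            System.out.println("First lookup failed as armed: " + e.getMessage());
        }

        // The stored exception is cleared as soon as it is thrown, so the retry succeeds.
        System.out.println("Retry returns: " + consumer.endOffsets(Collections.singletonList(tp)));
    }
}

The one-shot semantics mirror the existing poll-exception behavior in MockConsumer: the stored exception is nulled out before being thrown.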

Author: Paul Whalen <pgwhalen@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <arjun@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>
Commit: e2c8612d01 (parent 794637232c)
Authored: 2019-08-13 10:16:55 -05:00, committed by Randall Hauch

7 changed files with 112 additions and 12 deletions

MockConsumer.java

@@ -61,7 +61,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final Set<TopicPartition> paused;
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
-    private KafkaException exception;
+    private KafkaException pollException;
+    private KafkaException offsetsException;
     private AtomicBoolean wakeup;
     private boolean closed;
@@ -74,7 +75,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
         this.pollTasks = new LinkedList<>();
-        this.exception = null;
+        this.pollException = null;
         this.wakeup = new AtomicBoolean(false);
         this.committed = new HashMap<>();
     }
@@ -173,9 +174,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             throw new WakeupException();
         }
 
-        if (exception != null) {
-            RuntimeException exception = this.exception;
-            this.exception = null;
+        if (pollException != null) {
+            RuntimeException exception = this.pollException;
+            this.pollException = null;
             throw exception;
         }
@@ -220,8 +221,20 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         recs.add(record);
     }
 
+    /**
+     * @deprecated Use {@link #setPollException(KafkaException)} instead
+     */
+    @Deprecated
     public synchronized void setException(KafkaException exception) {
-        this.exception = exception;
+        setPollException(exception);
     }
+
+    public synchronized void setPollException(KafkaException exception) {
+        this.pollException = exception;
+    }
+
+    public synchronized void setOffsetsException(KafkaException exception) {
+        this.offsetsException = exception;
+    }
 
     @Override
@@ -393,6 +406,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+        if (offsetsException != null) {
+            RuntimeException exception = this.offsetsException;
+            this.offsetsException = null;
+            throw exception;
+        }
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long beginningOffset = beginningOffsets.get(tp);
@@ -405,6 +423,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+        if (offsetsException != null) {
+            RuntimeException exception = this.offsetsException;
+            this.offsetsException = null;
+            throw exception;
+        }
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long endOffset = getEndOffset(endOffsets.get(tp));

KafkaBasedLog.java

@@ -28,6 +28,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -312,6 +313,10 @@ public class KafkaBasedLog<K, V> {
                        try {
                            readToLogEnd();
                            log.trace("Finished read to end log for topic {}", topic);
+                       } catch (TimeoutException e) {
+                           log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
+                                    "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                           continue;
                        } catch (WakeupException e) {
                            // Either received another get() call and need to retry reading to end of log or stop() was
                            // called. Both are handled by restarting this loop.
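
The catch clause above is the entire fix: TimeoutException is a retriable error, so the worker loop logs a warning and continues instead of dying. Below is a minimal standalone sketch of the same catch-and-retry pattern; RetryLoopSketch, LogReader, and runUntilCaughtUp are illustrative names, not taken from KafkaBasedLog:

import org.apache.kafka.common.errors.TimeoutException;

public class RetryLoopSketch {

    interface LogReader {
        // May throw TimeoutException (unchecked) when brokers are unreachable.
        void readToLogEnd();
    }

    static void runUntilCaughtUp(LogReader reader) throws InterruptedException {
        while (true) {
            try {
                reader.readToLogEnd();
                return; // caught up to the end of the log
            } catch (TimeoutException e) {
                // Transient failure: warn and retry rather than letting the
                // exception propagate and terminate the worker thread.
                System.err.println("Timeout reading to log end, retrying: " + e.getMessage());
                Thread.sleep(100); // illustrative pause between attempts
            }
        }
    }
}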

KafkaBasedLogTest.java

@@ -30,6 +30,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
@@ -370,7 +371,7 @@
     }
 
     @Test
-    public void testConsumerError() throws Exception {
+    public void testPollConsumerError() throws Exception {
         expectStart();
         expectStop();
@@ -388,7 +389,7 @@
         consumer.schedulePollTask(new Runnable() {
             @Override
             public void run() {
-                consumer.setException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+                consumer.setPollException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
             }
         });
@@ -423,6 +424,77 @@
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
+        expectStart();
+
+        // Producer flushes when read to log end is called
+        producer.flush();
+        PowerMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        final AtomicBoolean getInvoked = new AtomicBoolean(false);
+        final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                getInvoked.set(true);
+            }
+        });
+        consumer.schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
+                // that should follow. This readToEnd call will immediately wakeup this consumer.poll() call without
+                // returning any data.
+                Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+                newEndOffsets.put(TP0, 1L);
+                newEndOffsets.put(TP1, 1L);
+                consumer.updateEndOffsets(newEndOffsets);
+                // Set exception to occur when getting offsets to read log to end. It'll be caught in the work thread,
+                // which will retry and eventually get the correct offsets and read log to end.
+                consumer.setOffsetsException(new TimeoutException("Failed to get offsets by times"));
+                store.readToEnd(readEndFutureCallback);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                    }
+                });
+
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
+            }
+        });
+        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvoked.get());
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(1L, consumer.position(TP0));
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testProducerError() throws Exception {
         expectStart();

GlobalStateManagerImplTest.java

@@ -302,7 +302,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() {
         initializeConsumer(2, 0, t1);
-        consumer.setException(new InvalidOffsetException("Try Again!") {
+        consumer.setPollException(new InvalidOffsetException("Try Again!") {
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(t1);
             }

GlobalStreamThreadTest.java

@@ -236,7 +236,7 @@ public class GlobalStreamThreadTest {
            10 * 1000,
            "Input record never consumed");
 
-        mockConsumer.setException(new InvalidOffsetException("Try Again!") {
+        mockConsumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(topicPartition);

StoreChangelogReaderTest.java

@@ -158,7 +158,7 @@ public class StoreChangelogReaderTest {
     public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        consumer.setException(new InvalidOffsetException("Try Again!") {
+        consumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(topicPartition);

StreamThreadTest.java

@@ -1409,7 +1409,7 @@ public class StreamThreadTest {
            () -> mockRestoreConsumer.position(changelogPartition) == 1L,
            "Never restore first record");
 
-        mockRestoreConsumer.setException(new InvalidOffsetException("Try Again!") {
+        mockRestoreConsumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return changelogPartitionSet;