KAFKA-8320 : fix retriable exception package for source connectors (#6675)

WorkerSourceTask is catching the exception from wrong package org.apache.kafka.common.errors. It is not clear from the API standpoint as to which package the connect framework supports - the one from common or connect. The safest thing would be to support both the packages even though it's less desirable.

Author: Magesh Nandakumar <magesh.n.kumar@gmail.com>
Reviewers: Arjun Satish <arjun@confluent.io>, Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Magesh Nandakumar 2019-05-15 15:20:20 -07:00 committed by Randall Hauch
parent 2208f9966d
commit 5928ffd0dc
1 changed files with 3 additions and 3 deletions

View File

@ -21,7 +21,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@ -31,6 +30,7 @@ import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
@ -243,7 +243,7 @@ class WorkerSourceTask extends WorkerTask {
protected List<SourceRecord> poll() throws InterruptedException {
try {
return task.poll();
} catch (RetriableException e) {
} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
// Do nothing. Let the framework poll whenever it's ready.
return null;
@ -341,7 +341,7 @@ class WorkerSourceTask extends WorkerTask {
}
});
lastSendFailed = false;
} catch (RetriableException e) {
} catch (org.apache.kafka.common.errors.RetriableException e) {
log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
toSend = toSend.subList(processed, toSend.size());
lastSendFailed = true;