diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 71e026c7176..bb1ad77b154 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -31,6 +30,7 @@ import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; @@ -243,7 +243,7 @@ class WorkerSourceTask extends WorkerTask { protected List poll() throws InterruptedException { try { return task.poll(); - } catch (RetriableException e) { + } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); // Do nothing. Let the framework poll whenever it's ready. return null; @@ -341,7 +341,7 @@ class WorkerSourceTask extends WorkerTask { } }); lastSendFailed = false; - } catch (RetriableException e) { + } catch (org.apache.kafka.common.errors.RetriableException e) { log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e); toSend = toSend.subList(processed, toSend.size()); lastSendFailed = true;