diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 6a2a787578e..6e2350fae08 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -28,7 +28,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -318,32 +320,8 @@ public class KafkaBasedLog<K, V> {
     }
 
     private void readToLogEnd() {
-        log.trace("Reading to end of offset log");
-
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets;
-        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
-        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
-        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
-        // one more record becomes available, meaning we can't even check whether we're at the end offset.
-        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
-        // (if available)
-        // (which prevents 'consumer.endOffsets(...)'
-        // from
-
-        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
-        if (admin != null) {
-            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
-            // Unlike using the consumer
-            endOffsets = admin.endOffsets(assignment);
-        } else {
-            // The admin may be null if older deprecated constructor is used, though AK Connect currently always provides an admin client.
-            // Using the consumer is not ideal, because when the topic has low volume, the 'poll(...)' method called from the
-            // work thread may have blocked the consumer while waiting for more records (even when there are none).
-            // In such cases, this call to the consumer to simply find the end offsets will block even though we might already be
-            // at the end offset.
-            endOffsets = consumer.endOffsets(assignment);
-        }
+        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
         log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
@@ -366,6 +344,37 @@
         }
     }
 
+    // Visible for testing
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+        log.trace("Reading to end of offset log");
+
+        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+        // one more record becomes available, meaning we can't even check whether we're at the end offset.
+        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
+        // (if available) to obtain the end offsets for the given topic partitions.
+
+        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+        if (admin != null) {
+            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
+            // Unlike using the consumer, this call does not block waiting for new records to appear.
+            try {
+                return admin.endOffsets(assignment);
+            } catch (UnsupportedVersionException e) {
+                // This may happen with really old brokers that don't support the auto topic creation
+                // field in metadata requests
+                log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", e.getMessage());
+                // Forget the reference to the admin so that we won't even try to use the admin the next time this method is called
+                admin = null;
+                // continue and let the consumer handle the read
+            }
+            // Other errors, like timeouts and retriable exceptions, are intentionally propagated
+        }
+        // The admin may be null if an older deprecated constructor was used or if the admin client is talking to a broker that doesn't
+        // support getting the end offsets (e.g., 0.10.x). In such cases, we should use the consumer, which is not ideal (see above).
+        return consumer.endOffsets(assignment);
+    }
+
     private class WorkThread extends Thread {
         public WorkThread() {
@@ -390,7 +399,11 @@
                         log.trace("Finished read to end log for topic {}", topic);
                     } catch (TimeoutException e) {
                         log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
-                                "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                                 "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                        continue;
+                    } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                        log.warn("Retriable error while reading log to end for topic '{}'. Retrying automatically. " +
+                                 "Reason: {}", topic, e.getMessage());
                         continue;
                     } catch (WakeupException e) {
                         // Either received another get() call and need to retry reading to end of log or stop() was
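Note on the change above: readEndOffsets() prefers the admin client and falls back to the consumer at most once, clearing the admin reference so that later calls skip the doomed request. The following is a minimal, self-contained sketch of that pattern; the OffsetReader interface and class name are hypothetical stand-ins for TopicAdmin and KafkaConsumer, not part of this patch.

    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.UnsupportedVersionException;

    public class EndOffsetFallbackSketch {

        // Hypothetical stand-in for both TopicAdmin.endOffsets(...) and KafkaConsumer.endOffsets(...)
        interface OffsetReader {
            Map<TopicPartition, Long> endOffsets(Set<TopicPartition> partitions);
        }

        private OffsetReader admin; // nullable; cleared permanently once the broker proves too old
        private final OffsetReader consumer;

        EndOffsetFallbackSketch(OffsetReader admin, OffsetReader consumer) {
            this.admin = admin;
            this.consumer = consumer;
        }

        Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
            if (admin != null) {
                try {
                    // Preferred path: returns immediately even when the log is already fully consumed
                    return admin.endOffsets(assignment);
                } catch (UnsupportedVersionException e) {
                    // Old broker: remember the failure so later calls skip the failed round trip
                    admin = null;
                }
                // Timeouts and other retriable errors are not caught here; the caller retries them
            }
            // Fallback: may block if a concurrent poll() already parked the consumer at the log end
            return consumer.endOffsets(assignment);
        }
    }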
" + + "Reason: {}", topic, e.getMessage()); continue; } catch (WakeupException e) { // Either received another get() call and need to retry reading to end of log or stop() was diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 9a7907bcdaf..9661c69e63f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -651,8 +651,12 @@ public class TopicAdmin implements AutoCloseable { * @param partitions the topic partitions * @return the map of offset for each topic partition, or an empty map if the supplied partitions * are null or empty - * @throws RetriableException if a retriable error occurs, the operation takes too long, or the - * thread is interrupted while attempting to perform this operation + * @throws UnsupportedVersionException if the admin client cannot read end offsets + * @throws TimeoutException if the offset metadata could not be fetched before the amount of time allocated + * by {@code request.timeout.ms} expires, and this call can be retried + * @throws LeaderNotAvailableException if the leader was not available and this call can be retried + * @throws RetriableException if a retriable error occurs, or the thread is interrupted while attempting + * to perform this operation * @throws ConnectException if a non retriable error occurs */ public Map endOffsets(Set partitions) { @@ -677,13 +681,15 @@ public class TopicAdmin implements AutoCloseable { // Should theoretically never happen, because this method is the same as what the consumer uses and therefore // should exist in the broker since before the admin client was added String msg = String.format("API to get the get the end offsets for topic '%s' is unsupported on brokers at %s", topic, bootstrapServers()); - throw new ConnectException(msg, e); + throw new UnsupportedVersionException(msg, e); } else if (cause instanceof TimeoutException) { String msg = String.format("Timed out while waiting to get end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); - throw new RetriableException(msg, e); + throw new TimeoutException(msg, e); } else if (cause instanceof LeaderNotAvailableException) { String msg = String.format("Unable to get end offsets during leader election for topic '%s' on brokers at %s", topic, bootstrapServers()); - throw new RetriableException(msg, e); + throw new LeaderNotAvailableException(msg, e); + } else if (cause instanceof org.apache.kafka.common.errors.RetriableException) { + throw (org.apache.kafka.common.errors.RetriableException) cause; } else { String msg = String.format("Error while getting end offsets for topic '%s' on brokers at %s", topic, bootstrapServers()); throw new ConnectException(msg, e); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index 15bf8ca9b4f..e36f2a902fa 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.TimeoutException; +import 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index 15bf8ca9b4f..e36f2a902fa 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
@@ -61,11 +62,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
@@ -117,6 +120,8 @@ public class KafkaBasedLogTest {
     @Mock
     private KafkaProducer<String, byte[]> producer;
     private MockConsumer<String, byte[]> consumer;
+    @Mock
+    private TopicAdmin admin;
 
     private Map<TopicPartition, List<ConsumerRecord<String, byte[]>>> consumedRecords = new HashMap<>();
     private Callback<ConsumerRecord<String, byte[]>> consumedCallback = (error, record) -> {
@@ -463,15 +468,91 @@
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testReadEndOffsetsUsingAdmin() throws Exception {
+        // Create a log that uses the admin supplier
+        setupWithAdmin();
+        expectProducerAndConsumerCreate();
+
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andReturn(endOffsets).times(2);
+
+        PowerMock.replayAll();
+
+        store.start();
+        assertEquals(endOffsets, store.readEndOffsets(tps));
+    }
+
+    @Test
+    public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws Exception {
+        // Create a log that uses the admin supplier
+        setupWithAdmin();
+        expectProducerAndConsumerCreate();
+
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        // Getting end offsets using the admin client should fail with unsupported version
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andThrow(new UnsupportedVersionException("too old"));
+
+        // Falls back to the consumer
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+
+        PowerMock.replayAll();
+
+        store.start();
+        assertEquals(endOffsets, store.readEndOffsets(tps));
+    }
+
+    @Test
+    public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception {
+        // Create a log that uses the admin supplier
+        setupWithAdmin();
+        expectProducerAndConsumerCreate();
+
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        // Getting end offsets upon startup should work fine
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
+        // Getting end offsets using the admin client should fail with leader not available
+        admin.endOffsets(EasyMock.eq(tps));
+        PowerMock.expectLastCall().andThrow(new LeaderNotAvailableException("retry"));
+
+        PowerMock.replayAll();
+
+        store.start();
+        assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps));
+    }
+
+    @SuppressWarnings("unchecked")
+    private void setupWithAdmin() {
+        Supplier<TopicAdmin> adminSupplier = () -> admin;
+        java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
+        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
+                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
+    }
+
+    private void expectProducerAndConsumerCreate() throws Exception {
+        PowerMock.expectPrivate(store, "createProducer")
+                .andReturn(producer);
+        PowerMock.expectPrivate(store, "createConsumer")
+                .andReturn(consumer);
+    }
+
     private void expectStart() throws Exception {
         initializer.run();
         EasyMock.expectLastCall().times(1);
 
-        PowerMock.expectPrivate(store, "createProducer")
-                .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                .andReturn(consumer);
+        expectProducerAndConsumerCreate();
     }
 
     private void expectStop() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 9ba0b1d0aab..edd989125b4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
@@ -50,7 +51,6 @@ import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.errors.RetriableException;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -485,7 +485,7 @@ public class TopicAdminTest {
     }
 
     @Test
-    public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() {
+    public void endOffsetsShouldFailWithUnsupportedVersionWhenVersionUnsupportedErrorOccurs() {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
         Set<TopicPartition> tps = Collections.singleton(tp1);
@@ -496,15 +496,14 @@
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
             env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            ConnectException e = assertThrows(ConnectException.class, () -> {
+            UnsupportedVersionException e = assertThrows(UnsupportedVersionException.class, () -> {
                 admin.endOffsets(tps);
             });
-            assertTrue(e.getMessage().contains("is unsupported on brokers"));
         }
     }
 
     @Test
-    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() {
+    public void endOffsetsShouldFailWithTimeoutExceptionWhenTimeoutErrorOccurs() {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
         Set<TopicPartition> tps = Collections.singleton(tp1);
@@ -515,10 +514,9 @@
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
             env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            RetriableException e = assertThrows(RetriableException.class, () -> {
+            TimeoutException e = assertThrows(TimeoutException.class, () -> {
                 admin.endOffsets(tps);
             });
-            assertTrue(e.getMessage().contains("Timed out while waiting"));
         }
     }
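Note on the WorkThread change in KafkaBasedLog: both retriable hierarchies (org.apache.kafka.common.errors.RetriableException and org.apache.kafka.connect.errors.RetriableException) now restart the read-to-end loop, and TimeoutException is handled the same way since it is a subclass of the former. Below is a minimal sketch of that retry loop; the LogReader interface and backoff are hypothetical stand-ins for KafkaBasedLog's internals, not part of this patch.

    import org.apache.kafka.common.errors.RetriableException;

    public class ReadToEndRetrySketch {

        // Hypothetical stand-in for KafkaBasedLog's read-to-end step
        interface LogReader {
            void readToEnd();
        }

        // Retriable failures restart the loop; any other exception escapes to the caller,
        // mirroring the catch clauses added to WorkThread.run() above
        static void readToEndWithRetries(LogReader reader) throws InterruptedException {
            while (true) {
                try {
                    reader.readToEnd();
                    return;
                } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
                    // TimeoutException lands here too, since it extends the common RetriableException
                    Thread.sleep(100); // illustrative backoff; the real WorkThread simply continues its loop
                }
            }
        }
    }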