From e11b515ef35475e8940cf2655a6d3a5d2acecbe3 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Thu, 1 Dec 2022 11:20:55 -0500 Subject: [PATCH] KAFKA-14017: Implement new KIP-618 APIs in FileStreamSourceConnector (#12355) Reviewers: Yash Mayya , Maison --- .../file/FileStreamSourceConnector.java | 15 +++++++++++++++ .../file/FileStreamSourceConnectorTest.java | 19 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java index 61908c61b9b..e732d55f81d 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.ExactlyOnceSupport; import org.apache.kafka.connect.source.SourceConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,4 +87,18 @@ public class FileStreamSourceConnector extends SourceConnector { public ConfigDef config() { return CONFIG_DEF; } + + @Override + public ExactlyOnceSupport exactlyOnceSupport(Map props) { + AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props); + filename = parsedConfig.getString(FILE_CONFIG); + // We can provide exactly-once guarantees if reading from a "real" file + // (as long as the file is only appended to over the lifetime of the connector) + // If we're reading from stdin, we can't provide exactly-once guarantees + // since we don't even track offsets + return filename != null && !filename.isEmpty() + ? ExactlyOnceSupport.SUPPORTED + : ExactlyOnceSupport.UNSUPPORTED; + } + } diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java index 0ade9ba79bc..d3b0265bc89 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.source.ConnectorTransactionBoundaries; +import org.apache.kafka.connect.source.ExactlyOnceSupport; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -59,6 +61,23 @@ public class FileStreamSourceConnectorTest { } } + @Test + public void testExactlyOnceSupport() { + sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME); + assertEquals(ExactlyOnceSupport.SUPPORTED, connector.exactlyOnceSupport(sourceProperties)); + + sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, " "); + assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(sourceProperties)); + + sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); + assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(sourceProperties)); + } + + @Test + public void testTransactionBoundaryDefinition() { + assertEquals(ConnectorTransactionBoundaries.UNSUPPORTED, connector.canDefineTransactionBoundaries(sourceProperties)); + } + @Test public void testSourceTasks() { connector.start(sourceProperties);