KAFKA-14017: Implement new KIP-618 APIs in FileStreamSourceConnector (#12355)

Reviewers: Yash Mayya <yash.mayya@gmail.com>,  Maison <mickael.maison@gmail.com>
This commit is contained in:
Chris Egerton 2022-12-01 11:20:55 -05:00 committed by GitHub
parent b56e71faee
commit e11b515ef3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 0 deletions

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -86,4 +87,18 @@ public class FileStreamSourceConnector extends SourceConnector {
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
filename = parsedConfig.getString(FILE_CONFIG);
// We can provide exactly-once guarantees if reading from a "real" file
// (as long as the file is only appended to over the lifetime of the connector)
// If we're reading from stdin, we can't provide exactly-once guarantees
// since we don't even track offsets
return filename != null && !filename.isEmpty()
? ExactlyOnceSupport.SUPPORTED
: ExactlyOnceSupport.UNSUPPORTED;
}
}

View File

@ -20,6 +20,8 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -59,6 +61,23 @@ public class FileStreamSourceConnectorTest {
}
}
@Test
public void testExactlyOnceSupport() {
sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
assertEquals(ExactlyOnceSupport.SUPPORTED, connector.exactlyOnceSupport(sourceProperties));
sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, " ");
assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(sourceProperties));
sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(sourceProperties));
}
@Test
public void testTransactionBoundaryDefinition() {
assertEquals(ConnectorTransactionBoundaries.UNSUPPORTED, connector.canDefineTransactionBoundaries(sourceProperties));
}
@Test
public void testSourceTasks() {
connector.start(sourceProperties);