MINOR: Allow writing tombstone offsets for arbitrary partitions in the FileStreamSourceConnector (#14234)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Yash Mayya 2023-08-17 19:13:53 +01:00 committed by GitHub
parent a253dc6643
commit 7802c264c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 10 deletions

View File

@ -117,18 +117,10 @@ public class FileStreamSourceConnector extends SourceConnector {
// This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
// However, there could also be source partitions from previous configurations of the connector.
for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
Map<String, ?> partition = partitionOffset.getKey();
if (partition == null) {
throw new ConnectException("Partition objects cannot be null");
}
if (!partition.containsKey(FILENAME_FIELD)) {
throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
}
Map<String, ?> offset = partitionOffset.getValue();
// null offsets are allowed and represent a deletion of offsets for a partition
if (offset == null) {
// We allow tombstones for anything; if there's garbage in the offsets for the connector, we don't
// want to prevent users from being able to clean it up using the REST API
continue;
}
@ -145,6 +137,15 @@ public class FileStreamSourceConnector extends SourceConnector {
if (offsetPosition < 0) {
throw new ConnectException("The value for the '" + POSITION_FIELD + "' key in the offset should be a non-negative value");
}
Map<String, ?> partition = partitionOffset.getKey();
if (partition == null) {
throw new ConnectException("Partition objects cannot be null");
}
if (!partition.containsKey(FILENAME_FIELD)) {
throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
}
}
// Let the task check whether the actual value for the offset position is valid for the configured file on startup

View File

@ -227,4 +227,18 @@ public class FileStreamSourceConnectorTest {
assertTrue(connector.alterOffsets(sourceProperties, offsets));
assertTrue(connector.alterOffsets(sourceProperties, new HashMap<>()));
}
@Test
public void testAlterOffsetsTombstones() {
Function<Map<String, ?>, Boolean> alterOffsets = partition -> connector.alterOffsets(
sourceProperties,
Collections.singletonMap(partition, null)
);
assertTrue(alterOffsets.apply(null));
assertTrue(alterOffsets.apply(Collections.emptyMap()));
assertTrue(alterOffsets.apply(Collections.singletonMap(FILENAME_FIELD, FILENAME)));
assertTrue(alterOffsets.apply(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename")));
assertTrue(alterOffsets.apply(Collections.singletonMap("garbage_partition_key", "garbage_partition_value")));
}
}