KAFKA-18294 Remove deprecated SourceTask#commitRecord (#18260)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Nick Guo 2024-12-19 01:06:33 +08:00 committed by GitHub
parent 21b7bb2265
commit 0163fa2d06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 9 additions and 34 deletions

View File

@ -131,28 +131,6 @@ public abstract class SourceTask implements Task {
@Override
public abstract void stop();
/**
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
* also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker.
* <p>
* This is an alias for {@link #commitRecord(SourceRecord, RecordMetadata)} for backwards compatibility. The default
* implementation of {@link #commitRecord(SourceRecord, RecordMetadata)} just calls this method. It is not necessary
* to override both methods.
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
*
* @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
* @throws InterruptedException
* @deprecated Use {@link #commitRecord(SourceRecord, RecordMetadata)} instead.
*/
@Deprecated
public void commitRecord(SourceRecord record) throws InterruptedException {
// This space intentionally left blank.
}
/**
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
@ -164,8 +142,7 @@ public abstract class SourceTask implements Task {
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
* <p>
* The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
* not necessary to implement both methods.
* The default implementation is a nop. It is not necessary to implement the method.
*
* @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
* @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
@ -173,7 +150,6 @@ public abstract class SourceTask implements Task {
*/
public void commitRecord(SourceRecord record, RecordMetadata metadata)
throws InterruptedException {
// by default, just call other method for backwards compatibility
commitRecord(record);
// by default, just do nothing
}
}

View File

@ -736,16 +736,13 @@ public class BlockingConnectorTest {
super.commit();
}
@Override
@SuppressWarnings("deprecation")
public void commitRecord(SourceRecord record) throws InterruptedException {
block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD);
super.commitRecord(record);
}
@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD_WITH_METADATA);
if (metadata == null) {
block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD);
} else {
block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD_WITH_METADATA);
}
super.commitRecord(record, metadata);
}
}

View File

@ -165,6 +165,8 @@
<li>The <code>onPartitionsRevoked(Collection&lt;TopicPartition&gt;)</code> and <code>onPartitionsAssigned(Collection&lt;TopicPartition&gt;)</code> methods
were removed from <code>SinkTask</code>.
</li>
<li>The <code>commitRecord(SourceRecord)</code> method was removed from <code>SourceTask</code>.
</li>
</ul>
</li>
<li><b>Consumer</b>