From 0163fa2d06559d4e7a6348d92434a5c21661e4cd Mon Sep 17 00:00:00 2001 From: Nick Guo Date: Thu, 19 Dec 2024 01:06:33 +0800 Subject: [PATCH] KAFKA-18294 Remove deprecated SourceTask#commitRecord (#18260) Reviewers: Chia-Ping Tsai --- .../kafka/connect/source/SourceTask.java | 28 ++----------------- .../integration/BlockingConnectorTest.java | 13 ++++----- docs/upgrade.html | 2 ++ 3 files changed, 9 insertions(+), 34 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java index abe3e36bf1e..c8dd4db6d37 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java @@ -131,28 +131,6 @@ public abstract class SourceTask implements Task { @Override public abstract void stop(); - /** - *

- * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is - * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker. - *

- * This is an alias for {@link #commitRecord(SourceRecord, RecordMetadata)} for backwards compatibility. The default - * implementation of {@link #commitRecord(SourceRecord, RecordMetadata)} just calls this method. It is not necessary - * to override both methods. - *

- * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets - * automatically. This hook is provided for systems that also need to store offsets internally - * in their own system. - * - * @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation - * @throws InterruptedException - * @deprecated Use {@link #commitRecord(SourceRecord, RecordMetadata)} instead. - */ - @Deprecated - public void commitRecord(SourceRecord record) throws InterruptedException { - // This space intentionally left blank. - } - /** *

* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is @@ -164,8 +142,7 @@ public abstract class SourceTask implements Task { * automatically. This hook is provided for systems that also need to store offsets internally * in their own system. *

- * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is - * not necessary to implement both methods. + * The default implementation is a nop. It is not necessary to implement the method. * * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored @@ -173,7 +150,6 @@ public abstract class SourceTask implements Task { */ public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException { - // by default, just call other method for backwards compatibility - commitRecord(record); + // by default, just do nothing } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 54aa1bb1908..17135a69366 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -736,16 +736,13 @@ public class BlockingConnectorTest { super.commit(); } - @Override - @SuppressWarnings("deprecation") - public void commitRecord(SourceRecord record) throws InterruptedException { - block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD); - super.commitRecord(record); - } - @Override public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException { - block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD_WITH_METADATA); + if (metadata == null) { + block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD); + } else { + block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD_WITH_METADATA); + } super.commitRecord(record, metadata); } } diff --git a/docs/upgrade.html b/docs/upgrade.html index fa420edd2cd..14e19ef4162 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -165,6 +165,8 @@

  • The onPartitionsRevoked(Collection<TopicPartition>) and onPartitionsAssigned(Collection<TopicPartition>) methods were removed from SinkTask.
  • +
  • The commitRecord(SourceRecord) method was removed from SourceTask. +
  • Consumer