diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index a3d9b036c24..27279ea9000 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -494,7 +494,7 @@ class WorkerSourceTask extends WorkerTask { this.committableOffsets = CommittableOffsets.EMPTY; } - if (committableOffsets.isEmpty()) { + if (offsetsToCommit.isEmpty()) { log.debug("{} Either no records were produced by the task since the last offset commit, " + "or every record has been filtered out by a transformation " + "or dropped due to transformation or conversion errors.", @@ -503,15 +503,15 @@ class WorkerSourceTask extends WorkerTask { // We continue with the offset commit process here instead of simply returning immediately // in order to invoke SourceTask::commit and record metrics for a successful offset commit } else { - log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages()); - if (committableOffsets.hasPending()) { + log.info("{} Committing offsets for {} acknowledged messages", this, offsetsToCommit.numCommittableMessages()); + if (offsetsToCommit.hasPending()) { log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. " + "The source partition with the most pending messages is {}, with {} pending messages", this, - committableOffsets.numUncommittableMessages(), - committableOffsets.numDeques(), - committableOffsets.largestDequePartition(), - committableOffsets.largestDequeSize() + offsetsToCommit.numUncommittableMessages(), + offsetsToCommit.numDeques(), + offsetsToCommit.largestDequePartition(), + offsetsToCommit.largestDequeSize() ); } else { log.debug("{} There are currently no pending messages for this offset commit; "