KAFKA-14809 Fix logging conditional on WorkerSourceTask (#13386)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Hector Geraldino 2023-03-16 08:39:31 -04:00 committed by Chris Egerton
parent d1d3b5a486
commit 644a2dcec2
No known key found for this signature in database
GPG Key ID: B90BFC8C4393F2F0
1 changed files with 7 additions and 7 deletions

View File

@ -494,7 +494,7 @@ class WorkerSourceTask extends WorkerTask {
this.committableOffsets = CommittableOffsets.EMPTY;
}
if (committableOffsets.isEmpty()) {
if (offsetsToCommit.isEmpty()) {
log.debug("{} Either no records were produced by the task since the last offset commit, "
+ "or every record has been filtered out by a transformation "
+ "or dropped due to transformation or conversion errors.",
@ -503,15 +503,15 @@ class WorkerSourceTask extends WorkerTask {
// We continue with the offset commit process here instead of simply returning immediately
// in order to invoke SourceTask::commit and record metrics for a successful offset commit
} else {
log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages());
if (committableOffsets.hasPending()) {
log.info("{} Committing offsets for {} acknowledged messages", this, offsetsToCommit.numCommittableMessages());
if (offsetsToCommit.hasPending()) {
log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+ "The source partition with the most pending messages is {}, with {} pending messages",
this,
committableOffsets.numUncommittableMessages(),
committableOffsets.numDeques(),
committableOffsets.largestDequePartition(),
committableOffsets.largestDequeSize()
offsetsToCommit.numUncommittableMessages(),
offsetsToCommit.numDeques(),
offsetsToCommit.largestDequePartition(),
offsetsToCommit.largestDequeSize()
);
} else {
log.debug("{} There are currently no pending messages for this offset commit; "