MINOR: demote "Committing task offsets" log to DEBUG (#9489)

Demote "committing offsets" log message to DEBUG and promote/add summarizing INFO level logs in the main StreamThread loop

Reviewers: Boyang Chen <boyang@confluent.io>, Walker Carlson <wcarlson@confluent.io>, John Roesler <john@confluent.io>
This commit is contained in:
A. Sophie Blee-Goldman 2020-11-13 13:25:43 -08:00 committed by GitHub
parent 0b9c7512bf
commit cfc813537e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 6 deletions

View File

@ -630,6 +630,8 @@ public class StreamThread extends Thread {
advanceNowAndComputeLatency();
int totalProcessed = 0;
int totalPunctuated = 0;
int totalCommitted = 0;
long totalCommitLatency = 0L;
long totalProcessLatency = 0L;
long totalPunctuateLatency = 0L;
@ -667,6 +669,7 @@ public class StreamThread extends Thread {
numIterations);
final int punctuated = taskManager.punctuate();
totalPunctuated += punctuated;
final long punctuateLatency = advanceNowAndComputeLatency();
totalPunctuateLatency += punctuateLatency;
if (punctuated > 0) {
@ -676,6 +679,7 @@ public class StreamThread extends Thread {
log.debug("{} punctuators ran.", punctuated);
final int committed = maybeCommit();
totalCommitted += committed;
final long commitLatency = advanceNowAndComputeLatency();
totalCommitLatency += commitLatency;
if (committed > 0) {
@ -703,6 +707,10 @@ public class StreamThread extends Thread {
// we record the ratio out of the while loop so that the accumulated latency spans over
// multiple iterations with reasonably large max.num.records and hence is less vulnerable to outliers
taskManager.recordTaskProcessRatio(totalProcessLatency, now);
log.info("Processed {} total records, ran {} punctuators, and committed {} total tasks " +
"for active tasks {} and standby tasks {}",
totalProcessed, totalPunctuated, totalCommitted, taskManager.activeTaskIds(), taskManager.standbyTaskIds());
}
now = time.milliseconds();
@ -755,7 +763,7 @@ public class StreamThread extends Thread {
// to unblock the restoration as soon as possible
records = pollRequests(Duration.ZERO);
} else if (state == State.PARTITIONS_REVOKED) {
// try to fetch som records with zero poll millis to unblock
// try to fetch some records with zero poll millis to unblock
// other useful work while waiting for the join response
records = pollRequests(Duration.ZERO);
} else if (state == State.RUNNING || state == State.STARTING) {
@ -774,13 +782,13 @@ public class StreamThread extends Thread {
final long pollLatency = advanceNowAndComputeLatency();
if (log.isDebugEnabled()) {
log.debug("Main Consumer poll completed in {} ms and fetched {} records", pollLatency, records.count());
}
final int numRecords = records.count();
log.info("Main Consumer poll completed in {} ms and fetched {} records", pollLatency, numRecords);
pollSensor.record(pollLatency, now);
if (!records.isEmpty()) {
pollRecordsSensor.record(records.count(), now);
pollRecordsSensor.record(numRecords, now);
taskManager.addRecordsToTasks(records);
}
return pollLatency;

View File

@ -1070,7 +1070,7 @@ public class TaskManager {
}
private void commitOffsetsOrTransaction(final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
log.info("Committing task offsets {}", offsetsPerTask);
log.debug("Committing task offsets {}", offsetsPerTask);
if (!offsetsPerTask.isEmpty()) {
if (processingMode == EXACTLY_ONCE_ALPHA) {