KAFKA-16584 Make log processing summary configurable or debug (#16509)

KAFKA-16584 Make log processing summary configurable or debug

Reviewers: Matthias Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
dujian0068 2024-07-24 04:09:25 +08:00 committed by GitHub
parent d43806c7f3
commit 7efb58f321
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 30 additions and 6 deletions

View File

@ -93,6 +93,7 @@ settings.put(... , ...);</code></pre>
<li><a class="reference internal" href="#task-assignor-class" id="id39">task.assignor.class</a></li>
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
<li><a class="reference internal" href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
<li><a class="reference internal" href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
</ul>
</li>
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
@ -470,6 +471,11 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-even"><td>log.summary.interval.ms</td>
<td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td>120000milliseconds (2 minutes)</td>
</tr>
</tbody>
</table>
<div class="section" id="acceptable-recovery-lag">
@ -1066,6 +1072,16 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
</p>
</div></blockquote>
</div>
<div class="section" id="log-summary-interval-ms">
<h4><a class="toc-backref" href="#id40">log.summary.interval.ms</a><a class="headerlink" href="#log-summary-interval-ms" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
This configuration controls the output interval for summary information.
If greater or equal to 0, the summary log will be output according to the set time interval;
If less than 0, summary output is disabled.
</div>
</blockquote>
</div>
<div class="section" id="upgrade-from">
<span id="streams-developer-guide-upgrade-from"></span><h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
<blockquote>

View File

@ -834,6 +834,10 @@ public class StreamsConfig extends AbstractConfig {
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <code>" +
TaskAssignor.class.getName() + "</code> interface. Defaults to the <code>HighAvailabilityTaskAssignor</code> class.";
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "This configuration controls the output interval for summary information.\n" +
"If greater or equal to 0, the summary log will be output according to the set time interval;\n" +
"If less than 0, summary output is disabled.";
/**
* {@code topology.optimization}
* @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead
@ -1206,7 +1210,12 @@ public class StreamsConfig extends AbstractConfig {
Type.LONG,
null,
Importance.LOW,
WINDOW_SIZE_MS_DOC);
WINDOW_SIZE_MS_DOC)
.define(LOG_SUMMARY_INTERVAL_MS_CONFIG,
Type.LONG,
2 * 60 * 1000L,
Importance.LOW,
LOG_SUMMARY_INTERVAL_MS_DOC);
}
// this is the list of configs for underlying clients

View File

@ -303,7 +303,7 @@ public class StreamThread extends Thread implements ProcessingThread {
private final Sensor commitRatioSensor;
private final Sensor failedStreamThreadSensor;
private static final long LOG_SUMMARY_INTERVAL_MS = 2 * 60 * 1000L; // log a summary of processing every 2 minutes
private final long logSummaryIntervalMs; // the count summary log output time interval
private long lastLogSummaryMs = -1L;
private long totalRecordsProcessedSinceLastSummary = 0L;
private long totalPunctuatorsSinceLastSummary = 0L;
@ -643,6 +643,7 @@ public class StreamThread extends Thread implements ProcessingThread {
this.processingMode = processingMode(config);
this.stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals());
this.processingThreadsEnabled = InternalConfig.getProcessingThreadsEnabled(config.originals());
this.logSummaryIntervalMs = config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);
}
private static final class InternalConsumerConfig extends ConsumerConfig {
@ -1069,8 +1070,7 @@ public class StreamThread extends Thread implements ProcessingThread {
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
final boolean logProcessingSummary = now - lastLogSummaryMs > LOG_SUMMARY_INTERVAL_MS;
if (logProcessingSummary) {
if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) {
log.info("Processed {} total records, ran {} punctuators, and committed {} total tasks since the last update",
totalRecordsProcessedSinceLastSummary, totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary);
@ -1142,8 +1142,7 @@ public class StreamThread extends Thread implements ProcessingThread {
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
final boolean logProcessingSummary = now - lastLogSummaryMs > LOG_SUMMARY_INTERVAL_MS;
if (logProcessingSummary) {
if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) {
log.info("Committed {} total tasks since the last update", totalCommittedSinceLastSummary);
totalCommittedSinceLastSummary = 0L;