mirror of https://github.com/apache/kafka.git
KAFKA-14659 source-record-write-[rate|total] metrics should exclude filtered records (#13193)
Reviewers: Christo Lolov <christololov@gmail.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
parent
d143d349ec
commit
a1b8586a57
|
@ -578,6 +578,8 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
|
|||
private final int batchSize;
|
||||
private boolean completed = false;
|
||||
private int counter;
|
||||
private int skipped; // Keeps track of filtered records
|
||||
|
||||
public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
|
||||
assert batchSize > 0;
|
||||
assert metricsGroup != null;
|
||||
|
@ -586,6 +588,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
|
|||
this.metricsGroup = metricsGroup;
|
||||
}
|
||||
public void skipRecord() {
|
||||
skipped += 1;
|
||||
if (counter > 0 && --counter == 0) {
|
||||
finishedAllWrites();
|
||||
}
|
||||
|
@ -600,7 +603,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
|
|||
}
|
||||
private void finishedAllWrites() {
|
||||
if (!completed) {
|
||||
metricsGroup.recordWrite(batchSize - counter);
|
||||
metricsGroup.recordWrite(batchSize - counter, skipped);
|
||||
completed = true;
|
||||
}
|
||||
}
|
||||
|
@ -652,8 +655,8 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
|
|||
sourceRecordActiveCount.record(activeRecordCount);
|
||||
}
|
||||
|
||||
void recordWrite(int recordCount) {
|
||||
sourceRecordWrite.record(recordCount);
|
||||
void recordWrite(int recordCount, int skippedCount) {
|
||||
sourceRecordWrite.record(recordCount - skippedCount);
|
||||
activeRecordCount -= recordCount;
|
||||
activeRecordCount = Math.max(0, activeRecordCount);
|
||||
sourceRecordActiveCount.record(activeRecordCount);
|
||||
|
|
|
@ -179,15 +179,14 @@ public class ConnectMetricsRegistry {
|
|||
"belonging to the named source connector in this worker.",
|
||||
sourceTaskTags);
|
||||
sourceRecordWriteRate = createTemplate("source-record-write-rate", SOURCE_TASK_GROUP_NAME,
|
||||
"The average per-second number of records output from the transformations and written" +
|
||||
" to Kafka for this task belonging to the named source connector in this worker. This" +
|
||||
" is after transformations are applied and excludes any records filtered out by the " +
|
||||
"transformations.",
|
||||
"The average per-second number of records written to Kafka for this task belonging to the " +
|
||||
"named source connector in this worker, since the task was last restarted. This is after " +
|
||||
"transformations are applied, and excludes any records filtered out by the transformations.",
|
||||
sourceTaskTags);
|
||||
sourceRecordWriteTotal = createTemplate("source-record-write-total", SOURCE_TASK_GROUP_NAME,
|
||||
"The number of records output from the transformations and written to Kafka for this" +
|
||||
" task belonging to the named source connector in this worker, since the task was " +
|
||||
"last restarted.",
|
||||
"The number of records output written to Kafka for this task belonging to the " +
|
||||
"named source connector in this worker, since the task was last restarted. This is after " +
|
||||
"transformations are applied, and excludes any records filtered out by the transformations.",
|
||||
sourceTaskTags);
|
||||
sourceRecordPollBatchTimeMax = createTemplate("poll-batch-max-time-ms", SOURCE_TASK_GROUP_NAME,
|
||||
"The maximum time in milliseconds taken by this task to poll for a batch of " +
|
||||
|
|
|
@ -194,18 +194,18 @@ public class AbstractWorkerSourceTaskTest {
|
|||
AbstractWorkerSourceTask.SourceTaskMetricsGroup group1 = new AbstractWorkerSourceTask.SourceTaskMetricsGroup(taskId1, metrics);
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
group.recordPoll(100, 1000 + i * 100);
|
||||
group.recordWrite(10);
|
||||
group.recordWrite(10, 2);
|
||||
}
|
||||
for (int i = 0; i != 20; ++i) {
|
||||
group1.recordPoll(100, 1000 + i * 100);
|
||||
group1.recordWrite(10);
|
||||
group1.recordWrite(10, 4);
|
||||
}
|
||||
assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
|
||||
assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
|
||||
assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
|
||||
assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d);
|
||||
assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
|
||||
assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
|
||||
assertEquals(2.666, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
|
||||
assertEquals(80, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
|
||||
assertEquals(900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-active-count"), 0.001d);
|
||||
|
||||
// Close the group
|
||||
|
@ -229,8 +229,8 @@ public class AbstractWorkerSourceTaskTest {
|
|||
assertEquals(1950.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
|
||||
assertEquals(66.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-rate"), 0.001d);
|
||||
assertEquals(2000, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-total"), 0.001d);
|
||||
assertEquals(6.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
|
||||
assertEquals(200, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d);
|
||||
assertEquals(4.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
|
||||
assertEquals(120, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d);
|
||||
assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue