KAFKA-14282: stop tracking Produced sensors by processor node id (#12836)

Users have been seeing a large number of these error messages being logged by the RecordCollectorImpl:

Unable to records bytes produced to topic XXX by sink node YYY as the node is not recognized.
It seems like we try to save all known sink nodes when the record collector is constructed, by we do so by going through the known sink topics which means we could miss some nodes, for example if dynamic topic routing is used. Previously we were logging an error and would skip recording the metric if we tried to send a record from a sink node it didn't recognize, but there's not really any reason to have been tracking the sensors by node in the first place -- we can just track the actual sink topics themselves.

Reviewers: John Roesler <vvcephei@apache.org>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
A. Sophie Blee-Goldman 2022-11-11 17:58:08 -08:00 committed by GitHub
parent 876c338a60
commit 51b7eb7937
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 57 deletions

View File

@ -70,7 +70,7 @@ public class RecordCollectorImpl implements RecordCollector {
private final StreamsMetricsImpl streamsMetrics;
private final Sensor droppedRecordsSensor;
private final Map<String, Map<String, Sensor>> sinkNodeToProducedSensorByTopic = new HashMap<>();
private final Map<String, Sensor> producedSensorByTopic = new HashMap<>();
private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);
@ -94,7 +94,7 @@ public class RecordCollectorImpl implements RecordCollector {
this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), streamsMetrics);
for (final String topic : topology.sinkTopics()) {
final String processorNodeId = topology.sink(topic).name();
sinkNodeToProducedSensorByTopic.computeIfAbsent(processorNodeId, t -> new HashMap<>()).put(
producedSensorByTopic.put(
topic,
TopicMetrics.producedSensor(
threadId,
@ -219,26 +219,20 @@ public class RecordCollectorImpl implements RecordCollector {
}
if (!topic.endsWith("-changelog")) {
final Map<String, Sensor> producedSensorByTopic = sinkNodeToProducedSensorByTopic.get(processorNodeId);
if (producedSensorByTopic == null) {
log.error("Unable to records bytes produced to topic {} by sink node {} as the node is not recognized.\n"
+ "Known sink nodes are {}.", topic, processorNodeId, sinkNodeToProducedSensorByTopic.keySet());
} else {
// we may not have created a sensor during initialization if the node uses dynamic topic routing,
// as all topics are not known up front, so create the sensor for that topic if absent
final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
// we may not have created a sensor during initialization if the node uses dynamic topic routing,
// as all topics are not known up front, so create the sensor for this topic if absent
final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
topic,
t -> TopicMetrics.producedSensor(
Thread.currentThread().getName(),
taskId.toString(),
processorNodeId,
topic,
t -> TopicMetrics.producedSensor(
Thread.currentThread().getName(),
taskId.toString(),
processorNodeId,
topic,
context.metrics()
)
);
final long bytesProduced = producerRecordSizeInBytes(serializedRecord);
topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs());
}
context.metrics()
)
);
final long bytesProduced = producerRecordSizeInBytes(serializedRecord);
topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs());
}
} else {
recordSendError(topic, exception, serializedRecord);
@ -341,10 +335,8 @@ public class RecordCollectorImpl implements RecordCollector {
}
private void removeAllProducedSensors() {
for (final Map<String, Sensor> nodeMap : sinkNodeToProducedSensorByTopic.values()) {
for (final Sensor sensor : nodeMap.values()) {
streamsMetrics.removeSensor(sensor);
}
for (final Sensor sensor : producedSensorByTopic.values()) {
streamsMetrics.removeSensor(sensor);
}
}

View File

@ -239,12 +239,12 @@ public class RecordCollectorTest {
public void shouldSendToSpecificPartition() {
final Headers headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", headers, 2, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "999", "0", headers, 2, null, stringSerializer, stringSerializer, null, context);
Map<TopicPartition, Long> offsets = collector.offsets();
@ -253,9 +253,9 @@ public class RecordCollectorTest {
assertEquals(0L, (long) offsets.get(new TopicPartition(topic, 2)));
assertEquals(6, mockProducer.history().size());
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 1, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", headers, 2, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "999", "0", null, 1, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "999", "0", headers, 2, null, stringSerializer, stringSerializer, null, context);
offsets = collector.offsets();
@ -269,15 +269,15 @@ public class RecordCollectorTest {
public void shouldSendWithPartitioner() {
final Headers headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "9", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "27", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "81", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "243", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "28", "0", headers, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "82", "0", headers, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "244", "0", headers, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "245", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, context, streamPartitioner);
collector.send(topic, "9", "0", null, null, stringSerializer, stringSerializer, null, context, streamPartitioner);
collector.send(topic, "27", "0", null, null, stringSerializer, stringSerializer, null, context, streamPartitioner);
collector.send(topic, "81", "0", null, null, stringSerializer, stringSerializer, null, context, streamPartitioner);
collector.send(topic, "243", "0", null, null, stringSerializer, stringSerializer, null, context, streamPartitioner);
collector.send(topic, "28", "0", headers, null, stringSerializer, stringSerializer, null, context, streamPartitioner);
collector.send(topic, "82", "0", headers, null, stringSerializer, stringSerializer, null, context, streamPartitioner);
collector.send(topic, "244", "0", headers, null, stringSerializer, stringSerializer, null, context, streamPartitioner);
collector.send(topic, "245", "0", null, null, stringSerializer, stringSerializer, null, context, streamPartitioner);
final Map<TopicPartition, Long> offsets = collector.offsets();
@ -295,15 +295,15 @@ public class RecordCollectorTest {
public void shouldSendWithNoPartition() {
final Headers headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
collector.send(topic, "3", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "9", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "27", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "81", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "243", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "28", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "82", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "244", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "245", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "3", "0", headers, null, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "9", "0", headers, null, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "27", "0", headers, null, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "81", "0", headers, null, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "243", "0", headers, null, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "28", "0", headers, null, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "82", "0", headers, null, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "244", "0", headers, null, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "245", "0", headers, null, null, stringSerializer, stringSerializer, null, context);
final Map<TopicPartition, Long> offsets = collector.offsets();
@ -318,9 +318,9 @@ public class RecordCollectorTest {
public void shouldUpdateOffsetsUponCompletion() {
Map<TopicPartition, Long> offsets = collector.offsets();
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 1, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 2, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "999", "0", null, 1, null, stringSerializer, stringSerializer, null, context);
collector.send(topic, "999", "0", null, 2, null, stringSerializer, stringSerializer, null, context);
assertEquals(Collections.<TopicPartition, Long>emptyMap(), offsets);
@ -338,7 +338,7 @@ public class RecordCollectorTest {
final CustomStringSerializer valueSerializer = new CustomStringSerializer();
keySerializer.configure(Collections.emptyMap(), true);
collector.send(topic, "3", "0", new RecordHeaders(), null, keySerializer, valueSerializer, null, null, streamPartitioner);
collector.send(topic, "3", "0", new RecordHeaders(), null, keySerializer, valueSerializer, null, context, streamPartitioner);
final List<ProducerRecord<byte[], byte[]>> recordHistory = mockProducer.history();
for (final ProducerRecord<byte[], byte[]> sentRecord : recordHistory) {
@ -571,7 +571,7 @@ public class RecordCollectorTest {
final StreamsException exception = assertThrows(
StreamsException.class,
() -> collector.send(topic, "0", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner)
() -> collector.send(topic, "0", "0", null, null, stringSerializer, stringSerializer, null, context, streamPartitioner)
);
assertThat(
exception.getMessage(),