From 70c51641fbf54c3cea12b2ab2d730683813d00e8 Mon Sep 17 00:00:00 2001 From: Kaushik Raina <103954755+k-raina@users.noreply.github.com> Date: Wed, 16 Jul 2025 22:08:02 +0530 Subject: [PATCH] Cherrypick "MINOR : Handle error for client telemetry push (#19881)" (#20176) Update catch to handle compression errors Before : ![image](https://github.com/user-attachments/assets/c5ca121e-ba0c-4664-91f1-20b54abf67cc) After ``` Sent message: KR Message 376 [kafka-producer-network-thread | kr-kafka-producer] INFO org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - KR: Failed to compress telemetry payload for compression: zstd, sending uncompressed data Sent message: KR Message 377 ``` Reviewers: Apoorv Mittal , Bill Bejeck , Chia-Ping Tsai --- .../common/telemetry/internals/ClientTelemetryReporter.java | 5 ++--- .../java/org/apache/kafka/server/ClientMetricsManager.java | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java index 705aafaaa70..e0491943fef 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java @@ -41,7 +41,6 @@ import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collections; @@ -718,8 +717,8 @@ public class ClientTelemetryReporter implements MetricsReporter { ByteBuffer compressedPayload; try { compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); - } catch (IOException e) { - log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); + } catch (Throwable e) { + log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); compressedPayload = ByteBuffer.wrap(payload.toByteArray()); compressionType = CompressionType.NONE; } diff --git a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java index dcd17a3ecc0..2487ccb6f3d 100644 --- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java +++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java @@ -217,7 +217,7 @@ public class ClientMetricsManager implements AutoCloseable { long exportTimeStartMs = time.hiResClockMs(); receiverPlugin.exportMetrics(requestContext, request); clientMetricsStats.recordPluginExport(clientInstanceId, time.hiResClockMs() - exportTimeStartMs); - } catch (Exception exception) { + } catch (Throwable exception) { clientMetricsStats.recordPluginErrorCount(clientInstanceId); clientInstance.lastKnownError(Errors.INVALID_RECORD); log.error("Error exporting client metrics to the plugin for client instance id: {}", clientInstanceId, exception);