mirror of https://github.com/apache/kafka.git
Cherrypick "MINOR : Handle error for client telemetry push (#19881)" (#20176)
CI / build (push) Has been cancelled
Details
CI / build (push) Has been cancelled
Details
Update catch to handle compression errors Before :  After ``` Sent message: KR Message 376 [kafka-producer-network-thread | kr-kafka-producer] INFO org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - KR: Failed to compress telemetry payload for compression: zstd, sending uncompressed data Sent message: KR Message 377 ``` Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
eefee6d58d
commit
70c51641fb
|
@ -41,7 +41,6 @@ import org.apache.kafka.common.utils.Time;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -718,8 +717,8 @@ public class ClientTelemetryReporter implements MetricsReporter {
|
||||||
ByteBuffer compressedPayload;
|
ByteBuffer compressedPayload;
|
||||||
try {
|
try {
|
||||||
compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
|
compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
|
||||||
} catch (IOException e) {
|
} catch (Throwable e) {
|
||||||
log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
|
log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
|
||||||
compressedPayload = ByteBuffer.wrap(payload.toByteArray());
|
compressedPayload = ByteBuffer.wrap(payload.toByteArray());
|
||||||
compressionType = CompressionType.NONE;
|
compressionType = CompressionType.NONE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -217,7 +217,7 @@ public class ClientMetricsManager implements AutoCloseable {
|
||||||
long exportTimeStartMs = time.hiResClockMs();
|
long exportTimeStartMs = time.hiResClockMs();
|
||||||
receiverPlugin.exportMetrics(requestContext, request);
|
receiverPlugin.exportMetrics(requestContext, request);
|
||||||
clientMetricsStats.recordPluginExport(clientInstanceId, time.hiResClockMs() - exportTimeStartMs);
|
clientMetricsStats.recordPluginExport(clientInstanceId, time.hiResClockMs() - exportTimeStartMs);
|
||||||
} catch (Exception exception) {
|
} catch (Throwable exception) {
|
||||||
clientMetricsStats.recordPluginErrorCount(clientInstanceId);
|
clientMetricsStats.recordPluginErrorCount(clientInstanceId);
|
||||||
clientInstance.lastKnownError(Errors.INVALID_RECORD);
|
clientInstance.lastKnownError(Errors.INVALID_RECORD);
|
||||||
log.error("Error exporting client metrics to the plugin for client instance id: {}", clientInstanceId, exception);
|
log.error("Error exporting client metrics to the plugin for client instance id: {}", clientInstanceId, exception);
|
||||||
|
|
Loading…
Reference in New Issue