mirror of https://github.com/apache/kafka.git
KAFKA-19506 Implement dynamic compression type selection and fallback for client telemetry (#20144)
#### Summary This PR implements dynamic compression type selection and fallback mechanism for client telemetry to handle cases where compression libraries are not available on the client classpath. #### Problem Currently, when a compression library is missing (e.g., NoClassDefFoundError), the client telemetry system catches the generic Throwable but doesn't learn from the failure. This means, the same unsupported compression type will be attempted on every telemetry push #### Solution This PR introduces a comprehensive fallback mechanism: - Specific Exception Handling: Replace generic Throwable catching with specific exceptions (IOException, NoClassDefFoundError) - Unsupported Compression Tracking: Add unsupportedCompressionTypes collection to track compression types that have failed due to missing libraries - Dynamic Selection: Enhance ClientTelemetryUtils.preferredCompressionType() to accept an unsupported types parameter and filter out known problematic compression types - Thread Safety: Use ConcurrentHashMap.newKeySet() for thread-safe access to the unsupported types collection - Improved Logging: Include exception details in log messages for better debugging #### Key Changes - Modified createPushRequest() to track failed compression types in unsupportedCompressionTypes - Updated ClientTelemetryUtils.preferredCompressionType() to filter out unsupported types - Enhanced exception handling with specific exception types instead of Throwable #### Testing - Added appropriate Unit tests - Testing apache kafka on local logs: ``` ✗ cat ~/Desktop/kafka-client.log | grep " org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter" 2025-07-17 07:56:52:602 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry subscription request with client instance id AAAAAAAAAAAAAAAAAAAAAA 2025-07-17 07:56:52:602 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from SUBSCRIPTION_NEEDED to SUBSCRIPTION_IN_PROGRESS 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Telemetry subscription push interval value from broker was 5000; to stagger requests the first push interval is being adjusted to 4551 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Updating subscription - subscription: ClientTelemetrySubscription{clientInstanceId=aVd3fzviRGSgEuAWNY5mMA, subscriptionId=1650084878, pushIntervalMs=5000, acceptedCompressionTypes=[zstd, lz4, snappy, none], deltaTemporality=true, selector=org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils$$Lambda$308/0x00000005011ce470@2f16e398}; intervalMs: 4551, lastRequestMs: 1752739012639 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] INFO org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Client telemetry registered with client instance id: aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:56:57:196 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:56:57:196 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:56:57:224 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library zstd not found, sending uncompressed data at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389) 2025-07-17 07:56:57:295 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:02:296 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:02:297 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:02:300 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library lz4 not found, sending uncompressed data at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389) 2025-07-17 07:57:02:329 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:07:329 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:07:330 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:07:331 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library snappy not found, sending uncompressed data at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389) 2025-07-17 07:57:07:344 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:12:346 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:12:346 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:12:400 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:17:402 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:17:402 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:17:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:22:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:22:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:22:508 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:27:512 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:27:512 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:27:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:32:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:32:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:32:578 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:37:580 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:37:580 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:37:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:42:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:42:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:42:646 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:47:647 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:47:647 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:47:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:52:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:52:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:52:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:57:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:57:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:57:765 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED ``` Reviewers: poorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
eba9839776
commit
b4bf0bf693
|
@ -50,6 +50,7 @@ import java.util.Objects;
|
|||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
|
@ -269,6 +270,7 @@ public class ClientTelemetryReporter implements MetricsReporter {
|
|||
private static final double INITIAL_PUSH_JITTER_LOWER = 0.5;
|
||||
private static final double INITIAL_PUSH_JITTER_UPPER = 1.5;
|
||||
|
||||
private final Set<CompressionType> unsupportedCompressionTypes = ConcurrentHashMap.newKeySet();
|
||||
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final Condition subscriptionLoaded = lock.writeLock().newCondition();
|
||||
/*
|
||||
|
@ -713,12 +715,26 @@ public class ClientTelemetryReporter implements MetricsReporter {
|
|||
return Optional.empty();
|
||||
}
|
||||
|
||||
CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
|
||||
CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes);
|
||||
ByteBuffer compressedPayload;
|
||||
try {
|
||||
compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
|
||||
} catch (Throwable e) {
|
||||
log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
|
||||
// Distinguish between recoverable errors (NoClassDefFoundError for missing compression libs)
|
||||
// and fatal errors (OutOfMemoryError, etc.) that should terminate telemetry.
|
||||
if (e instanceof Error && !(e instanceof NoClassDefFoundError) && !(e.getCause() instanceof NoClassDefFoundError)) {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
state = ClientTelemetryState.TERMINATED;
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
log.error("Unexpected error occurred while compressing telemetry payload for compression: {}, stopping client telemetry", compressionType, e);
|
||||
throw new KafkaException("Unexpected compression error", e);
|
||||
}
|
||||
|
||||
log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType, e);
|
||||
unsupportedCompressionTypes.add(compressionType);
|
||||
compressedPayload = ByteBuffer.wrap(payload.toByteArray());
|
||||
compressionType = CompressionType.NONE;
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import io.opentelemetry.proto.metrics.v1.MetricsData;
|
||||
|
@ -181,13 +182,23 @@ public class ClientTelemetryUtils {
|
|||
return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
|
||||
}
|
||||
|
||||
public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
|
||||
if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
|
||||
// Broker is providing the compression types in order of preference. Grab the
|
||||
// first one.
|
||||
return acceptedCompressionTypes.get(0);
|
||||
}
|
||||
return CompressionType.NONE;
|
||||
/**
|
||||
* Determines the preferred compression type from broker-accepted types, avoiding unsupported ones.
|
||||
*
|
||||
* @param acceptedCompressionTypes the list of compression types accepted by the broker in order
|
||||
* of preference (must not be null, use empty list if no compression is accepted)
|
||||
* @param unsupportedCompressionTypes the set of compression types that should be avoided due to
|
||||
* missing libraries or previous failures (must not be null)
|
||||
* @return the preferred compression type to use, or {@link CompressionType#NONE} if no acceptable
|
||||
* compression type is available
|
||||
*/
|
||||
public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes, Set<CompressionType> unsupportedCompressionTypes) {
|
||||
// Broker is providing the compression types in order of preference. Grab the
|
||||
// first one that's supported.
|
||||
return acceptedCompressionTypes.stream()
|
||||
.filter(t -> !unsupportedCompressionTypes.contains(t))
|
||||
.findFirst()
|
||||
.orElse(CompressionType.NONE);
|
||||
}
|
||||
|
||||
public static ByteBuffer compress(MetricsData metrics, CompressionType compressionType) throws IOException {
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.kafka.common.telemetry.internals;
|
|||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
|
||||
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
|
||||
|
@ -63,8 +64,10 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
|||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
|
||||
public class ClientTelemetryReporterTest {
|
||||
|
||||
|
@ -413,6 +416,134 @@ public class ClientTelemetryReporterTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateRequestPushCompressionFallbackToNextType() {
|
||||
clientTelemetryReporter.configure(configs);
|
||||
clientTelemetryReporter.contextChange(metricsContext);
|
||||
|
||||
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
|
||||
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
|
||||
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
|
||||
|
||||
// Set up subscription with multiple compression types: GZIP -> LZ4 -> SNAPPY
|
||||
ClientTelemetryReporter.ClientTelemetrySubscription subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(
|
||||
uuid, 1234, 20000, List.of(CompressionType.GZIP, CompressionType.LZ4, CompressionType.SNAPPY), true, null);
|
||||
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
|
||||
|
||||
try (MockedStatic<ClientTelemetryUtils> mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {
|
||||
// First request: GZIP fails with NoClassDefFoundError, should use NONE for this request
|
||||
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.GZIP))).thenThrow(new NoClassDefFoundError("GZIP not available"));
|
||||
|
||||
Optional<AbstractRequest.Builder<?>> requestOptional = telemetrySender.createRequest();
|
||||
assertNotNull(requestOptional);
|
||||
assertTrue(requestOptional.isPresent());
|
||||
assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
|
||||
PushTelemetryRequest request = (PushTelemetryRequest) requestOptional.get().build();
|
||||
|
||||
// Should fallback to NONE for this request (GZIP gets cached as unsupported)
|
||||
assertEquals(CompressionType.NONE.id, request.data().compressionType());
|
||||
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
|
||||
|
||||
// Reset state for next request
|
||||
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
|
||||
|
||||
// Second request: LZ4 is selected (since GZIP is now cached as unsupported), LZ4 fails, should use NONE
|
||||
// Note that some libraries eg. LZ4 return KafkaException with cause as NoClassDefFoundError
|
||||
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.LZ4))).thenThrow(new KafkaException(new NoClassDefFoundError("LZ4 not available")));
|
||||
|
||||
requestOptional = telemetrySender.createRequest();
|
||||
assertNotNull(requestOptional);
|
||||
assertTrue(requestOptional.isPresent());
|
||||
assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
|
||||
request = (PushTelemetryRequest) requestOptional.get().build();
|
||||
|
||||
// Should fallback to NONE for this request (LZ4 gets cached as unsupported)
|
||||
assertEquals(CompressionType.NONE.id, request.data().compressionType());
|
||||
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
|
||||
|
||||
// Reset state for next request
|
||||
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
|
||||
|
||||
// Third request: SNAPPY is selected (since GZIP and LZ4 are now cached as unsupported), SNAPPY fails, should use NONE
|
||||
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.SNAPPY))).thenThrow(new NoClassDefFoundError("SNAPPY not available"));
|
||||
|
||||
requestOptional = telemetrySender.createRequest();
|
||||
assertNotNull(requestOptional);
|
||||
assertTrue(requestOptional.isPresent());
|
||||
assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
|
||||
request = (PushTelemetryRequest) requestOptional.get().build();
|
||||
|
||||
// Should fallback to NONE for this request (SNAPPY gets cached as unsupported)
|
||||
assertEquals(CompressionType.NONE.id, request.data().compressionType());
|
||||
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
|
||||
|
||||
// Reset state for next request
|
||||
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
|
||||
|
||||
// Fourth request: All compression types are now cached as unsupported, should use NONE directly
|
||||
requestOptional = telemetrySender.createRequest();
|
||||
assertNotNull(requestOptional);
|
||||
assertTrue(requestOptional.isPresent());
|
||||
assertInstanceOf(PushTelemetryRequest.class, requestOptional.get().build());
|
||||
request = (PushTelemetryRequest) requestOptional.get().build();
|
||||
|
||||
// Should use NONE directly (no compression types are supported)
|
||||
assertEquals(CompressionType.NONE.id, request.data().compressionType());
|
||||
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateRequestPushCompressionFallbackAndTermination() {
|
||||
clientTelemetryReporter.configure(configs);
|
||||
clientTelemetryReporter.contextChange(metricsContext);
|
||||
|
||||
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
|
||||
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
|
||||
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
|
||||
|
||||
// Set up subscription with ZSTD compression type
|
||||
ClientTelemetryReporter.ClientTelemetrySubscription subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(
|
||||
uuid, 1234, 20000, List.of(CompressionType.ZSTD, CompressionType.LZ4), true, null);
|
||||
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
|
||||
|
||||
try (MockedStatic<ClientTelemetryUtils> mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {
|
||||
|
||||
// === Test 1: NoClassDefFoundError fallback (recoverable) ===
|
||||
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.ZSTD)))
|
||||
.thenThrow(new NoClassDefFoundError("com/github/luben/zstd/BufferPool"));
|
||||
|
||||
assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
|
||||
|
||||
Optional<AbstractRequest.Builder<?>> request1 = telemetrySender.createRequest();
|
||||
assertNotNull(request1);
|
||||
assertTrue(request1.isPresent());
|
||||
assertInstanceOf(PushTelemetryRequest.class, request1.get().build());
|
||||
PushTelemetryRequest pushRequest1 = (PushTelemetryRequest) request1.get().build();
|
||||
assertEquals(CompressionType.NONE.id, pushRequest1.data().compressionType()); // Fallback to NONE
|
||||
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
|
||||
|
||||
// Reset state (simulate successful response handling)
|
||||
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
|
||||
|
||||
// === Test 2: OutOfMemoryError causes termination (non-recoverable Error) ===
|
||||
mockedCompress.reset();
|
||||
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), eq(CompressionType.LZ4)))
|
||||
.thenThrow(new OutOfMemoryError("Out of memory during compression"));
|
||||
|
||||
assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
|
||||
|
||||
assertThrows(KafkaException.class, () -> telemetrySender.createRequest());
|
||||
assertEquals(ClientTelemetryState.TERMINATED, telemetrySender.state());
|
||||
|
||||
// === Test 3: After termination, no more requests ===
|
||||
Optional<AbstractRequest.Builder<?>> request3 = telemetrySender.createRequest();
|
||||
assertNotNull(request3);
|
||||
assertFalse(request3.isPresent()); // No request created
|
||||
assertEquals(ClientTelemetryState.TERMINATED, telemetrySender.state()); // State remains TERMINATED
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleResponseGetSubscriptions() {
|
||||
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
|
||||
|
|
|
@ -30,10 +30,9 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import io.opentelemetry.proto.metrics.v1.Metric;
|
||||
|
@ -69,12 +68,12 @@ public class ClientTelemetryUtilsTest {
|
|||
@Test
|
||||
public void testGetSelectorFromRequestedMetrics() {
|
||||
// no metrics selector
|
||||
assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.emptyList()));
|
||||
assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of()));
|
||||
assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(null));
|
||||
// all metrics selector
|
||||
assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.singletonList("*")));
|
||||
assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of("*")));
|
||||
// specific metrics selector
|
||||
Predicate<? super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(Arrays.asList("metric1", "metric2"));
|
||||
Predicate<? super MetricKeyable> selector = ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of("metric1", "metric2"));
|
||||
assertNotEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, selector);
|
||||
assertNotEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, selector);
|
||||
assertTrue(selector.test(new MetricKey("metric1.test")));
|
||||
|
@ -86,7 +85,7 @@ public class ClientTelemetryUtilsTest {
|
|||
@Test
|
||||
public void testGetCompressionTypesFromAcceptedList() {
|
||||
assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(null).size());
|
||||
assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(Collections.emptyList()).size());
|
||||
assertEquals(0, ClientTelemetryUtils.getCompressionTypesFromAcceptedList(List.of()).size());
|
||||
|
||||
List<Byte> compressionTypes = new ArrayList<>();
|
||||
compressionTypes.add(CompressionType.GZIP.id);
|
||||
|
@ -123,10 +122,24 @@ public class ClientTelemetryUtilsTest {
|
|||
|
||||
@Test
|
||||
public void testPreferredCompressionType() {
|
||||
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList()));
|
||||
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(null));
|
||||
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE, CompressionType.GZIP)));
|
||||
assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP, CompressionType.NONE)));
|
||||
// Test with no unsupported types
|
||||
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(), Set.of()));
|
||||
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.NONE, CompressionType.GZIP), Set.of()));
|
||||
assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.NONE), Set.of()));
|
||||
|
||||
// Test unsupported type filtering (returns first available type, or NONE if all are unsupported)
|
||||
assertEquals(CompressionType.LZ4, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4), Set.of(CompressionType.GZIP)));
|
||||
assertEquals(CompressionType.SNAPPY, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4, CompressionType.SNAPPY), Set.of(CompressionType.GZIP, CompressionType.LZ4)));
|
||||
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4), Set.of(CompressionType.GZIP, CompressionType.LZ4)));
|
||||
|
||||
// Test edge case: no match between requested and supported types
|
||||
assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.LZ4), Set.of(CompressionType.SNAPPY)));
|
||||
|
||||
// Test NullPointerException for null parameters
|
||||
assertThrows(NullPointerException.class, () ->
|
||||
ClientTelemetryUtils.preferredCompressionType(null, Set.of()));
|
||||
assertThrows(NullPointerException.class, () ->
|
||||
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP, CompressionType.NONE), null));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -150,19 +163,19 @@ public class ClientTelemetryUtilsTest {
|
|||
private MetricsData getMetricsData() {
|
||||
List<Metric> metricsList = new ArrayList<>();
|
||||
metricsList.add(SinglePointMetric.sum(
|
||||
new MetricKey("metricName"), 1.0, true, Instant.now(), null, Collections.emptySet())
|
||||
new MetricKey("metricName"), 1.0, true, Instant.now(), null, Set.of())
|
||||
.builder().build());
|
||||
metricsList.add(SinglePointMetric.sum(
|
||||
new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Collections.emptySet())
|
||||
new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Set.of())
|
||||
.builder().build());
|
||||
metricsList.add(SinglePointMetric.deltaSum(
|
||||
new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Collections.emptySet())
|
||||
new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Set.of())
|
||||
.builder().build());
|
||||
metricsList.add(SinglePointMetric.gauge(
|
||||
new MetricKey("metricName3"), 1.0, Instant.now(), Collections.emptySet())
|
||||
new MetricKey("metricName3"), 1.0, Instant.now(), Set.of())
|
||||
.builder().build());
|
||||
metricsList.add(SinglePointMetric.gauge(
|
||||
new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Collections.emptySet())
|
||||
new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Set.of())
|
||||
.builder().build());
|
||||
|
||||
MetricsData.Builder builder = MetricsData.newBuilder();
|
||||
|
|
Loading…
Reference in New Issue