MINOR: Fixing client telemetry validate request (#19959)

Minor fix to correct the validate condition for GetTelemetryRequests.
Added respective tests as well.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-06-12 22:52:50 +01:00
parent 923b6c3fea
commit 4c2d86f411
2 changed files with 110 additions and 2 deletions

View File

@ -391,7 +391,7 @@ public class ClientMetricsManager implements AutoCloseable {
ClientMetricsInstance clientInstance, long timestamp) {
if (!clientInstance.maybeUpdateGetRequestTimestamp(timestamp) && (clientInstance.lastKnownError() != Errors.UNKNOWN_SUBSCRIPTION_ID
|| clientInstance.lastKnownError() != Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
&& clientInstance.lastKnownError() != Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
clientMetricsStats.recordThrottleCount(clientInstance.clientInstanceId());
String msg = String.format("Request from the client [%s] arrived before the next push interval time",
request.data().clientInstanceId());

View File

@ -826,7 +826,6 @@ public class ClientMetricsManagerTest {
assertEquals((double) 0, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_ERROR + "-count").metricValue());
assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-avg").metricValue());
assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-max").metricValue());
}
@Test
@ -1124,6 +1123,115 @@ public class ClientMetricsManagerTest {
}
}
@Test
public void testGetTelemetrySubscriptionAfterPushTelemetryUnknownSubscriptionSucceeds() throws Exception {
clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData(), true).build();
GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
Properties properties = new Properties();
properties.put("interval.ms", "100");
clientMetricsManager.updateSubscription("sub-2", properties);
assertEquals(2, clientMetricsManager.subscriptions().size());
PushTelemetryRequest request = new Builder(
new PushTelemetryRequestData()
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
.setCompressionType(CompressionType.NONE.id)
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build();
PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(
request, ClientMetricsTestUtils.requestContext());
assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, response.error());
ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
assertNotNull(instance);
assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, instance.lastKnownError());
subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build();
subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
assertEquals(Errors.NONE, subscriptionsResponse.error());
}
@Test
public void testGetTelemetrySubscriptionAfterPushTelemetryUnknownCompressionSucceeds() throws Exception {
clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData(), true).build();
GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
PushTelemetryRequest request = new Builder(
new PushTelemetryRequestData()
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
.setCompressionType((byte) 10) // // Invalid compression type
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build();
PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(
request, ClientMetricsTestUtils.requestContext());
assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, response.error());
ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
assertNotNull(instance);
assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, instance.lastKnownError());
subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build();
subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
assertEquals(Errors.NONE, subscriptionsResponse.error());
}
@Test
public void testGetTelemetrySubscriptionAfterPushTelemetryBytesExceptionFails() throws Exception {
try (
Metrics kafkaMetrics = new Metrics();
ClientMetricsManager clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 1, time, kafkaMetrics)
) {
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData(), true).build();
GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
byte[] metrics = "ab".getBytes(StandardCharsets.UTF_8);
assertEquals(2, metrics.length);
PushTelemetryRequest request = new PushTelemetryRequest.Builder(
new PushTelemetryRequestData()
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
.setMetrics(ByteBuffer.wrap(metrics)), true).build();
// Set the max bytes 1 to force the error.
PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(
request, ClientMetricsTestUtils.requestContext());
assertEquals(Errors.TELEMETRY_TOO_LARGE, response.error());
ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
assertNotNull(instance);
assertEquals(Errors.TELEMETRY_TOO_LARGE, instance.lastKnownError());
subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build();
subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, subscriptionsResponse.error());
}
}
@Test
public void testCacheEviction() throws Exception {
Properties properties = new Properties();